1use std::{
11 sync::{
12 Arc,
13 Weak,
14 },
15 thread,
16 time::Instant,
17};
18
19use qubit_atomic::Atomic;
20use qubit_function::{
21 Callable,
22 Runnable,
23};
24
25use crate::{
26 TaskHandle,
27 service::{
28 ExecutorService,
29 ExecutorServiceBuilderError,
30 ExecutorServiceLifecycle,
31 StopReport,
32 SubmissionError,
33 },
34 task::spi::TaskEndpointPair,
35};
36
37use super::{
38 completable_scheduled_task::CompletableScheduledTask,
39 scheduled_executor_service::ScheduledExecutorService,
40 scheduled_task::ScheduledTask,
41 scheduled_task_entry::ScheduledTaskEntry,
42 scheduled_task_handle::ScheduledTaskHandle,
43 scheduled_worker::ScheduledWorker,
44 single_thread_scheduled_executor_service_inner::SingleThreadScheduledExecutorServiceInner,
45};
46
47pub struct SingleThreadScheduledExecutorService {
54 inner: Arc<SingleThreadScheduledExecutorServiceInner>,
56}
57
58impl SingleThreadScheduledExecutorService {
59 #[inline]
74 pub fn new(thread_name: &str) -> Result<Self, ExecutorServiceBuilderError> {
75 Self::with_stack_size(thread_name, None)
76 }
77
78 pub fn with_stack_size(thread_name: &str, stack_size: Option<usize>) -> Result<Self, ExecutorServiceBuilderError> {
94 let inner = Arc::new(SingleThreadScheduledExecutorServiceInner::new());
95 let worker_inner = Arc::clone(&inner);
96 let mut builder = thread::Builder::new().name(thread_name.to_string());
97 if let Some(stack_size) = stack_size {
98 builder = builder.stack_size(stack_size);
99 }
100 if let Err(source) = builder.spawn(move || ScheduledWorker::run(worker_inner)) {
101 return Err(ExecutorServiceBuilderError::SpawnWorker { index: Some(0), source });
102 }
103 Ok(Self { inner })
104 }
105
106 #[inline]
112 pub fn queued_count(&self) -> usize {
113 self.inner.queued_count()
114 }
115
116 #[inline]
122 pub fn running_count(&self) -> usize {
123 self.inner.running_count()
124 }
125
126 fn cancellation_callback(&self) -> Arc<dyn Fn() + Send + Sync + 'static> {
133 let inner = Arc::downgrade(&self.inner);
134 Arc::new(move || finish_queued_cancellation(&inner))
135 }
136
137 fn schedule_entry(&self, deadline: Instant, entry: Box<dyn ScheduledTaskEntry>) -> Result<(), SubmissionError> {
148 let mut state = self.inner.state.lock();
149 if state.lifecycle != ExecutorServiceLifecycle::Running {
150 return Err(SubmissionError::Shutdown);
151 }
152 entry.accept();
153 let sequence = state.next_sequence;
154 state.next_sequence = state.next_sequence.wrapping_add(1);
155 state.tasks.push(ScheduledTask::new(deadline, sequence, entry));
156 self.inner.add_queued_task();
157 self.inner.state.notify_all();
158 Ok(())
159 }
160
161 fn schedule_result_handle<C, R, E>(&self, deadline: Instant, task: C) -> Result<TaskHandle<R, E>, SubmissionError>
176 where
177 C: Callable<R, E> + Send + 'static,
178 R: Send + 'static,
179 E: Send + 'static,
180 {
181 let (handle, slot) = TaskEndpointPair::new().into_parts();
182 let cancelled = Arc::new(Atomic::new(false));
183 let entry = CompletableScheduledTask::new(task, slot, cancelled);
184 self.schedule_entry(deadline, Box::new(entry))?;
185 Ok(handle)
186 }
187}
188
189impl Drop for SingleThreadScheduledExecutorService {
190 fn drop(&mut self) {
192 self.inner.shutdown();
193 }
194}
195
196impl ExecutorService for SingleThreadScheduledExecutorService {
197 type ResultHandle<R, E>
198 = TaskHandle<R, E>
199 where
200 R: Send + 'static,
201 E: Send + 'static;
202
203 type TrackedHandle<R, E>
204 = ScheduledTaskHandle<R, E>
205 where
206 R: Send + 'static,
207 E: Send + 'static;
208
209 fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
211 where
212 T: Runnable<E> + Send + 'static,
213 E: Send + 'static,
214 {
215 let mut task = task;
216 let handle = self.submit_callable(move || task.run())?;
217 drop(handle);
218 Ok(())
219 }
220
221 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
223 where
224 C: Callable<R, E> + Send + 'static,
225 R: Send + 'static,
226 E: Send + 'static,
227 {
228 self.schedule_result_handle(Instant::now(), task)
229 }
230
231 fn submit_tracked_callable<C, R, E>(&self, task: C) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
233 where
234 C: Callable<R, E> + Send + 'static,
235 R: Send + 'static,
236 E: Send + 'static,
237 {
238 self.schedule_callable_at(Instant::now(), task)
239 }
240
241 #[inline]
243 fn shutdown(&self) {
244 self.inner.shutdown();
245 }
246
247 #[inline]
249 fn stop(&self) -> StopReport {
250 self.inner.stop()
251 }
252
253 #[inline]
255 fn lifecycle(&self) -> ExecutorServiceLifecycle {
256 self.inner.lifecycle()
257 }
258
259 #[inline]
261 fn is_not_running(&self) -> bool {
262 self.inner.is_not_running()
263 }
264
265 #[inline]
267 fn is_terminated(&self) -> bool {
268 self.inner.is_terminated()
269 }
270
271 #[inline]
273 fn wait_termination(&self) {
274 self.inner.wait_for_termination();
275 }
276}
277
278impl ScheduledExecutorService for SingleThreadScheduledExecutorService {
279 fn schedule_callable_at<C, R, E>(
281 &self,
282 instant: Instant,
283 task: C,
284 ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
285 where
286 C: Callable<R, E> + Send + 'static,
287 R: Send + 'static,
288 E: Send + 'static,
289 {
290 let (tracked, slot) = TaskEndpointPair::new().into_tracked_parts();
291 let cancellation_marker = Arc::new(Atomic::new(false));
292 let entry = CompletableScheduledTask::new(task, slot, Arc::clone(&cancellation_marker));
293 self.schedule_entry(instant, Box::new(entry))?;
294 Ok(ScheduledTaskHandle::new(
295 tracked,
296 cancellation_marker,
297 self.cancellation_callback(),
298 ))
299 }
300}
301
302fn finish_queued_cancellation(inner: &Weak<SingleThreadScheduledExecutorServiceInner>) {
308 if let Some(inner) = inner.upgrade() {
309 inner.finish_queued_cancellation();
310 }
311}