qubit_task/service/task_execution_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 collections::HashMap,
12 panic::{
13 AssertUnwindSafe,
14 catch_unwind,
15 resume_unwind,
16 },
17 sync::{
18 Arc,
19 Condvar,
20 Mutex,
21 MutexGuard,
22 },
23};
24
25use qubit_function::{
26 Callable,
27 Runnable,
28};
29
30use qubit_executor::TaskHandle;
31use qubit_executor::service::{
32 ExecutorService,
33 StopReport,
34};
35use qubit_executor::task::spi::{
36 TaskEndpointPair,
37 TaskSlot,
38};
39use qubit_thread_pool::{
40 ExecutorServiceBuilderError,
41 PoolJob,
42 ThreadPool,
43};
44
45use super::{
46 task_execution_service_builder::TaskExecutionServiceBuilder,
47 task_execution_service_error::TaskExecutionServiceError,
48 task_execution_stats::TaskExecutionStats,
49 task_id::TaskId,
50 task_status::TaskStatus,
51};
52
/// Managed task execution service built on [`ThreadPool`].
///
/// Assigns a stable business [`TaskId`] per task and tracks service-level status
/// (submitted, running, succeeded, failed, cancelled, panicked). The typed task
/// outcome is still retrieved through [`TaskHandle`].
///
/// # Responsibilities
///
/// - **Registry**: The same [`TaskId`] cannot be submitted again while a record
///   for it exists; a duplicate returns [`TaskExecutionServiceError::DuplicateTask`].
///   Use this when you need lookup by ID, optional pre-start cancellation, or
///   long-lived task bookkeeping.
/// - **Thread pool**: Owns a [`ThreadPool`] for queuing and worker threads; queue
///   internals are not exposed. Configure the pool via [`TaskExecutionServiceBuilder`]
///   or [`Self::builder`].
/// - **Submission semantics**: [`Self::submit`] / [`Self::submit_callable`] returning
///   `Ok(handle)` means only that the **service accepted** the task, not that it
///   started or succeeded. Observe the final result with [`TaskHandle::get`] or by
///   awaiting the handle's [`Future`](std::future::Future) implementation.
///
/// # Suspend
///
/// [`Self::suspend`] rejects **new** submissions ([`TaskExecutionServiceError::Suspended`]).
/// Tasks already queued or running are unaffected. [`Self::resume`] re-enables submission.
///
/// # Cancel
///
/// [`Self::cancel`] may succeed only **before** the task starts running; once running,
/// cancellation behavior follows [`TaskHandle`] and the internal completion protocol.
///
/// # Shutdown
///
/// [`Self::shutdown`] and [`Self::stop`] delegate to the backing pool.
/// [`Self::wait_termination`] blocks the current thread until the backing pool
/// has terminated.
///
/// # Example: submit, inspect status, wait for idle, shutdown
///
/// ```
/// use std::error::Error;
/// use qubit_task::service::{TaskExecutionService, TaskId, TaskStatus};
///
/// fn main() -> Result<(), Box<dyn Error>> {
///     let service = TaskExecutionService::new()?;
///     let id: TaskId = 1001;
///
///     let handle = service.submit(id, || Ok::<(), ()>(()))?;
///     handle.get().unwrap();
///
///     assert_eq!(service.status(id), Some(TaskStatus::Succeeded));
///
///     service.await_idle();
///     service.shutdown();
///     Ok(())
/// }
/// ```
pub struct TaskExecutionService {
    /// Backing pool that queues and runs the submitted jobs.
    pool: ThreadPool,
    /// Registry and suspension flag, shared with the closures handed to the pool.
    state: Arc<TaskExecutionServiceState>,
}
114
115impl TaskExecutionService {
116 /// Creates a service using the default [`super::ThreadPoolBuilder`] settings (worker
117 /// counts, queue, and other defaults match [`ThreadPool::builder`]).
118 ///
119 /// # Example
120 ///
121 /// ```
122 /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
123 ///
124 /// fn main() -> Result<(), ExecutorServiceBuilderError> {
125 /// let _service = TaskExecutionService::new()?;
126 /// Ok(())
127 /// }
128 /// ```
129 ///
130 /// # Returns
131 ///
132 /// `Ok(Self)` on success, or [`ExecutorServiceBuilderError`] if the pool cannot be built.
133 pub fn new() -> Result<Self, ExecutorServiceBuilderError> {
134 Self::builder().build()
135 }
136
137 /// Returns a [`TaskExecutionServiceBuilder`] so you can tune the backing pool
138 /// before [`TaskExecutionServiceBuilder::build`] (for example
139 /// [`super::ThreadPoolBuilder::pool_size`],
140 /// [`super::ThreadPoolBuilder::queue_capacity`]).
141 ///
142 /// # Example
143 ///
144 /// ```
145 /// use qubit_task::service::{
146 /// TaskExecutionService, ThreadPoolBuilder, ExecutorServiceBuilderError,
147 /// };
148 ///
149 /// fn main() -> Result<(), ExecutorServiceBuilderError> {
150 /// let _service = TaskExecutionService::builder()
151 /// .thread_pool(ThreadPoolBuilder::default().pool_size(8))
152 /// .build()?;
153 /// Ok(())
154 /// }
155 /// ```
156 ///
157 /// # Returns
158 ///
159 /// A builder holding the default [`super::ThreadPoolBuilder`].
160 #[inline]
161 pub fn builder() -> TaskExecutionServiceBuilder {
162 TaskExecutionServiceBuilder::default()
163 }
164
165 /// Builds a service from an already constructed pool.
166 pub(crate) fn from_thread_pool(pool: ThreadPool) -> Self {
167 Self {
168 pool,
169 state: Arc::new(TaskExecutionServiceState::default()),
170 }
171 }
172
173 /// Submits a runnable task with a business task ID.
174 ///
175 /// # Example
176 ///
177 /// ```
178 /// use std::error::Error;
179 /// use qubit_task::service::TaskExecutionService;
180 ///
181 /// fn main() -> Result<(), Box<dyn Error>> {
182 /// let service = TaskExecutionService::new()?;
183 /// let handle = service.submit(42_u64, || Ok::<(), ()>(()))?;
184 /// handle.get().unwrap();
185 /// Ok(())
186 /// }
187 /// ```
188 ///
189 /// # Parameters
190 ///
191 /// * `task_id` - Stable business ID for registry operations.
192 /// * `task` - Runnable to execute.
193 ///
194 /// # Returns
195 ///
196 /// `Ok(handle)` if the service accepts the task. This only means
197 /// acceptance; task success is observed through the handle. Returns
198 /// [`TaskExecutionServiceError`] when the ID is duplicated, the service is
199 /// suspended, or the backing pool rejects the task.
200 #[inline]
201 pub fn submit<T, E>(
202 &self,
203 task_id: TaskId,
204 mut task: T,
205 ) -> Result<TaskHandle<(), E>, TaskExecutionServiceError>
206 where
207 T: Runnable<E> + Send + 'static,
208 E: Send + 'static,
209 {
210 self.submit_callable(task_id, move || task.run())
211 }
212
213 /// Submits a callable task with a business task ID.
214 ///
215 /// # Example
216 ///
217 /// ```
218 /// use std::error::Error;
219 /// use qubit_task::service::{TaskExecutionService, TaskId};
220 ///
221 /// fn main() -> Result<(), Box<dyn Error>> {
222 /// let service = TaskExecutionService::new()?;
223 /// let id: TaskId = 7;
224 /// let handle = service.submit_callable(id, || Ok::<i32, ()>(21))?;
225 /// assert_eq!(handle.get().unwrap(), 21);
226 /// Ok(())
227 /// }
228 /// ```
229 ///
230 /// # Parameters
231 ///
232 /// * `task_id` - Stable business ID for registry operations.
233 /// * `task` - Callable to execute.
234 ///
235 /// # Returns
236 ///
237 /// `Ok(handle)` if the service accepts the task. The handle reports the
238 /// typed task result while this service records only service-level status.
239 pub fn submit_callable<C, R, E>(
240 &self,
241 task_id: TaskId,
242 task: C,
243 ) -> Result<TaskHandle<R, E>, TaskExecutionServiceError>
244 where
245 C: Callable<R, E> + Send + 'static,
246 R: Send + 'static,
247 E: Send + 'static,
248 {
249 let (handle, slot) = TaskEndpointPair::new().into_parts();
250 let slot = Arc::new(Mutex::new(Some(slot)));
251 let accept_slot = Arc::clone(&slot);
252 let cancel_slot = Arc::clone(&slot);
253 let run_slot = Arc::clone(&slot);
254 let cancel_state = Arc::clone(&self.state);
255 let cancel: Arc<dyn Fn() -> bool + Send + Sync> = Arc::new(move || {
256 let slot = cancel_slot
257 .lock()
258 .expect("task slot lock should not be poisoned")
259 .take();
260 let cancelled = slot.is_some_and(TaskSlot::cancel_unstarted);
261 if cancelled {
262 cancel_state.set_status(task_id, TaskStatus::Cancelled);
263 }
264 cancelled
265 });
266
267 self.state.register(task_id, Arc::clone(&cancel))?;
268
269 let run_state = Arc::clone(&self.state);
270 let cancel_for_job = Arc::clone(&cancel);
271 let job = PoolJob::with_accept(
272 Box::new(move || {
273 if let Some(slot) = accept_slot
274 .lock()
275 .expect("task slot lock should not be poisoned")
276 .as_ref()
277 {
278 slot.accept();
279 }
280 }),
281 Box::new(move || {
282 let slot = run_slot
283 .lock()
284 .expect("task slot lock should not be poisoned")
285 .take();
286 if let Some(slot) = slot {
287 let task = StatusReportingTask {
288 task_id,
289 task,
290 state: run_state,
291 };
292 if !slot.run(task) {
293 cancel_for_job();
294 }
295 }
296 }),
297 Box::new(move || {
298 cancel();
299 }),
300 );
301
302 if let Err(error) = self.pool.submit_job(job) {
303 self.state.remove(task_id);
304 return Err(error.into());
305 }
306 Ok(handle)
307 }
308
309 /// Attempts to cancel a submitted task by ID.
310 ///
311 /// Cancellation succeeds only before the task starts running.
312 ///
313 /// # Example
314 ///
315 /// ```
316 /// use std::error::Error;
317 /// use qubit_task::service::{TaskExecutionService, TaskId};
318 ///
319 /// fn main() -> Result<(), Box<dyn Error>> {
320 /// let service = TaskExecutionService::new()?;
321 /// let id: TaskId = 1;
322 /// let handle = service.submit(id, || Ok::<(), ()>(()))?;
323 /// // `true` only if cancelled before a worker starts the task (race with the pool).
324 /// let _cancelled = service.cancel(id);
325 /// match handle.get() {
326 /// Ok(()) => {}
327 /// Err(e) if e.is_cancelled() => {}
328 /// Err(e) => panic!("unexpected task outcome: {e:?}"),
329 /// }
330 /// Ok(())
331 /// }
332 /// ```
333 ///
334 /// # Parameters
335 ///
336 /// * `task_id` - ID of the task to cancel.
337 ///
338 /// # Returns
339 ///
340 /// `true` if the task was cancelled before start, or `false` if no active
341 /// task with this ID can be cancelled.
342 pub fn cancel(&self, task_id: TaskId) -> bool {
343 let cancel = self.state.cancel_callback(task_id);
344 cancel.is_some_and(|cancel| cancel())
345 }
346
347 /// Returns the current status of a task.
348 ///
349 /// # Example
350 ///
351 /// ```
352 /// use std::error::Error;
353 /// use qubit_task::service::{TaskExecutionService, TaskId, TaskStatus};
354 ///
355 /// fn main() -> Result<(), Box<dyn Error>> {
356 /// let service = TaskExecutionService::new()?;
357 /// let id: TaskId = 10;
358 /// let handle = service.submit(id, || Ok::<(), ()>(()))?;
359 /// handle.get().unwrap();
360 /// assert_eq!(service.status(id), Some(TaskStatus::Succeeded));
361 /// Ok(())
362 /// }
363 /// ```
364 ///
365 /// # Parameters
366 ///
367 /// * `task_id` - ID of the task to inspect.
368 ///
369 /// # Returns
370 ///
371 /// `Some(status)` if the service retains a record for this ID, or `None`
372 /// if the ID is unknown.
373 #[inline]
374 pub fn status(&self, task_id: TaskId) -> Option<TaskStatus> {
375 self.state.status(task_id)
376 }
377
378 /// Returns registry-derived task statistics.
379 ///
380 /// # Example
381 ///
382 /// ```
383 /// use std::error::Error;
384 /// use qubit_task::service::TaskExecutionService;
385 ///
386 /// fn main() -> Result<(), Box<dyn Error>> {
387 /// let service = TaskExecutionService::new()?;
388 /// let handle = service.submit(1_u64, || Ok::<(), ()>(()))?;
389 /// handle.get().unwrap();
390 /// let snapshot = service.stats();
391 /// assert!(snapshot.total >= 1);
392 /// Ok(())
393 /// }
394 /// ```
395 ///
396 /// # Returns
397 ///
398 /// A snapshot of retained task records grouped by status.
399 #[inline]
400 pub fn stats(&self) -> TaskExecutionStats {
401 self.state.stats()
402 }
403
404 /// Suspends new submissions.
405 ///
406 /// Existing submitted and running tasks continue normally.
407 ///
408 /// # Example
409 ///
410 /// ```
411 /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
412 ///
413 /// fn main() -> Result<(), ExecutorServiceBuilderError> {
414 /// let service = TaskExecutionService::new()?;
415 /// service.suspend();
416 /// assert!(service.is_suspended());
417 /// service.resume();
418 /// assert!(!service.is_suspended());
419 /// Ok(())
420 /// }
421 /// ```
422 #[inline]
423 pub fn suspend(&self) {
424 self.state.set_suspended(true);
425 }
426
427 /// Resumes accepting new submissions.
428 #[inline]
429 pub fn resume(&self) {
430 self.state.set_suspended(false);
431 }
432
433 /// Returns whether the service is suspended.
434 ///
435 /// # Returns
436 ///
437 /// `true` if new submissions are rejected before reaching the pool.
438 #[inline]
439 pub fn is_suspended(&self) -> bool {
440 self.state.is_suspended()
441 }
442
443 /// Waits for the active task snapshot observed at call time to finish.
444 ///
445 /// Tasks submitted after this method starts are not part of the waited
446 /// snapshot. This method blocks the current thread.
447 ///
448 /// # Example
449 ///
450 /// ```
451 /// use std::error::Error;
452 /// use qubit_task::service::{TaskExecutionService, TaskId};
453 ///
454 /// fn main() -> Result<(), Box<dyn Error>> {
455 /// let service = TaskExecutionService::new()?;
456 /// let a: TaskId = 1;
457 /// let b: TaskId = 2;
458 /// let h1 = service.submit(a, || Ok::<(), ()>(()))?;
459 /// let h2 = service.submit(b, || Ok::<(), ()>(()))?;
460 /// service.await_in_flight_tasks_completion();
461 /// h1.get().unwrap();
462 /// h2.get().unwrap();
463 /// Ok(())
464 /// }
465 /// ```
466 pub fn await_in_flight_tasks_completion(&self) {
467 self.state.await_in_flight_tasks_completion();
468 }
469
470 /// Waits until the service registry has no submitted or running tasks.
471 ///
472 /// This method blocks the current thread and observes real-time idleness.
473 ///
474 /// # Example
475 ///
476 /// ```
477 /// use std::error::Error;
478 /// use qubit_task::service::{TaskExecutionService, TaskId};
479 ///
480 /// fn main() -> Result<(), Box<dyn Error>> {
481 /// let service = TaskExecutionService::new()?;
482 /// let id: TaskId = 1;
483 /// let handle = service.submit(id, || Ok::<(), ()>(()))?;
484 /// handle.get().unwrap();
485 /// service.await_idle();
486 /// Ok(())
487 /// }
488 /// ```
489 pub fn await_idle(&self) {
490 self.state.await_idle();
491 }
492
493 /// Initiates graceful shutdown of the backing pool.
494 ///
495 /// # Example
496 ///
497 /// ```
498 /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
499 ///
500 /// fn main() -> Result<(), ExecutorServiceBuilderError> {
501 /// let service = TaskExecutionService::new()?;
502 /// service.shutdown();
503 /// assert!(service.is_not_running());
504 /// Ok(())
505 /// }
506 /// ```
507 #[inline]
508 pub fn shutdown(&self) {
509 self.pool.shutdown();
510 }
511
512 /// Initiates immediate stop of the backing pool.
513 ///
514 /// # Example
515 ///
516 /// ```
517 /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
518 ///
519 /// fn main() -> Result<(), ExecutorServiceBuilderError> {
520 /// let service = TaskExecutionService::new()?;
521 /// let _report = service.stop();
522 /// Ok(())
523 /// }
524 /// ```
525 ///
526 /// # Returns
527 ///
528 /// A count-based report from the backing pool.
529 #[inline]
530 pub fn stop(&self) -> StopReport {
531 self.pool.stop()
532 }
533
534 /// Returns whether the backing pool no longer accepts new work.
535 #[inline]
536 pub fn is_not_running(&self) -> bool {
537 self.pool.is_not_running()
538 }
539
540 /// Returns whether the backing pool has terminated.
541 #[inline]
542 pub fn is_terminated(&self) -> bool {
543 self.pool.is_terminated()
544 }
545
546 /// Blocks until the backing pool has terminated.
547 ///
548 /// # Example
549 ///
550 /// ```
551 /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
552 ///
553 /// fn main() -> Result<(), ExecutorServiceBuilderError> {
554 /// let service = TaskExecutionService::new()?;
555 /// service.shutdown();
556 /// service.wait_termination();
557 /// assert!(service.is_terminated());
558 /// Ok(())
559 /// }
560 /// ```
561 ///
562 /// # Returns
563 ///
564 /// Returns after shutdown and worker exit.
565 #[inline]
566 pub fn wait_termination(&self) {
567 self.pool.wait_termination();
568 }
569
570 /// Returns the backing thread pool.
571 ///
572 /// # Example
573 ///
574 /// ```
575 /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
576 ///
577 /// fn main() -> Result<(), ExecutorServiceBuilderError> {
578 /// let service = TaskExecutionService::new()?;
579 /// let pool = service.thread_pool();
580 /// assert!(pool.maximum_pool_size() > 0);
581 /// Ok(())
582 /// }
583 /// ```
584 ///
585 /// # Returns
586 ///
587 /// A shared reference for low-level inspection such as pool statistics.
588 #[inline]
589 pub fn thread_pool(&self) -> &ThreadPool {
590 &self.pool
591 }
592}
593
/// Shared state for [`TaskExecutionService`].
///
/// Pairs the mutex-protected registry with the condition variable that the
/// blocking wait methods sleep on.
#[derive(Default)]
struct TaskExecutionServiceState {
    /// Suspension flag and task registry, guarded by a mutex.
    inner: Mutex<TaskExecutionServiceInner>,
    /// Notified when a task record is removed or its status is updated.
    idle: Condvar,
}
600
601impl TaskExecutionServiceState {
602 /// Acquires service state.
603 fn lock_inner(&self) -> MutexGuard<'_, TaskExecutionServiceInner> {
604 self.inner
605 .lock()
606 .expect("task execution service state lock should not be poisoned")
607 }
608
609 /// Registers a submitted task.
610 fn register(
611 &self,
612 task_id: TaskId,
613 cancel: Arc<dyn Fn() -> bool + Send + Sync>,
614 ) -> Result<(), TaskExecutionServiceError> {
615 let mut inner = self.lock_inner();
616 if inner.suspended {
617 return Err(TaskExecutionServiceError::Suspended);
618 }
619 if inner.tasks.contains_key(&task_id) {
620 return Err(TaskExecutionServiceError::DuplicateTask(task_id));
621 }
622 inner.tasks.insert(
623 task_id,
624 TaskRecord {
625 status: TaskStatus::Submitted,
626 cancel,
627 },
628 );
629 Ok(())
630 }
631
632 /// Removes a task record.
633 fn remove(&self, task_id: TaskId) {
634 let mut inner = self.lock_inner();
635 inner.tasks.remove(&task_id);
636 self.idle.notify_all();
637 }
638
639 /// Gets a task status.
640 fn status(&self, task_id: TaskId) -> Option<TaskStatus> {
641 self.lock_inner()
642 .tasks
643 .get(&task_id)
644 .map(|record| record.status)
645 }
646
647 /// Gets a task cancel callback if the task is active.
648 fn cancel_callback(&self, task_id: TaskId) -> Option<Arc<dyn Fn() -> bool + Send + Sync>> {
649 let inner = self.lock_inner();
650 let record = inner.tasks.get(&task_id)?;
651 record
652 .status
653 .is_active()
654 .then(|| Arc::clone(&record.cancel))
655 }
656
657 /// Updates a task status.
658 fn set_status(&self, task_id: TaskId, status: TaskStatus) {
659 let mut inner = self.lock_inner();
660 let record = inner
661 .tasks
662 .get_mut(&task_id)
663 .expect("task status can only be updated for a registered task");
664 record.status = status;
665 self.idle.notify_all();
666 }
667
668 /// Updates suspended flag.
669 fn set_suspended(&self, suspended: bool) {
670 self.lock_inner().suspended = suspended;
671 }
672
673 /// Returns whether new submissions are suspended.
674 fn is_suspended(&self) -> bool {
675 self.lock_inner().suspended
676 }
677
678 /// Returns task statistics.
679 fn stats(&self) -> TaskExecutionStats {
680 let inner = self.lock_inner();
681 let mut stats = TaskExecutionStats::default();
682 for record in inner.tasks.values() {
683 stats.add_status(record.status);
684 }
685 stats
686 }
687
688 /// Waits for active task IDs observed at call time.
689 fn await_in_flight_tasks_completion(&self) {
690 let mut inner = self.lock_inner();
691 let task_ids = inner
692 .tasks
693 .iter()
694 .filter_map(|(&task_id, record)| record.status.is_active().then_some(task_id))
695 .collect::<Vec<_>>();
696 while task_ids
697 .iter()
698 .any(|task_id| inner.task_is_active(*task_id))
699 {
700 inner = self.wait_for_idle_notification(inner);
701 }
702 }
703
704 /// Waits until no retained task record is active.
705 fn await_idle(&self) {
706 let mut inner = self.lock_inner();
707 while inner.has_active_tasks() {
708 inner = self.wait_for_idle_notification(inner);
709 }
710 }
711
712 /// Waits for a state transition notification.
713 fn wait_for_idle_notification<'a>(
714 &self,
715 inner: MutexGuard<'a, TaskExecutionServiceInner>,
716 ) -> MutexGuard<'a, TaskExecutionServiceInner> {
717 self.idle
718 .wait(inner)
719 .expect("task execution service state lock should not be poisoned")
720 }
721}
722
/// Mutable service state protected by a mutex.
#[derive(Default)]
struct TaskExecutionServiceInner {
    /// When `true`, registration of new tasks is rejected.
    suspended: bool,
    /// Registry of retained task records keyed by business task ID.
    tasks: HashMap<TaskId, TaskRecord>,
}
729
730impl TaskExecutionServiceInner {
731 /// Returns whether a retained task ID is still active.
732 fn task_is_active(&self, task_id: TaskId) -> bool {
733 self.tasks
734 .get(&task_id)
735 .is_some_and(|record| record.status.is_active())
736 }
737
738 /// Returns whether any retained task is still active.
739 fn has_active_tasks(&self) -> bool {
740 self.tasks.values().any(|record| record.status.is_active())
741 }
742}
743
/// Registry record for one managed task.
struct TaskRecord {
    /// Most recent service-level status recorded for the task.
    status: TaskStatus,
    /// Callback that attempts to cancel the task; it returns `true` when the
    /// cancellation took effect.
    cancel: Arc<dyn Fn() -> bool + Send + Sync>,
}
749
/// Callable wrapper that keeps service-level status aligned with task outcome.
///
/// Its `Callable` implementation updates the registry record around the call
/// to the wrapped user task.
struct StatusReportingTask<C> {
    /// Stable business task ID whose record is updated.
    task_id: TaskId,
    /// User task to execute.
    task: C,
    /// Shared service registry that receives the status updates.
    state: Arc<TaskExecutionServiceState>,
}
759
760impl<C, R, E> Callable<R, E> for StatusReportingTask<C>
761where
762 C: Callable<R, E>,
763{
764 /// Runs the user task and records the corresponding service-level status.
765 fn call(&mut self) -> Result<R, E> {
766 self.state.set_status(self.task_id, TaskStatus::Running);
767 match catch_unwind(AssertUnwindSafe(|| self.task.call())) {
768 Ok(Ok(value)) => {
769 self.state.set_status(self.task_id, TaskStatus::Succeeded);
770 Ok(value)
771 }
772 Ok(Err(error)) => {
773 self.state.set_status(self.task_id, TaskStatus::Failed);
774 Err(error)
775 }
776 Err(payload) => {
777 self.state.set_status(self.task_id, TaskStatus::Panicked);
778 resume_unwind(payload);
779 }
780 }
781 }
782}