Skip to main content

qubit_task/service/
task_execution_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    collections::HashMap,
12    panic::{
13        AssertUnwindSafe,
14        catch_unwind,
15        resume_unwind,
16    },
17    sync::{
18        Arc,
19        Condvar,
20        Mutex,
21        MutexGuard,
22    },
23};
24
25use qubit_function::{
26    Callable,
27    Runnable,
28};
29
30use qubit_executor::TaskHandle;
31use qubit_executor::service::{
32    ExecutorService,
33    StopReport,
34};
35use qubit_executor::task::spi::{
36    TaskEndpointPair,
37    TaskSlot,
38};
39use qubit_thread_pool::{
40    ExecutorServiceBuilderError,
41    PoolJob,
42    ThreadPool,
43};
44
45use super::{
46    task_execution_service_builder::TaskExecutionServiceBuilder,
47    task_execution_service_error::TaskExecutionServiceError,
48    task_execution_stats::TaskExecutionStats,
49    task_id::TaskId,
50    task_status::TaskStatus,
51};
52
53/// Managed task execution service built on [`ThreadPool`].
54///
55/// Assigns a stable business [`TaskId`] per task and tracks service-level status
56/// (submitted, running, succeeded, failed, cancelled, panicked). The typed task
57/// outcome is still retrieved through [`TaskHandle`].
58///
59/// # Responsibilities
60///
61/// - **Registry**: The same [`TaskId`] cannot be submitted again while a record
62///   for it exists; a duplicate returns [`TaskExecutionServiceError::DuplicateTask`].
63///   Use this when you need lookup by ID, optional pre-start cancellation, or
64///   long-lived task bookkeeping.
65/// - **Thread pool**: Owns a [`ThreadPool`] for queuing and worker threads; queue
66///   internals are not exposed. Configure the pool via [`TaskExecutionServiceBuilder`]
67///   or [`Self::builder`].
68/// - **Submission semantics**: [`Self::submit`] / [`Self::submit_callable`] returning
69///   `Ok(handle)` means only that the **service accepted** the task—not that it
70///   started or succeeded. Observe the final result with [`TaskHandle::get`] or by
71///   awaiting the handle’s [`Future`](std::future::Future) implementation.
72///
73/// # Suspend
74///
75/// [`Self::suspend`] rejects **new** submissions ([`TaskExecutionServiceError::Suspended`]).
76/// Tasks already queued or running are unaffected. [`Self::resume`] re-enables submission.
77///
78/// # Cancel
79///
80/// [`Self::cancel`] may succeed only **before** the task starts running; once running,
81/// cancellation behavior follows [`TaskHandle`] and the internal completion protocol.
82///
83/// # Shutdown
84///
85/// [`Self::shutdown`] and [`Self::stop`] delegate to the backing pool.
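/// [`Self::wait_termination`] blocks the current thread until the backing pool
/// has terminated, that is, after shutdown or stop has been requested and the
/// workers have exited. To wait for accepted work without shutting the pool
/// down, use [`Self::await_idle`].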
88///
89/// # Example: submit, inspect status, wait for idle, shutdown
90///
91/// ```
92/// use std::error::Error;
93/// use qubit_task::service::{TaskExecutionService, TaskId, TaskStatus};
94///
95/// fn main() -> Result<(), Box<dyn Error>> {
96///     let service = TaskExecutionService::new()?;
97///     let id: TaskId = 1001;
98///
99///     let handle = service.submit(id, || Ok::<(), ()>(()))?;
100///     handle.get().unwrap();
101///
102///     assert_eq!(service.status(id), Some(TaskStatus::Succeeded));
103///
104///     service.await_idle();
105///     service.shutdown();
106///     Ok(())
107/// }
108/// ```
109///
110pub struct TaskExecutionService {
111    pool: ThreadPool,
112    state: Arc<TaskExecutionServiceState>,
113}
114
115impl TaskExecutionService {
116    /// Creates a service using the default [`super::ThreadPoolBuilder`] settings (worker
117    /// counts, queue, and other defaults match [`ThreadPool::builder`]).
118    ///
119    /// # Example
120    ///
121    /// ```
122    /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
123    ///
124    /// fn main() -> Result<(), ExecutorServiceBuilderError> {
125    ///     let _service = TaskExecutionService::new()?;
126    ///     Ok(())
127    /// }
128    /// ```
129    ///
130    /// # Returns
131    ///
132    /// `Ok(Self)` on success, or [`ExecutorServiceBuilderError`] if the pool cannot be built.
133    pub fn new() -> Result<Self, ExecutorServiceBuilderError> {
134        Self::builder().build()
135    }
136
137    /// Returns a [`TaskExecutionServiceBuilder`] so you can tune the backing pool
138    /// before [`TaskExecutionServiceBuilder::build`] (for example
139    /// [`super::ThreadPoolBuilder::pool_size`],
140    /// [`super::ThreadPoolBuilder::queue_capacity`]).
141    ///
142    /// # Example
143    ///
144    /// ```
145    /// use qubit_task::service::{
146    ///     TaskExecutionService, ThreadPoolBuilder, ExecutorServiceBuilderError,
147    /// };
148    ///
149    /// fn main() -> Result<(), ExecutorServiceBuilderError> {
150    ///     let _service = TaskExecutionService::builder()
151    ///         .thread_pool(ThreadPoolBuilder::default().pool_size(8))
152    ///         .build()?;
153    ///     Ok(())
154    /// }
155    /// ```
156    ///
157    /// # Returns
158    ///
159    /// A builder holding the default [`super::ThreadPoolBuilder`].
160    #[inline]
161    pub fn builder() -> TaskExecutionServiceBuilder {
162        TaskExecutionServiceBuilder::default()
163    }
164
165    /// Builds a service from an already constructed pool.
166    pub(crate) fn from_thread_pool(pool: ThreadPool) -> Self {
167        Self {
168            pool,
169            state: Arc::new(TaskExecutionServiceState::default()),
170        }
171    }
172
173    /// Submits a runnable task with a business task ID.
174    ///
175    /// # Example
176    ///
177    /// ```
178    /// use std::error::Error;
179    /// use qubit_task::service::TaskExecutionService;
180    ///
181    /// fn main() -> Result<(), Box<dyn Error>> {
182    ///     let service = TaskExecutionService::new()?;
183    ///     let handle = service.submit(42_u64, || Ok::<(), ()>(()))?;
184    ///     handle.get().unwrap();
185    ///     Ok(())
186    /// }
187    /// ```
188    ///
189    /// # Parameters
190    ///
191    /// * `task_id` - Stable business ID for registry operations.
192    /// * `task` - Runnable to execute.
193    ///
194    /// # Returns
195    ///
196    /// `Ok(handle)` if the service accepts the task. This only means
197    /// acceptance; task success is observed through the handle. Returns
198    /// [`TaskExecutionServiceError`] when the ID is duplicated, the service is
199    /// suspended, or the backing pool rejects the task.
200    #[inline]
201    pub fn submit<T, E>(
202        &self,
203        task_id: TaskId,
204        mut task: T,
205    ) -> Result<TaskHandle<(), E>, TaskExecutionServiceError>
206    where
207        T: Runnable<E> + Send + 'static,
208        E: Send + 'static,
209    {
210        self.submit_callable(task_id, move || task.run())
211    }
212
213    /// Submits a callable task with a business task ID.
214    ///
215    /// # Example
216    ///
217    /// ```
218    /// use std::error::Error;
219    /// use qubit_task::service::{TaskExecutionService, TaskId};
220    ///
221    /// fn main() -> Result<(), Box<dyn Error>> {
222    ///     let service = TaskExecutionService::new()?;
223    ///     let id: TaskId = 7;
224    ///     let handle = service.submit_callable(id, || Ok::<i32, ()>(21))?;
225    ///     assert_eq!(handle.get().unwrap(), 21);
226    ///     Ok(())
227    /// }
228    /// ```
229    ///
230    /// # Parameters
231    ///
232    /// * `task_id` - Stable business ID for registry operations.
233    /// * `task` - Callable to execute.
234    ///
235    /// # Returns
236    ///
237    /// `Ok(handle)` if the service accepts the task. The handle reports the
238    /// typed task result while this service records only service-level status.
239    pub fn submit_callable<C, R, E>(
240        &self,
241        task_id: TaskId,
242        task: C,
243    ) -> Result<TaskHandle<R, E>, TaskExecutionServiceError>
244    where
245        C: Callable<R, E> + Send + 'static,
246        R: Send + 'static,
247        E: Send + 'static,
248    {
249        let (handle, slot) = TaskEndpointPair::new().into_parts();
250        let slot = Arc::new(Mutex::new(Some(slot)));
251        let accept_slot = Arc::clone(&slot);
252        let cancel_slot = Arc::clone(&slot);
253        let run_slot = Arc::clone(&slot);
254        let cancel_state = Arc::clone(&self.state);
255        let cancel: Arc<dyn Fn() -> bool + Send + Sync> = Arc::new(move || {
256            let slot = cancel_slot
257                .lock()
258                .expect("task slot lock should not be poisoned")
259                .take();
260            let cancelled = slot.is_some_and(TaskSlot::cancel_unstarted);
261            if cancelled {
262                cancel_state.set_status(task_id, TaskStatus::Cancelled);
263            }
264            cancelled
265        });
266
267        self.state.register(task_id, Arc::clone(&cancel))?;
268
269        let run_state = Arc::clone(&self.state);
270        let cancel_for_job = Arc::clone(&cancel);
271        let job = PoolJob::with_accept(
272            Box::new(move || {
273                if let Some(slot) = accept_slot
274                    .lock()
275                    .expect("task slot lock should not be poisoned")
276                    .as_ref()
277                {
278                    slot.accept();
279                }
280            }),
281            Box::new(move || {
282                let slot = run_slot
283                    .lock()
284                    .expect("task slot lock should not be poisoned")
285                    .take();
286                if let Some(slot) = slot {
287                    let task = StatusReportingTask {
288                        task_id,
289                        task,
290                        state: run_state,
291                    };
292                    if !slot.run(task) {
293                        cancel_for_job();
294                    }
295                }
296            }),
297            Box::new(move || {
298                cancel();
299            }),
300        );
301
302        if let Err(error) = self.pool.submit_job(job) {
303            self.state.remove(task_id);
304            return Err(error.into());
305        }
306        Ok(handle)
307    }
308
309    /// Attempts to cancel a submitted task by ID.
310    ///
311    /// Cancellation succeeds only before the task starts running.
312    ///
313    /// # Example
314    ///
315    /// ```
316    /// use std::error::Error;
317    /// use qubit_task::service::{TaskExecutionService, TaskId};
318    ///
319    /// fn main() -> Result<(), Box<dyn Error>> {
320    ///     let service = TaskExecutionService::new()?;
321    ///     let id: TaskId = 1;
322    ///     let handle = service.submit(id, || Ok::<(), ()>(()))?;
323    ///     // `true` only if cancelled before a worker starts the task (race with the pool).
324    ///     let _cancelled = service.cancel(id);
325    ///     match handle.get() {
326    ///         Ok(()) => {}
327    ///         Err(e) if e.is_cancelled() => {}
328    ///         Err(e) => panic!("unexpected task outcome: {e:?}"),
329    ///     }
330    ///     Ok(())
331    /// }
332    /// ```
333    ///
334    /// # Parameters
335    ///
336    /// * `task_id` - ID of the task to cancel.
337    ///
338    /// # Returns
339    ///
340    /// `true` if the task was cancelled before start, or `false` if no active
341    /// task with this ID can be cancelled.
342    pub fn cancel(&self, task_id: TaskId) -> bool {
343        let cancel = self.state.cancel_callback(task_id);
344        cancel.is_some_and(|cancel| cancel())
345    }
346
347    /// Returns the current status of a task.
348    ///
349    /// # Example
350    ///
351    /// ```
352    /// use std::error::Error;
353    /// use qubit_task::service::{TaskExecutionService, TaskId, TaskStatus};
354    ///
355    /// fn main() -> Result<(), Box<dyn Error>> {
356    ///     let service = TaskExecutionService::new()?;
357    ///     let id: TaskId = 10;
358    ///     let handle = service.submit(id, || Ok::<(), ()>(()))?;
359    ///     handle.get().unwrap();
360    ///     assert_eq!(service.status(id), Some(TaskStatus::Succeeded));
361    ///     Ok(())
362    /// }
363    /// ```
364    ///
365    /// # Parameters
366    ///
367    /// * `task_id` - ID of the task to inspect.
368    ///
369    /// # Returns
370    ///
371    /// `Some(status)` if the service retains a record for this ID, or `None`
372    /// if the ID is unknown.
373    #[inline]
374    pub fn status(&self, task_id: TaskId) -> Option<TaskStatus> {
375        self.state.status(task_id)
376    }
377
378    /// Returns registry-derived task statistics.
379    ///
380    /// # Example
381    ///
382    /// ```
383    /// use std::error::Error;
384    /// use qubit_task::service::TaskExecutionService;
385    ///
386    /// fn main() -> Result<(), Box<dyn Error>> {
387    ///     let service = TaskExecutionService::new()?;
388    ///     let handle = service.submit(1_u64, || Ok::<(), ()>(()))?;
389    ///     handle.get().unwrap();
390    ///     let snapshot = service.stats();
391    ///     assert!(snapshot.total >= 1);
392    ///     Ok(())
393    /// }
394    /// ```
395    ///
396    /// # Returns
397    ///
398    /// A snapshot of retained task records grouped by status.
399    #[inline]
400    pub fn stats(&self) -> TaskExecutionStats {
401        self.state.stats()
402    }
403
404    /// Suspends new submissions.
405    ///
406    /// Existing submitted and running tasks continue normally.
407    ///
408    /// # Example
409    ///
410    /// ```
411    /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
412    ///
413    /// fn main() -> Result<(), ExecutorServiceBuilderError> {
414    ///     let service = TaskExecutionService::new()?;
415    ///     service.suspend();
416    ///     assert!(service.is_suspended());
417    ///     service.resume();
418    ///     assert!(!service.is_suspended());
419    ///     Ok(())
420    /// }
421    /// ```
422    #[inline]
423    pub fn suspend(&self) {
424        self.state.set_suspended(true);
425    }
426
427    /// Resumes accepting new submissions.
428    #[inline]
429    pub fn resume(&self) {
430        self.state.set_suspended(false);
431    }
432
433    /// Returns whether the service is suspended.
434    ///
435    /// # Returns
436    ///
437    /// `true` if new submissions are rejected before reaching the pool.
438    #[inline]
439    pub fn is_suspended(&self) -> bool {
440        self.state.is_suspended()
441    }
442
443    /// Waits for the active task snapshot observed at call time to finish.
444    ///
445    /// Tasks submitted after this method starts are not part of the waited
446    /// snapshot. This method blocks the current thread.
447    ///
448    /// # Example
449    ///
450    /// ```
451    /// use std::error::Error;
452    /// use qubit_task::service::{TaskExecutionService, TaskId};
453    ///
454    /// fn main() -> Result<(), Box<dyn Error>> {
455    ///     let service = TaskExecutionService::new()?;
456    ///     let a: TaskId = 1;
457    ///     let b: TaskId = 2;
458    ///     let h1 = service.submit(a, || Ok::<(), ()>(()))?;
459    ///     let h2 = service.submit(b, || Ok::<(), ()>(()))?;
460    ///     service.await_in_flight_tasks_completion();
461    ///     h1.get().unwrap();
462    ///     h2.get().unwrap();
463    ///     Ok(())
464    /// }
465    /// ```
466    pub fn await_in_flight_tasks_completion(&self) {
467        self.state.await_in_flight_tasks_completion();
468    }
469
470    /// Waits until the service registry has no submitted or running tasks.
471    ///
472    /// This method blocks the current thread and observes real-time idleness.
473    ///
474    /// # Example
475    ///
476    /// ```
477    /// use std::error::Error;
478    /// use qubit_task::service::{TaskExecutionService, TaskId};
479    ///
480    /// fn main() -> Result<(), Box<dyn Error>> {
481    ///     let service = TaskExecutionService::new()?;
482    ///     let id: TaskId = 1;
483    ///     let handle = service.submit(id, || Ok::<(), ()>(()))?;
484    ///     handle.get().unwrap();
485    ///     service.await_idle();
486    ///     Ok(())
487    /// }
488    /// ```
489    pub fn await_idle(&self) {
490        self.state.await_idle();
491    }
492
493    /// Initiates graceful shutdown of the backing pool.
494    ///
495    /// # Example
496    ///
497    /// ```
498    /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
499    ///
500    /// fn main() -> Result<(), ExecutorServiceBuilderError> {
501    ///     let service = TaskExecutionService::new()?;
502    ///     service.shutdown();
503    ///     assert!(service.is_not_running());
504    ///     Ok(())
505    /// }
506    /// ```
507    #[inline]
508    pub fn shutdown(&self) {
509        self.pool.shutdown();
510    }
511
512    /// Initiates immediate stop of the backing pool.
513    ///
514    /// # Example
515    ///
516    /// ```
517    /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
518    ///
519    /// fn main() -> Result<(), ExecutorServiceBuilderError> {
520    ///     let service = TaskExecutionService::new()?;
521    ///     let _report = service.stop();
522    ///     Ok(())
523    /// }
524    /// ```
525    ///
526    /// # Returns
527    ///
528    /// A count-based report from the backing pool.
529    #[inline]
530    pub fn stop(&self) -> StopReport {
531        self.pool.stop()
532    }
533
534    /// Returns whether the backing pool no longer accepts new work.
535    #[inline]
536    pub fn is_not_running(&self) -> bool {
537        self.pool.is_not_running()
538    }
539
540    /// Returns whether the backing pool has terminated.
541    #[inline]
542    pub fn is_terminated(&self) -> bool {
543        self.pool.is_terminated()
544    }
545
546    /// Blocks until the backing pool has terminated.
547    ///
548    /// # Example
549    ///
550    /// ```
551    /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
552    ///
553    /// fn main() -> Result<(), ExecutorServiceBuilderError> {
554    ///     let service = TaskExecutionService::new()?;
555    ///     service.shutdown();
556    ///     service.wait_termination();
557    ///     assert!(service.is_terminated());
558    ///     Ok(())
559    /// }
560    /// ```
561    ///
562    /// # Returns
563    ///
564    /// Returns after shutdown and worker exit.
565    #[inline]
566    pub fn wait_termination(&self) {
567        self.pool.wait_termination();
568    }
569
570    /// Returns the backing thread pool.
571    ///
572    /// # Example
573    ///
574    /// ```
575    /// use qubit_task::service::{TaskExecutionService, ExecutorServiceBuilderError};
576    ///
577    /// fn main() -> Result<(), ExecutorServiceBuilderError> {
578    ///     let service = TaskExecutionService::new()?;
579    ///     let pool = service.thread_pool();
580    ///     assert!(pool.maximum_pool_size() > 0);
581    ///     Ok(())
582    /// }
583    /// ```
584    ///
585    /// # Returns
586    ///
587    /// A shared reference for low-level inspection such as pool statistics.
588    #[inline]
589    pub fn thread_pool(&self) -> &ThreadPool {
590        &self.pool
591    }
592}
593
594/// Shared state for [`TaskExecutionService`].
595#[derive(Default)]
596struct TaskExecutionServiceState {
597    inner: Mutex<TaskExecutionServiceInner>,
598    idle: Condvar,
599}
600
601impl TaskExecutionServiceState {
602    /// Acquires service state.
603    fn lock_inner(&self) -> MutexGuard<'_, TaskExecutionServiceInner> {
604        self.inner
605            .lock()
606            .expect("task execution service state lock should not be poisoned")
607    }
608
609    /// Registers a submitted task.
610    fn register(
611        &self,
612        task_id: TaskId,
613        cancel: Arc<dyn Fn() -> bool + Send + Sync>,
614    ) -> Result<(), TaskExecutionServiceError> {
615        let mut inner = self.lock_inner();
616        if inner.suspended {
617            return Err(TaskExecutionServiceError::Suspended);
618        }
619        if inner.tasks.contains_key(&task_id) {
620            return Err(TaskExecutionServiceError::DuplicateTask(task_id));
621        }
622        inner.tasks.insert(
623            task_id,
624            TaskRecord {
625                status: TaskStatus::Submitted,
626                cancel,
627            },
628        );
629        Ok(())
630    }
631
632    /// Removes a task record.
633    fn remove(&self, task_id: TaskId) {
634        let mut inner = self.lock_inner();
635        inner.tasks.remove(&task_id);
636        self.idle.notify_all();
637    }
638
639    /// Gets a task status.
640    fn status(&self, task_id: TaskId) -> Option<TaskStatus> {
641        self.lock_inner()
642            .tasks
643            .get(&task_id)
644            .map(|record| record.status)
645    }
646
647    /// Gets a task cancel callback if the task is active.
648    fn cancel_callback(&self, task_id: TaskId) -> Option<Arc<dyn Fn() -> bool + Send + Sync>> {
649        let inner = self.lock_inner();
650        let record = inner.tasks.get(&task_id)?;
651        record
652            .status
653            .is_active()
654            .then(|| Arc::clone(&record.cancel))
655    }
656
657    /// Updates a task status.
658    fn set_status(&self, task_id: TaskId, status: TaskStatus) {
659        let mut inner = self.lock_inner();
660        let record = inner
661            .tasks
662            .get_mut(&task_id)
663            .expect("task status can only be updated for a registered task");
664        record.status = status;
665        self.idle.notify_all();
666    }
667
668    /// Updates suspended flag.
669    fn set_suspended(&self, suspended: bool) {
670        self.lock_inner().suspended = suspended;
671    }
672
673    /// Returns whether new submissions are suspended.
674    fn is_suspended(&self) -> bool {
675        self.lock_inner().suspended
676    }
677
678    /// Returns task statistics.
679    fn stats(&self) -> TaskExecutionStats {
680        let inner = self.lock_inner();
681        let mut stats = TaskExecutionStats::default();
682        for record in inner.tasks.values() {
683            stats.add_status(record.status);
684        }
685        stats
686    }
687
688    /// Waits for active task IDs observed at call time.
689    fn await_in_flight_tasks_completion(&self) {
690        let mut inner = self.lock_inner();
691        let task_ids = inner
692            .tasks
693            .iter()
694            .filter_map(|(&task_id, record)| record.status.is_active().then_some(task_id))
695            .collect::<Vec<_>>();
696        while task_ids
697            .iter()
698            .any(|task_id| inner.task_is_active(*task_id))
699        {
700            inner = self.wait_for_idle_notification(inner);
701        }
702    }
703
704    /// Waits until no retained task record is active.
705    fn await_idle(&self) {
706        let mut inner = self.lock_inner();
707        while inner.has_active_tasks() {
708            inner = self.wait_for_idle_notification(inner);
709        }
710    }
711
712    /// Waits for a state transition notification.
713    fn wait_for_idle_notification<'a>(
714        &self,
715        inner: MutexGuard<'a, TaskExecutionServiceInner>,
716    ) -> MutexGuard<'a, TaskExecutionServiceInner> {
717        self.idle
718            .wait(inner)
719            .expect("task execution service state lock should not be poisoned")
720    }
721}
722
723/// Mutable service state protected by a mutex.
724#[derive(Default)]
725struct TaskExecutionServiceInner {
726    suspended: bool,
727    tasks: HashMap<TaskId, TaskRecord>,
728}
729
730impl TaskExecutionServiceInner {
731    /// Returns whether a retained task ID is still active.
732    fn task_is_active(&self, task_id: TaskId) -> bool {
733        self.tasks
734            .get(&task_id)
735            .is_some_and(|record| record.status.is_active())
736    }
737
738    /// Returns whether any retained task is still active.
739    fn has_active_tasks(&self) -> bool {
740        self.tasks.values().any(|record| record.status.is_active())
741    }
742}
743
744/// Registry record for one managed task.
745struct TaskRecord {
746    status: TaskStatus,
747    cancel: Arc<dyn Fn() -> bool + Send + Sync>,
748}
749
750/// Callable wrapper that keeps service-level status aligned with task outcome.
751struct StatusReportingTask<C> {
752    /// Stable business task ID.
753    task_id: TaskId,
754    /// User task to execute.
755    task: C,
756    /// Shared service registry.
757    state: Arc<TaskExecutionServiceState>,
758}
759
760impl<C, R, E> Callable<R, E> for StatusReportingTask<C>
761where
762    C: Callable<R, E>,
763{
764    /// Runs the user task and records the corresponding service-level status.
765    fn call(&mut self) -> Result<R, E> {
766        self.state.set_status(self.task_id, TaskStatus::Running);
767        match catch_unwind(AssertUnwindSafe(|| self.task.call())) {
768            Ok(Ok(value)) => {
769                self.state.set_status(self.task_id, TaskStatus::Succeeded);
770                Ok(value)
771            }
772            Ok(Err(error)) => {
773                self.state.set_status(self.task_id, TaskStatus::Failed);
774                Err(error)
775            }
776            Err(payload) => {
777                self.state.set_status(self.task_id, TaskStatus::Panicked);
778                resume_unwind(payload);
779            }
780        }
781    }
782}