anput_jobs/
lib.rs

1pub mod coroutine;
2
3use crate::coroutine::context;
4#[cfg(target_arch = "wasm32")]
5use instant::{Duration, Instant};
6use intuicio_data::managed::{DynamicManagedLazy, ManagedLazy};
7#[cfg(not(target_arch = "wasm32"))]
8use std::time::{Duration, Instant};
9use std::{
10    collections::{HashMap, HashSet, VecDeque},
11    error::Error,
12    hash::{DefaultHasher, Hash, Hasher},
13    pin::Pin,
14    sync::{
15        Arc, Condvar, Mutex, RwLock,
16        atomic::{AtomicBool, Ordering},
17        mpsc::{Receiver, Sender},
18    },
19    task::{Context, Poll, RawWaker, RawWakerVTable, Wake, Waker},
20    thread::{JoinHandle, ThreadId, available_parallelism, spawn},
21};
22use typid::ID;
23
24struct Job(Pin<Box<dyn Future<Output = ()> + Send + Sync>>);
25
26impl Job {
27    fn poll(mut self, cx: &mut Context<'_>) -> Option<Self> {
28        match self.0.as_mut().poll(cx) {
29            Poll::Ready(_) => None,
30            Poll::Pending => Some(self),
31        }
32    }
33}
34
35#[inline]
36fn traced_spin_loop() {
37    #[cfg(feature = "deadlock-trace")]
38    println!(
39        "* DEADLOCK BACKTRACE: {}",
40        std::backtrace::Backtrace::force_capture()
41    );
42    std::hint::spin_loop();
43}
44
45pub struct JobHandle<T: Send + 'static> {
46    result: Arc<Mutex<Option<Option<T>>>>,
47    cancel: Arc<AtomicBool>,
48    suspend: Arc<AtomicBool>,
49    meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
50}
51
52impl<T: Send + 'static> Default for JobHandle<T> {
53    fn default() -> Self {
54        Self {
55            result: Default::default(),
56            cancel: Default::default(),
57            suspend: Default::default(),
58            meta: Default::default(),
59        }
60    }
61}
62
63impl<T: Send + 'static> JobHandle<T> {
64    pub fn new(value: T) -> Self {
65        Self {
66            result: Arc::new(Mutex::new(Some(Some(value)))),
67            cancel: Default::default(),
68            suspend: Default::default(),
69            meta: Default::default(),
70        }
71    }
72
73    pub(crate) fn with_meta(
74        self,
75        iter: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
76    ) -> Self {
77        if let Ok(mut meta) = self.meta.write() {
78            meta.extend(iter);
79        }
80        self
81    }
82
83    pub fn is_cancelled(&self) -> bool {
84        self.cancel.load(Ordering::Relaxed)
85    }
86
87    pub fn is_suspended(&self) -> bool {
88        self.suspend.load(Ordering::Relaxed)
89    }
90
91    pub fn is_done(&self) -> bool {
92        self.result
93            .try_lock()
94            .ok()
95            .map(|guard| guard.is_some())
96            .unwrap_or_default()
97    }
98
99    pub fn try_take(&self) -> Option<Option<T>> {
100        self.result
101            .try_lock()
102            .ok()
103            .and_then(|mut result| result.take())
104    }
105
106    pub fn wait(self) -> Option<T> {
107        loop {
108            if let Some(result) = self.try_take() {
109                return result;
110            } else {
111                traced_spin_loop();
112            }
113        }
114    }
115
116    pub fn cancel(&self) {
117        self.cancel.store(true, Ordering::Relaxed);
118        if let Ok(mut result) = self.result.lock() {
119            *result = Some(None);
120        }
121        self.resume();
122    }
123
124    pub fn suspend(&self) {
125        self.suspend.store(true, Ordering::Relaxed);
126    }
127
128    pub fn resume(&self) {
129        self.suspend.store(false, Ordering::Relaxed);
130    }
131
132    fn put(&self, value: T) {
133        if let Ok(mut result) = self.result.lock() {
134            *result = Some(Some(value));
135        }
136    }
137}
138
139impl<T: Send + 'static> Clone for JobHandle<T> {
140    fn clone(&self) -> Self {
141        Self {
142            result: self.result.clone(),
143            cancel: self.cancel.clone(),
144            suspend: self.suspend.clone(),
145            meta: self.meta.clone(),
146        }
147    }
148}
149
150impl<T: Send + 'static> Future for JobHandle<T> {
151    type Output = Option<T>;
152
153    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
154        if let Some(result) = self.try_take() {
155            cx.waker().wake_by_ref();
156            Poll::Ready(result)
157        } else {
158            cx.waker().wake_by_ref();
159            Poll::Pending
160        }
161    }
162}
163
164pub struct AllJobsHandle<T: Send + 'static> {
165    jobs: Vec<JobHandle<T>>,
166}
167
168impl<T: Send + 'static> Default for AllJobsHandle<T> {
169    fn default() -> Self {
170        Self {
171            jobs: Default::default(),
172        }
173    }
174}
175
176impl<T: Send + 'static> AllJobsHandle<T> {
177    pub fn new(value: T) -> Self {
178        Self {
179            jobs: vec![JobHandle::new(value)],
180        }
181    }
182
183    pub fn into_inner(self) -> Vec<JobHandle<T>> {
184        self.jobs
185    }
186
187    pub fn many(handles: impl IntoIterator<Item = JobHandle<T>>) -> Self {
188        Self {
189            jobs: handles.into_iter().collect(),
190        }
191    }
192
193    pub fn add(&mut self, handle: JobHandle<T>) {
194        self.jobs.push(handle);
195    }
196
197    pub fn extend(&mut self, handles: impl IntoIterator<Item = JobHandle<T>>) {
198        self.jobs.extend(handles);
199    }
200
201    pub fn is_done(&self) -> bool {
202        self.jobs.iter().all(|job| job.is_done())
203    }
204
205    pub fn try_take(&self) -> Option<Option<Vec<T>>> {
206        self.is_done()
207            .then(|| self.jobs.iter().flat_map(|job| job.try_take()).collect())
208    }
209
210    pub fn wait(self) -> Option<Vec<T>> {
211        self.jobs.into_iter().map(|job| job.wait()).collect()
212    }
213}
214
215impl<T: Send + 'static> Clone for AllJobsHandle<T> {
216    fn clone(&self) -> Self {
217        Self {
218            jobs: self.jobs.clone(),
219        }
220    }
221}
222
223impl<T: Send + 'static> Future for AllJobsHandle<T> {
224    type Output = Option<Vec<T>>;
225
226    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
227        if let Some(result) = self.try_take() {
228            cx.waker().wake_by_ref();
229            Poll::Ready(result)
230        } else {
231            cx.waker().wake_by_ref();
232            Poll::Pending
233        }
234    }
235}
236
237pub struct AnyJobHandle<T: Send + 'static> {
238    jobs: Vec<JobHandle<T>>,
239}
240
241impl<T: Send + 'static> Default for AnyJobHandle<T> {
242    fn default() -> Self {
243        Self {
244            jobs: Default::default(),
245        }
246    }
247}
248
249impl<T: Send + 'static> AnyJobHandle<T> {
250    pub fn new(value: T) -> Self {
251        Self {
252            jobs: vec![JobHandle::new(value)],
253        }
254    }
255
256    pub fn into_inner(self) -> Vec<JobHandle<T>> {
257        self.jobs
258    }
259
260    pub fn many(handles: impl IntoIterator<Item = JobHandle<T>>) -> Self {
261        Self {
262            jobs: handles.into_iter().collect(),
263        }
264    }
265
266    pub fn add(&mut self, handle: JobHandle<T>) {
267        self.jobs.push(handle);
268    }
269
270    pub fn extend(&mut self, handles: impl IntoIterator<Item = JobHandle<T>>) {
271        self.jobs.extend(handles);
272    }
273
274    pub fn is_done(&self) -> bool {
275        self.jobs.iter().any(|job| job.is_done())
276    }
277
278    pub fn try_take(&self) -> Option<Option<T>> {
279        self.is_done()
280            .then(|| self.jobs.iter().find_map(|job| job.try_take()).flatten())
281    }
282
283    pub fn wait(self) -> Option<T> {
284        loop {
285            if let Some(result) = self.try_take() {
286                return result;
287            } else {
288                traced_spin_loop();
289            }
290        }
291    }
292}
293
294impl<T: Send + 'static> Clone for AnyJobHandle<T> {
295    fn clone(&self) -> Self {
296        Self {
297            jobs: self.jobs.clone(),
298        }
299    }
300}
301
302impl<T: Send + 'static> Future for AnyJobHandle<T> {
303    type Output = Option<T>;
304
305    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
306        if let Some(result) = self.try_take() {
307            cx.waker().wake_by_ref();
308            Poll::Ready(result)
309        } else {
310            cx.waker().wake_by_ref();
311            Poll::Pending
312        }
313    }
314}
315
316#[derive(Debug, Clone, Copy, PartialEq, Eq)]
317pub struct JobContext {
318    pub work_group_index: usize,
319    pub work_groups_count: usize,
320}
321
322impl std::fmt::Display for JobContext {
323    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324        write!(
325            f,
326            "JobContext {{ work_group_index: {}, work_groups_count: {} }}",
327            self.work_group_index, self.work_groups_count
328        )
329    }
330}
331
332#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
333pub enum JobPriority {
334    #[default]
335    Normal,
336    High,
337}
338
339impl std::fmt::Display for JobPriority {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        match self {
342            JobPriority::Normal => write!(f, "Normal"),
343            JobPriority::High => write!(f, "High"),
344        }
345    }
346}
347
348struct JobObject {
349    pub id: ID<Jobs>,
350    pub job: Job,
351    pub context: JobContext,
352    pub location: JobLocation,
353    pub priority: JobPriority,
354    pub cancel: Arc<AtomicBool>,
355    pub suspend: Arc<AtomicBool>,
356    pub meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
357}
358
359#[derive(Default, Clone)]
360pub struct JobQueue {
361    queue: Arc<RwLock<VecDeque<JobObject>>>,
362}
363
364impl JobQueue {
365    pub fn is_empty(&self) -> bool {
366        self.queue.read().map_or(true, |queue| queue.is_empty())
367    }
368
369    pub fn len(&self) -> usize {
370        self.queue.read().map_or(0, |queue| queue.len())
371    }
372
373    pub fn clear(&self) {
374        if let Ok(mut queue) = self.queue.write() {
375            queue.clear();
376        }
377    }
378
379    pub fn append(&self, other: &Self) {
380        if let Ok(mut other_queue) = other.queue.write() {
381            self.extend(other_queue.drain(..));
382        }
383    }
384
385    pub fn spawn_on<T: Send + 'static>(
386        &self,
387        location: JobLocation,
388        priority: JobPriority,
389        job: impl Future<Output = T> + Send + Sync + 'static,
390    ) -> JobHandle<T> {
391        let handle = JobHandle::<T>::default();
392        let handle2 = handle.clone();
393        let job = Job(Box::pin(async move {
394            handle2.put(job.await);
395        }));
396        self.schedule(location, priority, handle, job)
397    }
398
399    pub fn spawn_on_with_meta<T: Send + 'static>(
400        &self,
401        location: JobLocation,
402        priority: JobPriority,
403        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
404        job: impl Future<Output = T> + Send + Sync + 'static,
405    ) -> JobHandle<T> {
406        let handle = JobHandle::<T>::default().with_meta(meta);
407        let handle2 = handle.clone();
408        let job = Job(Box::pin(async move {
409            handle2.put(job.await);
410        }));
411        self.schedule(location, priority, handle, job)
412    }
413
414    pub fn queue_on<T: Send + 'static>(
415        &self,
416        location: JobLocation,
417        priority: JobPriority,
418        job: impl FnOnce(JobContext) -> T + Send + Sync + 'static,
419    ) -> JobHandle<T> {
420        let handle = JobHandle::<T>::default();
421        let handle2 = handle.clone();
422        let job = Job(Box::pin(async move {
423            handle2.put(job(context().await));
424        }));
425        self.schedule(location, priority, handle, job)
426    }
427
428    fn schedule<T: Send + 'static>(
429        &self,
430        location: JobLocation,
431        priority: JobPriority,
432        handle: JobHandle<T>,
433        job: Job,
434    ) -> JobHandle<T> {
435        self.enqueue(JobObject {
436            id: ID::new(),
437            job,
438            context: JobContext {
439                work_group_index: 0,
440                work_groups_count: 1,
441            },
442            location,
443            priority,
444            cancel: handle.cancel.clone(),
445            suspend: handle.suspend.clone(),
446            meta: handle.meta.clone(),
447        });
448        handle
449    }
450
451    fn enqueue(&self, object: JobObject) {
452        if let Ok(mut queue) = self.queue.write() {
453            if object.priority == JobPriority::High {
454                queue.push_back(object);
455            } else {
456                queue.push_front(object);
457            }
458        }
459    }
460
461    fn dequeue(&self, target_location: &JobLocation, ignore_location: bool) -> Option<JobObject> {
462        let mut queue = self.queue.write().ok()?;
463        let object = queue.pop_back()?;
464        if ignore_location {
465            return Some(object);
466        }
467        match (&object.location, target_location) {
468            (JobLocation::Local, JobLocation::Local)
469            | (JobLocation::UnnamedWorker, JobLocation::UnnamedWorker) => Some(object),
470            (JobLocation::NamedWorker(a), JobLocation::NamedWorker(b)) if a == b => Some(object),
471            (JobLocation::ExactThread(a), _) => {
472                if *a == std::thread::current().id() {
473                    Some(object)
474                } else {
475                    queue.push_front(object);
476                    None
477                }
478            }
479            (JobLocation::OtherThanThread(a), _) => {
480                if *a != std::thread::current().id() {
481                    Some(object)
482                } else {
483                    queue.push_front(object);
484                    None
485                }
486            }
487            (JobLocation::NonLocal, JobLocation::Local) => {
488                queue.push_front(object);
489                None
490            }
491            (JobLocation::Unknown, _) => Some(object),
492            _ => {
493                queue.push_front(object);
494                None
495            }
496        }
497    }
498
499    fn extend(&self, queue: impl IntoIterator<Item = JobObject>) {
500        if let Ok(mut current_queue) = self.queue.write() {
501            for object in queue {
502                if object.priority == JobPriority::High {
503                    current_queue.push_back(object);
504                } else {
505                    current_queue.push_front(object);
506                }
507            }
508        }
509    }
510}
511
512struct Worker {
513    location: JobLocation,
514    thread: Option<JoinHandle<()>>,
515    terminate: Arc<AtomicBool>,
516}
517
518impl Worker {
519    fn new(
520        worker_location: JobLocation,
521        queue: JobQueue,
522        global_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
523        worker_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
524        hash_tokens: Arc<Mutex<HashSet<u64>>>,
525        notify: Arc<(Mutex<bool>, Condvar)>,
526    ) -> Worker {
527        let terminate = Arc::new(AtomicBool::default());
528        let terminate2 = terminate.clone();
529        let worker_location2 = worker_location.clone();
530        let thread = spawn(move || {
531            let mut pending = vec![];
532            loop {
533                if terminate2.load(Ordering::Relaxed) {
534                    return;
535                }
536                while let Some(object) = queue.dequeue(&worker_location2, false) {
537                    let JobObject {
538                        id,
539                        job,
540                        context,
541                        location,
542                        mut priority,
543                        cancel,
544                        suspend,
545                        meta,
546                    } = object;
547                    let mut notify_workers = false;
548                    let (poll_result, receiver) = if suspend.load(Ordering::Relaxed) {
549                        let (_, rx) = std::sync::mpsc::channel();
550                        (Some(job), rx)
551                    } else {
552                        let (waker, receiver) = JobsWaker::new_waker(
553                            queue.clone(),
554                            location.clone(),
555                            context,
556                            priority,
557                            global_meta.clone(),
558                            worker_meta.clone(),
559                            meta.clone(),
560                            hash_tokens.clone(),
561                            cancel.clone(),
562                            suspend.clone(),
563                        );
564                        let mut cx = Context::from_waker(&waker);
565                        #[cfg(feature = "tracing")]
566                        let _span = tracing::span!(
567                            tracing::Level::TRACE,
568                            "Job poll",
569                            id = id.to_string(),
570                            location = location.to_string(),
571                            context = context.to_string(),
572                            priority = priority.to_string(),
573                            thread_id = format!("{:?}", std::thread::current().id()),
574                        )
575                        .entered();
576                        let poll_result = job.poll(&mut cx);
577                        (poll_result, receiver)
578                    };
579                    if let Some(job) = poll_result {
580                        let mut move_to = None;
581                        for command in receiver.try_iter() {
582                            notify_workers = true;
583                            match command {
584                                JobsWakerCommand::MoveTo(location) => {
585                                    move_to = Some(location);
586                                }
587                                JobsWakerCommand::ChangePriority(new_priority) => {
588                                    priority = new_priority;
589                                }
590                            }
591                        }
592                        if let Some(location) = move_to {
593                            pending.push(JobObject {
594                                id,
595                                job,
596                                context,
597                                location,
598                                priority,
599                                cancel,
600                                suspend,
601                                meta,
602                            });
603                        } else {
604                            pending.push(JobObject {
605                                id,
606                                job,
607                                context,
608                                location,
609                                priority,
610                                cancel,
611                                suspend,
612                                meta,
613                            });
614                        }
615                    }
616                    if terminate2.load(Ordering::Relaxed) {
617                        return;
618                    }
619                    if notify_workers {
620                        let (lock, cvar) = &*notify;
621                        if let Ok(mut running) = lock.lock() {
622                            *running = true;
623                        }
624                        cvar.notify_all();
625                    }
626                }
627                queue.extend(pending.drain(..));
628                if !queue.is_empty() {
629                    continue;
630                }
631                let (lock, cvar) = &*notify;
632                let Ok(mut ready) = lock.lock() else {
633                    return;
634                };
635                loop {
636                    let Ok((new, _)) = cvar.wait_timeout(ready, Duration::from_millis(10)) else {
637                        return;
638                    };
639                    ready = new;
640                    if *ready {
641                        break;
642                    }
643                }
644            }
645        });
646        Worker {
647            location: worker_location,
648            thread: Some(thread),
649            terminate,
650        }
651    }
652}
653
654pub(crate) enum JobsWakerCommand {
655    MoveTo(JobLocation),
656    ChangePriority(JobPriority),
657}
658
659#[derive(Debug, Default, Clone, PartialEq, Eq)]
660pub enum JobLocation {
661    #[default]
662    Unknown,
663    Local,
664    NonLocal,
665    UnnamedWorker,
666    NamedWorker(String),
667    ExactThread(ThreadId),
668    OtherThanThread(ThreadId),
669}
670
671impl JobLocation {
672    pub fn named_worker(name: impl ToString) -> Self {
673        JobLocation::NamedWorker(name.to_string())
674    }
675
676    pub fn thread(thread: ThreadId) -> Self {
677        JobLocation::ExactThread(thread)
678    }
679
680    pub fn current_thread() -> Self {
681        JobLocation::ExactThread(std::thread::current().id())
682    }
683
684    pub fn other_than_current_thread() -> Self {
685        JobLocation::OtherThanThread(std::thread::current().id())
686    }
687}
688
689impl std::fmt::Display for JobLocation {
690    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
691        match self {
692            JobLocation::Unknown => write!(f, "Unknown"),
693            JobLocation::Local => write!(f, "Local"),
694            JobLocation::NonLocal => write!(f, "Non-local"),
695            JobLocation::UnnamedWorker => write!(f, "Unnamed worker"),
696            JobLocation::NamedWorker(name) => write!(f, "Named worker: {name}"),
697            JobLocation::ExactThread(id) => write!(f, "Exact thread: {id:?}"),
698            JobLocation::OtherThanThread(id) => write!(f, "Other than thread: {id:?}"),
699        }
700    }
701}
702
703#[derive(Default)]
704pub struct JobToken {
705    hash_tokens: Arc<Mutex<HashSet<u64>>>,
706    hash: u64,
707}
708
709impl Drop for JobToken {
710    fn drop(&mut self) {
711        let mut hash_tokens = self.hash_tokens.lock().unwrap();
712        hash_tokens.remove(&self.hash);
713    }
714}
715
716pub(crate) struct JobsWaker {
717    sender: Sender<JobsWakerCommand>,
718    queue: JobQueue,
719    location: JobLocation,
720    context: JobContext,
721    priority: JobPriority,
722    global_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
723    worker_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
724    local_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
725    hash_tokens: Arc<Mutex<HashSet<u64>>>,
726    cancel: Arc<AtomicBool>,
727    suspend: Arc<AtomicBool>,
728}
729
730impl JobsWaker {
731    const VTABLE: RawWakerVTable =
732        RawWakerVTable::new(Self::vtable_clone, |_| {}, |_| {}, Self::vtable_drop);
733
734    fn vtable_clone(data: *const ()) -> RawWaker {
735        let arc = unsafe { Arc::<Self>::from_raw(data as *const Self) };
736        let cloned = arc.clone();
737        std::mem::forget(arc);
738        RawWaker::new(Arc::into_raw(cloned) as *const (), &Self::VTABLE)
739    }
740
741    fn vtable_drop(data: *const ()) {
742        let _ = unsafe { Arc::from_raw(data as *const Self) };
743    }
744
745    #[allow(clippy::too_many_arguments)]
746    pub fn new_waker(
747        queue: JobQueue,
748        location: JobLocation,
749        context: JobContext,
750        priority: JobPriority,
751        global_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
752        worker_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
753        local_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
754        hash_tokens: Arc<Mutex<HashSet<u64>>>,
755        cancel: Arc<AtomicBool>,
756        suspend: Arc<AtomicBool>,
757    ) -> (Waker, Receiver<JobsWakerCommand>) {
758        let (sender, receiver) = std::sync::mpsc::channel();
759        let arc = Arc::new(Self {
760            sender,
761            queue,
762            location,
763            context,
764            priority,
765            global_meta,
766            worker_meta,
767            local_meta,
768            hash_tokens,
769            cancel,
770            suspend,
771        });
772        let raw = RawWaker::new(Arc::into_raw(arc) as *const (), &Self::VTABLE);
773        (unsafe { Waker::from_raw(raw) }, receiver)
774    }
775
776    pub fn try_cast(waker: &Waker) -> Option<&Self> {
777        if waker.vtable() == &Self::VTABLE {
778            unsafe { waker.data().cast::<Self>().as_ref() }
779        } else {
780            None
781        }
782    }
783
784    pub fn command(&self, command: JobsWakerCommand) {
785        let _ = self.sender.send(command);
786    }
787
788    pub fn enqueue(&self, object: JobObject) {
789        self.queue.enqueue(object);
790    }
791
792    pub fn queue(&self) -> JobQueue {
793        self.queue.clone()
794    }
795
796    pub fn location(&self) -> JobLocation {
797        self.location.clone()
798    }
799
800    pub fn context(&self) -> JobContext {
801        self.context
802    }
803
804    pub fn priority(&self) -> JobPriority {
805        self.priority
806    }
807
808    pub fn get_meta(&self, name: &str) -> Option<DynamicManagedLazy> {
809        self.local_meta
810            .read()
811            .ok()
812            .and_then(|meta| meta.get(name).cloned())
813            .or_else(|| {
814                self.worker_meta
815                    .read()
816                    .ok()
817                    .and_then(|meta| meta.get(name).cloned())
818                    .or_else(|| {
819                        self.global_meta
820                            .read()
821                            .ok()
822                            .and_then(|meta| meta.get(name).cloned())
823                    })
824            })
825    }
826
827    pub fn local_meta(&self) -> Arc<RwLock<HashMap<String, DynamicManagedLazy>>> {
828        self.local_meta.clone()
829    }
830
831    pub fn cancel(&self) -> Arc<AtomicBool> {
832        self.cancel.clone()
833    }
834
835    pub fn suspend(&self) -> Arc<AtomicBool> {
836        self.suspend.clone()
837    }
838
839    pub fn acquire_token<T: Hash>(&self, subject: &T) -> Option<JobToken> {
840        let mut hasher = DefaultHasher::new();
841        subject.hash(&mut hasher);
842        let hash = hasher.finish();
843        let mut hash_tokens = self.hash_tokens.lock().unwrap();
844        if hash_tokens.contains(&hash) {
845            None
846        } else {
847            hash_tokens.insert(hash);
848            Some(JobToken {
849                hash_tokens: self.hash_tokens.clone(),
850                hash,
851            })
852        }
853    }
854
855    pub fn acquire_token_timeout<T: Hash>(
856        &self,
857        subject: &T,
858        timeout: Duration,
859    ) -> Option<JobToken> {
860        let mut hasher = DefaultHasher::new();
861        subject.hash(&mut hasher);
862        let hash = hasher.finish();
863        let timer = Instant::now();
864        while timer.elapsed() < timeout {
865            let mut hash_tokens = self.hash_tokens.try_lock().unwrap();
866            if hash_tokens.contains(&hash) {
867                traced_spin_loop();
868                continue;
869            } else {
870                hash_tokens.insert(hash);
871                return Some(JobToken {
872                    hash_tokens: self.hash_tokens.clone(),
873                    hash,
874                });
875            }
876        }
877        None
878    }
879}
880
881impl Wake for JobsWaker {
882    fn wake(self: Arc<Self>) {}
883}
884
885pub struct Jobs {
886    workers: Vec<Worker>,
887    queue: JobQueue,
888    meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
889    hash_tokens: Arc<Mutex<HashSet<u64>>>,
890    /// (ready, cond var)
891    notify: Arc<(Mutex<bool>, Condvar)>,
892}
893
894impl Drop for Jobs {
895    fn drop(&mut self) {
896        for worker in &self.workers {
897            worker.terminate.store(true, Ordering::Relaxed);
898        }
899        let (lock, cvar) = &*self.notify;
900        if let Ok(mut ready) = lock.lock() {
901            *ready = true;
902        };
903        cvar.notify_all();
904        for worker in &mut self.workers {
905            if let Some(thread) = worker.thread.take() {
906                let _ = thread.join();
907            }
908        }
909    }
910}
911
912impl Default for Jobs {
913    fn default() -> Self {
914        Self::new(
915            available_parallelism()
916                .ok()
917                .map(|v| v.get())
918                .unwrap_or_default(),
919        )
920    }
921}
922
923impl Jobs {
924    pub fn new(count: usize) -> Jobs {
925        let queue = JobQueue::default();
926        let notify = Arc::new((Mutex::default(), Condvar::new()));
927        let global_meta = Arc::new(RwLock::new(HashMap::default()));
928        let worker_meta = Arc::new(RwLock::new(HashMap::default()));
929        let hash_tokens = Arc::new(Mutex::new(HashSet::default()));
930        Jobs {
931            workers: (0..count)
932                .map(|_| {
933                    Worker::new(
934                        JobLocation::UnnamedWorker,
935                        queue.clone(),
936                        global_meta.clone(),
937                        worker_meta.clone(),
938                        hash_tokens.clone(),
939                        notify.clone(),
940                    )
941                })
942                .collect(),
943            queue,
944            meta: global_meta,
945            hash_tokens,
946            notify,
947        }
948    }
949
950    pub fn with_unnamed_worker(mut self) -> Self {
951        self.add_unnamed_worker();
952        self
953    }
954
955    pub fn with_named_worker(mut self, name: impl ToString) -> Self {
956        self.add_named_worker(name);
957        self
958    }
959
960    pub fn add_unnamed_worker(&mut self) {
961        self.add_unnamed_worker_with_meta([]);
962    }
963
964    pub fn add_unnamed_worker_with_meta(
965        &mut self,
966        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
967    ) {
968        self.workers.push(Worker::new(
969            JobLocation::UnnamedWorker,
970            self.queue.clone(),
971            self.meta.clone(),
972            Arc::new(RwLock::new(meta.into_iter().collect::<HashMap<_, _>>())),
973            self.hash_tokens.clone(),
974            self.notify.clone(),
975        ));
976    }
977
978    pub fn add_named_worker(&mut self, name: impl ToString) {
979        self.add_named_worker_with_meta(name, []);
980    }
981
982    pub fn add_named_worker_with_meta(
983        &mut self,
984        name: impl ToString,
985        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
986    ) {
987        self.workers.push(Worker::new(
988            JobLocation::named_worker(name),
989            self.queue.clone(),
990            self.meta.clone(),
991            Arc::new(RwLock::new(meta.into_iter().collect::<HashMap<_, _>>())),
992            self.hash_tokens.clone(),
993            self.notify.clone(),
994        ));
995    }
996
997    pub fn remove_named_worker(&mut self, name: &str) {
998        if let Some(index) = self.workers.iter().position(|worker| {
999            if let JobLocation::NamedWorker(worker_name) = &worker.location {
1000                worker_name == name
1001            } else {
1002                false
1003            }
1004        }) {
1005            let mut worker = self.workers.swap_remove(index);
1006            worker.terminate.store(true, Ordering::Relaxed);
1007            let (lock, cvar) = &*self.notify;
1008            if let Ok(mut ready) = lock.lock() {
1009                *ready = true;
1010            };
1011            cvar.notify_all();
1012            if let Some(thread) = worker.thread.take() {
1013                let _ = thread.join();
1014            }
1015        }
1016    }
1017
1018    pub fn unnamed_workers(&self) -> usize {
1019        self.workers
1020            .iter()
1021            .filter(|worker| worker.location == JobLocation::UnnamedWorker)
1022            .count()
1023    }
1024
1025    pub fn named_workers(&self) -> impl Iterator<Item = &str> {
1026        self.workers.iter().filter_map(|worker| {
1027            if let JobLocation::NamedWorker(name) = &worker.location {
1028                Some(name.as_str())
1029            } else {
1030                None
1031            }
1032        })
1033    }
1034
1035    pub fn set_meta(&self, name: impl ToString, value: DynamicManagedLazy) {
1036        let mut meta = self.meta.write().unwrap();
1037        meta.insert(name.to_string(), value);
1038    }
1039
1040    pub fn unset_meta(&self, name: &str) {
1041        let mut meta = self.meta.write().unwrap();
1042        meta.remove(name);
1043    }
1044
1045    pub fn get_meta<T>(&self, name: &str) -> Option<ManagedLazy<T>> {
1046        let meta = self.meta.read().unwrap();
1047        meta.get(name)
1048            .cloned()
1049            .and_then(|value| value.into_typed::<T>().ok())
1050    }
1051
1052    pub fn get_meta_dynamic(&self, name: &str) -> Option<DynamicManagedLazy> {
1053        let meta = self.meta.read().unwrap();
1054        meta.get(name).cloned()
1055    }
1056
1057    pub fn run_local(&self) {
1058        self.run_local_inner(Duration::MAX, []);
1059    }
1060
1061    pub fn run_local_with_meta(
1062        &self,
1063        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
1064    ) {
1065        self.run_local_inner(Duration::MAX, meta);
1066    }
1067
1068    pub fn run_local_timeout(&self, timeout: Duration) {
1069        self.run_local_inner(timeout, []);
1070    }
1071
1072    pub fn run_local_timeout_with_meta(
1073        &self,
1074        timeout: Duration,
1075        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
1076    ) {
1077        self.run_local_inner(timeout, meta);
1078    }
1079
1080    fn run_local_inner(
1081        &self,
1082        timeout: Duration,
1083        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
1084    ) {
1085        let timer = Instant::now();
1086        let mut pending = vec![];
1087        let worker_meta = Arc::new(RwLock::new(meta.into_iter().collect::<HashMap<_, _>>()));
1088        while let Some(object) = self
1089            .queue
1090            .dequeue(&JobLocation::Local, self.workers.is_empty())
1091        {
1092            let JobObject {
1093                id,
1094                job,
1095                context,
1096                location,
1097                mut priority,
1098                cancel,
1099                suspend,
1100                meta,
1101            } = object;
1102            let mut notify_workers = false;
1103            let (poll_result, receiver) = if suspend.load(Ordering::Relaxed) {
1104                let (_, rx) = std::sync::mpsc::channel();
1105                (Some(job), rx)
1106            } else {
1107                let (waker, receiver) = JobsWaker::new_waker(
1108                    self.queue.clone(),
1109                    location.clone(),
1110                    context,
1111                    priority,
1112                    self.meta.clone(),
1113                    worker_meta.clone(),
1114                    meta.clone(),
1115                    self.hash_tokens.clone(),
1116                    cancel.clone(),
1117                    suspend.clone(),
1118                );
1119                let mut cx = Context::from_waker(&waker);
1120                #[cfg(feature = "tracing")]
1121                let _span = tracing::span!(
1122                    tracing::Level::TRACE,
1123                    "Job poll",
1124                    id = id.to_string(),
1125                    location = location.to_string(),
1126                    context = context.to_string(),
1127                    priority = priority.to_string(),
1128                    thread_id = format!("{:?}", std::thread::current().id()),
1129                )
1130                .entered();
1131                let poll_result = job.poll(&mut cx);
1132                (poll_result, receiver)
1133            };
1134            if let Some(job) = poll_result {
1135                let mut move_to = None;
1136                for command in receiver.try_iter() {
1137                    notify_workers = true;
1138                    match command {
1139                        JobsWakerCommand::MoveTo(location) => move_to = Some(location),
1140                        JobsWakerCommand::ChangePriority(new_priority) => {
1141                            priority = new_priority;
1142                        }
1143                    }
1144                }
1145                if let Some(location) = move_to {
1146                    pending.push(JobObject {
1147                        id,
1148                        job,
1149                        context,
1150                        location,
1151                        priority,
1152                        cancel,
1153                        suspend,
1154                        meta,
1155                    });
1156                } else {
1157                    pending.push(JobObject {
1158                        id,
1159                        job,
1160                        context,
1161                        location,
1162                        priority,
1163                        cancel,
1164                        suspend,
1165                        meta,
1166                    });
1167                }
1168            }
1169            if notify_workers {
1170                let (lock, cvar) = &*self.notify;
1171                if let Ok(mut running) = lock.lock() {
1172                    *running = true;
1173                }
1174                cvar.notify_all();
1175            }
1176            if timer.elapsed() >= timeout {
1177                break;
1178            }
1179        }
1180        self.queue.extend(pending);
1181    }
1182
1183    pub fn submit_queue(&self, queue: &JobQueue) {
1184        self.queue.append(queue);
1185        let (lock, cvar) = &*self.notify;
1186        if let Ok(mut running) = lock.lock() {
1187            *running = true;
1188        }
1189        cvar.notify_all();
1190    }
1191
1192    pub fn run_queue(&self, queue: &JobQueue) {
1193        self.run_queue_inner(queue, Duration::MAX, []);
1194    }
1195
1196    pub fn run_queue_with_meta(
1197        &self,
1198        queue: &JobQueue,
1199        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
1200    ) {
1201        self.run_queue_inner(queue, Duration::MAX, meta);
1202    }
1203
1204    pub fn run_queue_timeout(&self, queue: &JobQueue, timeout: Duration) {
1205        self.run_queue_inner(queue, timeout, []);
1206    }
1207
1208    pub fn run_queue_timeout_with_meta(
1209        &self,
1210        queue: &JobQueue,
1211        timeout: Duration,
1212        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
1213    ) {
1214        self.run_queue_inner(queue, timeout, meta);
1215    }
1216
1217    fn run_queue_inner(
1218        &self,
1219        queue: &JobQueue,
1220        timeout: Duration,
1221        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
1222    ) {
1223        let timer = Instant::now();
1224        let mut pending = vec![];
1225        let worker_meta = Arc::new(RwLock::new(meta.into_iter().collect::<HashMap<_, _>>()));
1226        while let Some(object) = queue.dequeue(&JobLocation::Unknown, true) {
1227            let JobObject {
1228                id,
1229                job,
1230                context,
1231                location,
1232                mut priority,
1233                cancel,
1234                suspend,
1235                meta,
1236            } = object;
1237            let mut notify_workers = false;
1238            let (poll_result, receiver) = if suspend.load(Ordering::Relaxed) {
1239                let (_, rx) = std::sync::mpsc::channel();
1240                (Some(job), rx)
1241            } else {
1242                let (waker, receiver) = JobsWaker::new_waker(
1243                    queue.clone(),
1244                    location.clone(),
1245                    context,
1246                    priority,
1247                    self.meta.clone(),
1248                    worker_meta.clone(),
1249                    meta.clone(),
1250                    self.hash_tokens.clone(),
1251                    cancel.clone(),
1252                    suspend.clone(),
1253                );
1254                let mut cx = Context::from_waker(&waker);
1255                #[cfg(feature = "tracing")]
1256                let _span = tracing::span!(
1257                    tracing::Level::TRACE,
1258                    "Job poll",
1259                    id = id.to_string(),
1260                    location = location.to_string(),
1261                    context = context.to_string(),
1262                    priority = priority.to_string(),
1263                    thread_id = format!("{:?}", std::thread::current().id()),
1264                )
1265                .entered();
1266                let poll_result = job.poll(&mut cx);
1267                (poll_result, receiver)
1268            };
1269            if let Some(job) = poll_result {
1270                let mut move_to = None;
1271                for command in receiver.try_iter() {
1272                    notify_workers = true;
1273                    match command {
1274                        JobsWakerCommand::MoveTo(location) => move_to = Some(location),
1275                        JobsWakerCommand::ChangePriority(new_priority) => {
1276                            priority = new_priority;
1277                        }
1278                    }
1279                }
1280                if let Some(location) = move_to {
1281                    pending.push(JobObject {
1282                        id,
1283                        job,
1284                        context,
1285                        location,
1286                        priority,
1287                        cancel,
1288                        suspend,
1289                        meta,
1290                    });
1291                } else {
1292                    pending.push(JobObject {
1293                        id,
1294                        job,
1295                        context,
1296                        location,
1297                        priority,
1298                        cancel,
1299                        suspend,
1300                        meta,
1301                    });
1302                }
1303            }
1304            if notify_workers {
1305                let (lock, cvar) = &*self.notify;
1306                if let Ok(mut running) = lock.lock() {
1307                    *running = true;
1308                }
1309                cvar.notify_all();
1310            }
1311            if timer.elapsed() >= timeout {
1312                break;
1313            }
1314        }
1315        self.queue.extend(pending);
1316    }
1317
1318    #[inline]
1319    pub fn is_empty(&self) -> bool {
1320        self.workers.is_empty()
1321    }
1322
1323    #[inline]
1324    pub fn len(&self) -> usize {
1325        self.workers.len()
1326    }
1327
1328    pub fn spawn_on<T: Send + 'static>(
1329        &self,
1330        location: JobLocation,
1331        priority: JobPriority,
1332        job: impl Future<Output = T> + Send + Sync + 'static,
1333    ) -> Result<JobHandle<T>, Box<dyn Error>> {
1334        let handle = self.queue.spawn_on(location, priority, job);
1335        let (lock, cvar) = &*self.notify;
1336        let mut running = lock.lock().map_err(|error| format!("{error}"))?;
1337        *running = true;
1338        cvar.notify_all();
1339        Ok(handle)
1340    }
1341
1342    pub fn spawn_on_with_meta<T: Send + 'static>(
1343        &self,
1344        location: JobLocation,
1345        priority: JobPriority,
1346        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
1347        job: impl Future<Output = T> + Send + Sync + 'static,
1348    ) -> Result<JobHandle<T>, Box<dyn Error>> {
1349        let handle = self.queue.spawn_on_with_meta(location, priority, meta, job);
1350        let (lock, cvar) = &*self.notify;
1351        let mut running = lock.lock().map_err(|error| format!("{error}"))?;
1352        *running = true;
1353        cvar.notify_all();
1354        Ok(handle)
1355    }
1356
1357    pub fn queue_on<T: Send + 'static>(
1358        &self,
1359        location: JobLocation,
1360        priority: JobPriority,
1361        job: impl FnOnce(JobContext) -> T + Send + Sync + 'static,
1362    ) -> Result<JobHandle<T>, Box<dyn Error>> {
1363        let handle = self.queue.queue_on(location, priority, job);
1364        let (lock, cvar) = &*self.notify;
1365        let mut running = lock.lock().map_err(|error| format!("{error}"))?;
1366        *running = true;
1367        cvar.notify_all();
1368        Ok(handle)
1369    }
1370
1371    pub fn broadcast<T: Send + 'static>(
1372        &self,
1373        job: impl Fn(JobContext) -> T + Send + Sync + 'static,
1374    ) -> Result<AllJobsHandle<T>, Box<dyn Error>> {
1375        self.broadcast_n(self.workers.len(), job)
1376    }
1377
1378    pub fn broadcast_n<T: Send + 'static>(
1379        &self,
1380        work_groups: usize,
1381        job: impl Fn(JobContext) -> T + Send + Sync + 'static,
1382    ) -> Result<AllJobsHandle<T>, Box<dyn Error>> {
1383        if self.workers.is_empty() {
1384            return Ok(AllJobsHandle::new(job(JobContext {
1385                work_group_index: 0,
1386                work_groups_count: 1,
1387            })));
1388        }
1389        let job = Arc::new(job);
1390        let handle = AllJobsHandle {
1391            jobs: (0..work_groups)
1392                .map(|group| {
1393                    let job = Arc::clone(&job);
1394                    let handle = JobHandle::<T>::default();
1395                    let handle2 = handle.clone();
1396                    self.queue.enqueue(JobObject {
1397                        id: ID::new(),
1398                        job: Job(Box::pin(async move {
1399                            handle2.put(job(context().await));
1400                        })),
1401                        context: JobContext {
1402                            work_group_index: group,
1403                            work_groups_count: work_groups,
1404                        },
1405                        location: JobLocation::other_than_current_thread(),
1406                        priority: JobPriority::High,
1407                        cancel: handle.cancel.clone(),
1408                        suspend: handle.suspend.clone(),
1409                        meta: handle.meta.clone(),
1410                    });
1411                    handle
1412                })
1413                .collect::<Vec<_>>(),
1414        };
1415        let (lock, cvar) = &*self.notify;
1416        let mut running = lock.lock().map_err(|error| format!("{error}"))?;
1417        *running = true;
1418        cvar.notify_all();
1419        Ok(handle)
1420    }
1421}
1422
1423pub struct ScopedJobs<'env, T: Send + 'static> {
1424    jobs: &'env Jobs,
1425    handles: AllJobsHandle<T>,
1426}
1427
1428impl<T: Send + 'static> Drop for ScopedJobs<'_, T> {
1429    fn drop(&mut self) {
1430        self.execute_inner();
1431    }
1432}
1433
1434impl<'env, T: Send + 'static> ScopedJobs<'env, T> {
1435    pub fn new(jobs: &'env Jobs) -> Self {
1436        Self {
1437            jobs,
1438            handles: Default::default(),
1439        }
1440    }
1441
1442    pub fn spawn_on(
1443        &mut self,
1444        location: JobLocation,
1445        priority: JobPriority,
1446        job: impl Future<Output = T> + Send + Sync + 'env,
1447    ) -> Result<(), Box<dyn Error>> {
1448        let job = unsafe {
1449            std::mem::transmute::<
1450                Pin<Box<dyn Future<Output = T> + Send + Sync + 'env>>,
1451                Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>,
1452            >(Box::pin(job))
1453        };
1454        let handle = self.jobs.spawn_on(location, priority, job)?;
1455        self.handles.add(handle);
1456        Ok(())
1457    }
1458
1459    pub fn queue_on(
1460        &mut self,
1461        location: JobLocation,
1462        priority: JobPriority,
1463        job: impl FnOnce(JobContext) -> T + Send + Sync + 'env,
1464    ) -> Result<(), Box<dyn Error>> {
1465        let job = unsafe {
1466            std::mem::transmute::<
1467                Box<dyn FnOnce(JobContext) -> T + Send + Sync + 'env>,
1468                Box<dyn FnOnce(JobContext) -> T + Send + Sync + 'static>,
1469            >(Box::new(job))
1470        };
1471        self.handles
1472            .add(self.jobs.queue_on(location, priority, job)?);
1473        Ok(())
1474    }
1475
1476    pub fn broadcast(
1477        &mut self,
1478        job: impl Fn(JobContext) -> T + Send + Sync + 'env,
1479    ) -> Result<(), Box<dyn Error>> {
1480        let job = unsafe {
1481            std::mem::transmute::<
1482                Box<dyn Fn(JobContext) -> T + Send + Sync + 'env>,
1483                Box<dyn Fn(JobContext) -> T + Send + Sync + 'static>,
1484            >(Box::new(job))
1485        };
1486        self.handles.extend(self.jobs.broadcast(job)?.into_inner());
1487        Ok(())
1488    }
1489
1490    pub fn broadcast_n(
1491        &mut self,
1492        work_groups: usize,
1493        job: impl Fn(JobContext) -> T + Send + Sync + 'env,
1494    ) -> Result<(), Box<dyn Error>> {
1495        let job = unsafe {
1496            std::mem::transmute::<
1497                Box<dyn Fn(JobContext) -> T + Send + Sync + 'env>,
1498                Box<dyn Fn(JobContext) -> T + Send + Sync + 'static>,
1499            >(Box::new(job))
1500        };
1501        self.handles
1502            .extend(self.jobs.broadcast_n(work_groups, job)?.into_inner());
1503        Ok(())
1504    }
1505
1506    pub fn execute(mut self) -> Vec<T> {
1507        self.execute_inner()
1508    }
1509
1510    fn execute_inner(&mut self) -> Vec<T> {
1511        std::mem::take(&mut self.handles).wait().unwrap_or_default()
1512    }
1513}
1514
1515#[cfg(test)]
1516mod tests {
1517    use super::*;
1518    use crate::coroutine::{
1519        acquire_token, block_on, location, meta, move_to, on_exit, queue_on, spawn_on, suspend,
1520        wait_polls, wait_time, with_all, with_any, yield_now,
1521    };
1522    use std::sync::atomic::AtomicUsize;
1523
1524    #[test]
1525    fn test_jobs() {
1526        fn is_async<T: Send + Sync>() {}
1527
1528        is_async::<Jobs>();
1529
1530        let jobs = Jobs::default();
1531        let data = (0..100).collect::<Vec<_>>();
1532        let data2 = data.clone();
1533
1534        let job = jobs
1535            .queue_on(JobLocation::Unknown, JobPriority::Normal, move |_| {
1536                data.into_iter().sum::<usize>()
1537            })
1538            .unwrap();
1539
1540        let result = job.wait().unwrap();
1541        assert_eq!(result, 4950);
1542
1543        let job = jobs
1544            .queue_on(JobLocation::Local, JobPriority::Normal, move |_| {
1545                data2.into_iter().sum::<usize>()
1546            })
1547            .unwrap();
1548
1549        while !job.is_done() {
1550            jobs.run_local();
1551        }
1552        let result = job.try_take().unwrap().unwrap();
1553        assert_eq!(result, 4950);
1554
1555        let job = jobs.broadcast(move |ctx| ctx.work_group_index).unwrap();
1556        let result = job.wait().unwrap().into_iter().sum::<usize>();
1557        assert_eq!(result, (0..jobs.workers.len()).sum());
1558
1559        let job = jobs
1560            .broadcast_n(10, move |ctx| ctx.work_group_index)
1561            .unwrap();
1562        let result = job.wait().unwrap().into_iter().sum::<usize>();
1563        assert_eq!(result, {
1564            let mut accum = 0;
1565            for index in 0..10 {
1566                accum += index;
1567            }
1568            accum
1569        });
1570    }
1571
1572    #[test]
1573    fn test_local_thread_only_jobs() {
1574        let jobs = Jobs::new(0);
1575        let data = (0..100).collect::<Vec<_>>();
1576        let data2 = data.clone();
1577        let data3 = data.clone();
1578        let data4 = data.clone();
1579        let data5 = data.clone();
1580        let data6 = data.clone();
1581
1582        let job = jobs
1583            .queue_on(JobLocation::Unknown, JobPriority::Normal, move |_| {
1584                data.into_iter().sum::<usize>()
1585            })
1586            .unwrap();
1587
1588        while !job.is_done() {
1589            jobs.run_local();
1590        }
1591        let result = job.try_take().unwrap().unwrap();
1592        assert_eq!(result, 4950);
1593
1594        let job = jobs
1595            .queue_on(JobLocation::Local, JobPriority::Normal, move |_| {
1596                data2.into_iter().sum::<usize>()
1597            })
1598            .unwrap();
1599
1600        while !job.is_done() {
1601            jobs.run_local();
1602        }
1603        let result = job.try_take().unwrap().unwrap();
1604        assert_eq!(result, 4950);
1605
1606        let job = jobs
1607            .queue_on(JobLocation::UnnamedWorker, JobPriority::Normal, move |_| {
1608                data3.into_iter().sum::<usize>()
1609            })
1610            .unwrap();
1611
1612        while !job.is_done() {
1613            jobs.run_local();
1614        }
1615        let result = job.try_take().unwrap().unwrap();
1616        assert_eq!(result, 4950);
1617
1618        let job = jobs
1619            .queue_on(
1620                JobLocation::named_worker("temp"),
1621                JobPriority::Normal,
1622                move |_| data4.into_iter().sum::<usize>(),
1623            )
1624            .unwrap();
1625
1626        while !job.is_done() {
1627            jobs.run_local();
1628        }
1629        let result = job.try_take().unwrap().unwrap();
1630        assert_eq!(result, 4950);
1631
1632        let job = jobs
1633            .queue_on(
1634                JobLocation::current_thread(),
1635                JobPriority::Normal,
1636                move |_| data5.into_iter().sum::<usize>(),
1637            )
1638            .unwrap();
1639
1640        while !job.is_done() {
1641            jobs.run_local();
1642        }
1643        let result = job.try_take().unwrap().unwrap();
1644        assert_eq!(result, 4950);
1645
1646        let job = jobs
1647            .queue_on(
1648                JobLocation::other_than_current_thread(),
1649                JobPriority::Normal,
1650                move |_| data6.into_iter().sum::<usize>(),
1651            )
1652            .unwrap();
1653
1654        while !job.is_done() {
1655            jobs.run_local();
1656        }
1657        let result = job.try_take().unwrap().unwrap();
1658        assert_eq!(result, 4950);
1659
1660        let job = jobs.broadcast(move |_| 1).unwrap();
1661
1662        while !job.is_done() {
1663            jobs.run_local();
1664        }
1665        let result = job.wait().unwrap().into_iter().sum::<usize>();
1666        assert_eq!(result, 1);
1667
1668        let job = jobs.broadcast_n(10, move |_| 1).unwrap();
1669
1670        while !job.is_done() {
1671            jobs.run_local();
1672        }
1673        let result = job.wait().unwrap().into_iter().sum::<usize>();
1674        assert_eq!(result, 1);
1675    }
1676
1677    #[test]
1678    fn test_queue_jobs() {
1679        let jobs = Jobs::new(0);
1680        let queue = JobQueue::default();
1681        let data = (0..100).collect::<Vec<_>>();
1682
1683        let job = queue.queue_on(JobLocation::Unknown, JobPriority::Normal, move |_| {
1684            data.into_iter().sum::<usize>()
1685        });
1686
1687        while !job.is_done() {
1688            jobs.run_queue(&queue);
1689        }
1690        let result = job.try_take().unwrap().unwrap();
1691        assert_eq!(result, 4950);
1692    }
1693
1694    #[test]
1695    fn test_scoped_jobs() {
1696        let jobs = Jobs::default();
1697        let mut data = (0..100).collect::<Vec<_>>();
1698
1699        let mut scope = ScopedJobs::new(&jobs);
1700        scope
1701            .queue_on(JobLocation::Unknown, JobPriority::Normal, |_| {
1702                for value in &mut data {
1703                    *value *= 2;
1704                }
1705                data.iter().copied().sum::<usize>()
1706            })
1707            .unwrap();
1708
1709        let result = scope.execute().into_iter().sum::<usize>();
1710        assert_eq!(result, 9900);
1711    }
1712
1713    #[test]
1714    fn test_futures_spawn() {
1715        let jobs = Jobs::default();
1716        let data = (0..100).collect::<Vec<_>>();
1717        let data2 = data.clone();
1718
1719        let job = jobs
1720            .spawn_on(JobLocation::Unknown, JobPriority::Normal, async move {
1721                let mut result = 0;
1722                for value in data {
1723                    result += value;
1724                    yield_now().await;
1725                }
1726                result
1727            })
1728            .unwrap();
1729
1730        let result = block_on(job).unwrap();
1731        assert_eq!(result, 4950);
1732
1733        let job = jobs
1734            .spawn_on(JobLocation::Local, JobPriority::Normal, async move {
1735                let mut result = 0;
1736                for value in data2 {
1737                    result += value;
1738                    yield_now().await;
1739                }
1740                result
1741            })
1742            .unwrap();
1743
1744        while !job.is_done() {
1745            jobs.run_local();
1746        }
1747        let result = job.try_take().unwrap().unwrap();
1748        assert_eq!(result, 4950);
1749
1750        let job = jobs
1751            .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
1752                let result = Arc::new(AtomicUsize::new(0));
1753                let result1 = result.clone();
1754                let result2 = result.clone();
1755                with_all(vec![
1756                    Box::pin(async move {
1757                        wait_time(Duration::from_millis(10)).await;
1758                        result1.fetch_add(1, Ordering::SeqCst);
1759                    }),
1760                    Box::pin(async move {
1761                        wait_time(Duration::from_millis(5)).await;
1762                        result2.fetch_add(2, Ordering::SeqCst);
1763                    }),
1764                ])
1765                .await;
1766                result.load(Ordering::SeqCst)
1767            })
1768            .unwrap();
1769        let result = block_on(job).unwrap();
1770        assert_eq!(result, 3);
1771
1772        let job = jobs
1773            .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
1774                let result = Arc::new(AtomicUsize::new(0));
1775                let result1 = result.clone();
1776                let result2 = result.clone();
1777                with_any(vec![
1778                    Box::pin(async move {
1779                        wait_polls(10).await;
1780                        result1.store(1, Ordering::SeqCst);
1781                    }),
1782                    Box::pin(async move {
1783                        wait_polls(5).await;
1784                        result2.store(2, Ordering::SeqCst);
1785                    }),
1786                ])
1787                .await;
1788                result.load(Ordering::SeqCst)
1789            })
1790            .unwrap();
1791        let result = block_on(job).unwrap();
1792        assert!(result > 0);
1793    }
1794
1795    #[test]
1796    fn test_futures_move() {
1797        let jobs = Jobs::new(1).with_named_worker("foo");
1798
1799        let job = jobs
1800            .spawn_on(JobLocation::Local, JobPriority::Normal, async {
1801                yield_now().await;
1802                // A: Local
1803                println!("A: {:?}", location().await);
1804                move_to(JobLocation::Unknown).await;
1805                // B: UnnamedWorker
1806                println!("B: {:?}", location().await);
1807                move_to(JobLocation::named_worker("foo")).await;
1808                // C: NamedWorker("foo")
1809                println!("C: {:?}", location().await);
1810                move_to(JobLocation::Local).await;
1811                // D: Local
1812                println!("D: {:?}", location().await);
1813                42
1814            })
1815            .unwrap();
1816
1817        while !job.is_done() {
1818            jobs.run_local();
1819        }
1820        let result = job.try_take().unwrap().unwrap();
1821        assert_eq!(result, 42);
1822    }
1823
1824    #[test]
1825    fn test_futures_schedule() {
1826        let jobs = Jobs::new(1).with_named_worker("foo");
1827
1828        let job = jobs
1829            .spawn_on(JobLocation::Local, JobPriority::Normal, async {
1830                spawn_on(JobLocation::Local, JobPriority::Normal, async {
1831                    println!("A: {:?}", location().await);
1832                })
1833                .await;
1834                spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
1835                    println!("B: {:?}", location().await);
1836                })
1837                .await;
1838                spawn_on(
1839                    JobLocation::named_worker("foo"),
1840                    JobPriority::Normal,
1841                    async {
1842                        println!("C: {:?}", location().await);
1843                    },
1844                )
1845                .await;
1846                queue_on(JobLocation::Local, JobPriority::Normal, |_| {
1847                    println!("D: Local closure");
1848                })
1849                .await;
1850                queue_on(JobLocation::Unknown, JobPriority::Normal, |_| {
1851                    println!("E: Unnamed worker closure");
1852                })
1853                .await;
1854                queue_on(
1855                    JobLocation::named_worker("foo"),
1856                    JobPriority::Normal,
1857                    |_| {
1858                        println!("F: Named worker closure");
1859                    },
1860                )
1861                .await;
1862                42
1863            })
1864            .unwrap();
1865
1866        while !job.is_done() {
1867            jobs.run_local();
1868        }
1869        let result = job.try_take().unwrap().unwrap();
1870        assert_eq!(result, 42);
1871    }
1872
1873    #[test]
1874    fn test_futures_meta() {
1875        let jobs = Jobs::default();
1876        let mut value = 42usize;
1877        let (value_lazy, _value_lifetime) = DynamicManagedLazy::make(&mut value);
1878        jobs.set_meta("value", value_lazy);
1879
1880        let job = jobs
1881            .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
1882                let value = meta::<usize>("value").await.unwrap();
1883                *value.read().unwrap()
1884            })
1885            .unwrap();
1886
1887        let result = block_on(job).unwrap();
1888        assert_eq!(result, 42);
1889
1890        let mut flag = true;
1891        let (flag_lazy, _flag_lifetime) = DynamicManagedLazy::make(&mut flag);
1892        let job = jobs
1893            .spawn_on_with_meta(
1894                JobLocation::Unknown,
1895                JobPriority::Normal,
1896                [("flag".to_owned(), flag_lazy)],
1897                async {
1898                    let flag = meta::<bool>("flag").await.unwrap();
1899                    *flag.read().unwrap()
1900                },
1901            )
1902            .unwrap();
1903
1904        let result = block_on(job).unwrap();
1905        assert!(result);
1906
1907        let mut flag = true;
1908        let (flag_lazy, _flag_lifetime) = DynamicManagedLazy::make(&mut flag);
1909        let job = jobs
1910            .spawn_on(JobLocation::Local, JobPriority::Normal, async {
1911                let flag = meta::<bool>("flag").await.unwrap();
1912                *flag.read().unwrap()
1913            })
1914            .unwrap();
1915
1916        while !job.is_done() {
1917            jobs.run_local_with_meta([("flag".to_owned(), flag_lazy.clone())]);
1918        }
1919        let result = job.try_take().unwrap().unwrap();
1920        assert!(result);
1921    }
1922
1923    #[test]
1924    fn test_futures_acquire_token() {
1925        let jobs = Jobs::new(3);
1926
1927        let a = jobs
1928            .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
1929                let _token = acquire_token(&"foo").await;
1930                std::thread::sleep(Duration::from_millis(50));
1931                for i in 0..10 {
1932                    println!("{i}");
1933                    std::thread::sleep(Duration::from_millis(10));
1934                }
1935            })
1936            .unwrap();
1937        let b = jobs
1938            .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
1939                let _token = acquire_token(&"foo").await;
1940                std::thread::sleep(Duration::from_millis(50));
1941                for i in 10..20 {
1942                    println!("{i}");
1943                    std::thread::sleep(Duration::from_millis(10));
1944                }
1945            })
1946            .unwrap();
1947        block_on(AllJobsHandle::many([a, b])).unwrap();
1948
1949        #[cfg(not(miri))]
1950        {
1951            use std::path::Path;
1952
1953            async fn load_file(path: impl AsRef<Path>) -> Option<String> {
1954                let path = path.as_ref();
1955                let _token = acquire_token(&path).await;
1956                std::fs::read_to_string(path).ok()
1957            }
1958
1959            async fn save_file(path: impl AsRef<Path>, content: &str) {
1960                let path = path.as_ref();
1961                let _token = acquire_token(&path).await;
1962                let _ = std::fs::write(path, content);
1963            }
1964
1965            const PATH: &str = "./resources/test.txt";
1966            let content = "Hello, Jobs!".repeat(1000);
1967
1968            let a = jobs
1969                .spawn_on(JobLocation::Unknown, JobPriority::Normal, async move {
1970                    std::thread::sleep(Duration::from_millis(50));
1971                    save_file(PATH, &content).await;
1972                })
1973                .unwrap();
1974            let b = jobs
1975                .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
1976                    std::thread::sleep(Duration::from_millis(50));
1977                    let _ = load_file(PATH).await;
1978                })
1979                .unwrap();
1980            block_on(AllJobsHandle::many([a, b])).unwrap();
1981        }
1982    }
1983
1984    #[test]
1985    fn test_futures_on_exit() {
1986        let jobs = Jobs::default();
1987
1988        let state = Arc::new(AtomicBool::new(false));
1989        let state1 = state.clone();
1990        let job = jobs
1991            .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
1992                let _exit = on_exit(async move {
1993                    state1.store(true, Ordering::SeqCst);
1994                })
1995                .await;
1996                42
1997            })
1998            .unwrap();
1999
2000        let result = block_on(job).unwrap();
2001        assert_eq!(result, 42);
2002        std::thread::sleep(Duration::from_millis(100));
2003        assert!(state.load(Ordering::SeqCst));
2004
2005        let state = Arc::new(AtomicBool::new(false));
2006        let state1 = state.clone();
2007        let job = jobs
2008            .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
2009                let exit = on_exit(async move {
2010                    state1.store(true, Ordering::SeqCst);
2011                })
2012                .await;
2013                exit.invalidate();
2014                42
2015            })
2016            .unwrap();
2017
2018        let result = block_on(job).unwrap();
2019        assert_eq!(result, 42);
2020        std::thread::sleep(Duration::from_millis(100));
2021        assert!(!state.load(Ordering::SeqCst));
2022
2023        let state = Arc::new(AtomicBool::new(false));
2024        let state1 = state.clone();
2025        let job = jobs
2026            .spawn_on(JobLocation::Local, JobPriority::Normal, async {
2027                let exit = on_exit(async move {
2028                    state1.store(true, Ordering::SeqCst);
2029                })
2030                .await;
2031                // job gets cancelled at this point,
2032                // so exit future won't get invalidated.
2033                exit.invalidate();
2034                42
2035            })
2036            .unwrap();
2037
2038        job.cancel();
2039        while !job.is_done() {
2040            jobs.run_local();
2041        }
2042        assert_eq!(job.try_take(), Some(None));
2043        assert!(!state.load(Ordering::SeqCst));
2044    }
2045
2046    #[test]
2047    fn test_futures_suspend() {
2048        let jobs = Jobs::default();
2049
2050        let job = jobs
2051            .spawn_on(JobLocation::Local, JobPriority::Normal, async {
2052                suspend().await;
2053                42
2054            })
2055            .unwrap();
2056
2057        assert!(!job.is_done());
2058        for _ in 0..10 {
2059            jobs.run_local();
2060        }
2061        assert!(!job.is_done());
2062
2063        job.resume();
2064        while !job.is_done() {
2065            jobs.run_local();
2066        }
2067        let result = job.try_take().unwrap().unwrap();
2068        assert_eq!(result, 42);
2069    }
2070}