1pub mod coroutine;
2
3use crate::coroutine::context;
4#[cfg(target_arch = "wasm32")]
5use instant::{Duration, Instant};
6use intuicio_data::managed::{DynamicManagedLazy, ManagedLazy};
7#[cfg(not(target_arch = "wasm32"))]
8use std::time::{Duration, Instant};
9use std::{
10 collections::{HashMap, HashSet, VecDeque},
11 error::Error,
12 hash::{DefaultHasher, Hash, Hasher},
13 pin::Pin,
14 sync::{
15 Arc, Condvar, Mutex, RwLock,
16 atomic::{AtomicBool, Ordering},
17 mpsc::{Receiver, Sender},
18 },
19 task::{Context, Poll, RawWaker, RawWakerVTable, Wake, Waker},
20 thread::{JoinHandle, ThreadId, available_parallelism, spawn},
21};
22use typid::ID;
23
/// Type-erased unit of work: a pinned, boxed future that resolves to `()`.
struct Job(Pin<Box<dyn Future<Output = ()> + Send + Sync>>);

impl Job {
    /// Polls the wrapped future once.
    ///
    /// Returns `None` when the future completed, otherwise hands the job back
    /// so the caller can keep it around for another poll.
    fn poll(mut self, cx: &mut Context<'_>) -> Option<Self> {
        if self.0.as_mut().poll(cx).is_ready() {
            None
        } else {
            Some(self)
        }
    }
}
34
/// Emits a spin-loop hint to the CPU.
///
/// With the `deadlock-trace` feature enabled, a freshly captured backtrace is
/// printed first so that busy-waiting call sites can be located.
#[inline]
fn traced_spin_loop() {
    #[cfg(feature = "deadlock-trace")]
    {
        let backtrace = std::backtrace::Backtrace::force_capture();
        println!("* DEADLOCK BACKTRACE: {}", backtrace);
    }
    std::hint::spin_loop();
}
44
/// Cloneable handle to a job's shared state: result slot, cancel and suspend
/// flags, and per-job meta values. All clones point at the same state.
pub struct JobHandle<T: Send + 'static> {
    /// Result slot. `None` while no outcome is stored, `Some(Some(value))`
    /// once a value was put, `Some(None)` once the handle was cancelled.
    result: Arc<Mutex<Option<Option<T>>>>,
    /// Set to `true` by `cancel`.
    cancel: Arc<AtomicBool>,
    /// `true` while the job is suspended; cleared by `resume` (and `cancel`).
    suspend: Arc<AtomicBool>,
    /// Named meta values attached to this job.
    meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
}
51
52impl<T: Send + 'static> Default for JobHandle<T> {
53 fn default() -> Self {
54 Self {
55 result: Default::default(),
56 cancel: Default::default(),
57 suspend: Default::default(),
58 meta: Default::default(),
59 }
60 }
61}
62
63impl<T: Send + 'static> JobHandle<T> {
64 pub fn new(value: T) -> Self {
65 Self {
66 result: Arc::new(Mutex::new(Some(Some(value)))),
67 cancel: Default::default(),
68 suspend: Default::default(),
69 meta: Default::default(),
70 }
71 }
72
73 pub(crate) fn with_meta(
74 self,
75 iter: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
76 ) -> Self {
77 if let Ok(mut meta) = self.meta.write() {
78 meta.extend(iter);
79 }
80 self
81 }
82
83 pub fn is_cancelled(&self) -> bool {
84 self.cancel.load(Ordering::Relaxed)
85 }
86
87 pub fn is_suspended(&self) -> bool {
88 self.suspend.load(Ordering::Relaxed)
89 }
90
91 pub fn is_done(&self) -> bool {
92 self.result
93 .try_lock()
94 .ok()
95 .map(|guard| guard.is_some())
96 .unwrap_or_default()
97 }
98
99 pub fn try_take(&self) -> Option<Option<T>> {
100 self.result
101 .try_lock()
102 .ok()
103 .and_then(|mut result| result.take())
104 }
105
106 pub fn wait(self) -> Option<T> {
107 loop {
108 if let Some(result) = self.try_take() {
109 return result;
110 } else {
111 traced_spin_loop();
112 }
113 }
114 }
115
116 pub fn cancel(&self) {
117 self.cancel.store(true, Ordering::Relaxed);
118 if let Ok(mut result) = self.result.lock() {
119 *result = Some(None);
120 }
121 self.resume();
122 }
123
124 pub fn suspend(&self) {
125 self.suspend.store(true, Ordering::Relaxed);
126 }
127
128 pub fn resume(&self) {
129 self.suspend.store(false, Ordering::Relaxed);
130 }
131
132 fn put(&self, value: T) {
133 if let Ok(mut result) = self.result.lock() {
134 *result = Some(Some(value));
135 }
136 }
137}
138
139impl<T: Send + 'static> Clone for JobHandle<T> {
140 fn clone(&self) -> Self {
141 Self {
142 result: self.result.clone(),
143 cancel: self.cancel.clone(),
144 suspend: self.suspend.clone(),
145 meta: self.meta.clone(),
146 }
147 }
148}
149
150impl<T: Send + 'static> Future for JobHandle<T> {
151 type Output = Option<T>;
152
153 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
154 if let Some(result) = self.try_take() {
155 cx.waker().wake_by_ref();
156 Poll::Ready(result)
157 } else {
158 cx.waker().wake_by_ref();
159 Poll::Pending
160 }
161 }
162}
163
/// Group of job handles that is considered done once every job is done.
pub struct AllJobsHandle<T: Send + 'static> {
    /// Handles of the grouped jobs, in insertion order.
    jobs: Vec<JobHandle<T>>,
}
167
168impl<T: Send + 'static> Default for AllJobsHandle<T> {
169 fn default() -> Self {
170 Self {
171 jobs: Default::default(),
172 }
173 }
174}
175
176impl<T: Send + 'static> AllJobsHandle<T> {
177 pub fn new(value: T) -> Self {
178 Self {
179 jobs: vec![JobHandle::new(value)],
180 }
181 }
182
183 pub fn into_inner(self) -> Vec<JobHandle<T>> {
184 self.jobs
185 }
186
187 pub fn many(handles: impl IntoIterator<Item = JobHandle<T>>) -> Self {
188 Self {
189 jobs: handles.into_iter().collect(),
190 }
191 }
192
193 pub fn add(&mut self, handle: JobHandle<T>) {
194 self.jobs.push(handle);
195 }
196
197 pub fn extend(&mut self, handles: impl IntoIterator<Item = JobHandle<T>>) {
198 self.jobs.extend(handles);
199 }
200
201 pub fn is_done(&self) -> bool {
202 self.jobs.iter().all(|job| job.is_done())
203 }
204
205 pub fn try_take(&self) -> Option<Option<Vec<T>>> {
206 self.is_done()
207 .then(|| self.jobs.iter().flat_map(|job| job.try_take()).collect())
208 }
209
210 pub fn wait(self) -> Option<Vec<T>> {
211 self.jobs.into_iter().map(|job| job.wait()).collect()
212 }
213}
214
215impl<T: Send + 'static> Clone for AllJobsHandle<T> {
216 fn clone(&self) -> Self {
217 Self {
218 jobs: self.jobs.clone(),
219 }
220 }
221}
222
223impl<T: Send + 'static> Future for AllJobsHandle<T> {
224 type Output = Option<Vec<T>>;
225
226 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
227 if let Some(result) = self.try_take() {
228 cx.waker().wake_by_ref();
229 Poll::Ready(result)
230 } else {
231 cx.waker().wake_by_ref();
232 Poll::Pending
233 }
234 }
235}
236
/// Group of job handles that is considered done once any job is done.
pub struct AnyJobHandle<T: Send + 'static> {
    /// Handles of the grouped jobs, in insertion order.
    jobs: Vec<JobHandle<T>>,
}
240
241impl<T: Send + 'static> Default for AnyJobHandle<T> {
242 fn default() -> Self {
243 Self {
244 jobs: Default::default(),
245 }
246 }
247}
248
249impl<T: Send + 'static> AnyJobHandle<T> {
250 pub fn new(value: T) -> Self {
251 Self {
252 jobs: vec![JobHandle::new(value)],
253 }
254 }
255
256 pub fn into_inner(self) -> Vec<JobHandle<T>> {
257 self.jobs
258 }
259
260 pub fn many(handles: impl IntoIterator<Item = JobHandle<T>>) -> Self {
261 Self {
262 jobs: handles.into_iter().collect(),
263 }
264 }
265
266 pub fn add(&mut self, handle: JobHandle<T>) {
267 self.jobs.push(handle);
268 }
269
270 pub fn extend(&mut self, handles: impl IntoIterator<Item = JobHandle<T>>) {
271 self.jobs.extend(handles);
272 }
273
274 pub fn is_done(&self) -> bool {
275 self.jobs.iter().any(|job| job.is_done())
276 }
277
278 pub fn try_take(&self) -> Option<Option<T>> {
279 self.is_done()
280 .then(|| self.jobs.iter().find_map(|job| job.try_take()).flatten())
281 }
282
283 pub fn wait(self) -> Option<T> {
284 loop {
285 if let Some(result) = self.try_take() {
286 return result;
287 } else {
288 traced_spin_loop();
289 }
290 }
291 }
292}
293
294impl<T: Send + 'static> Clone for AnyJobHandle<T> {
295 fn clone(&self) -> Self {
296 Self {
297 jobs: self.jobs.clone(),
298 }
299 }
300}
301
302impl<T: Send + 'static> Future for AnyJobHandle<T> {
303 type Output = Option<T>;
304
305 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
306 if let Some(result) = self.try_take() {
307 cx.waker().wake_by_ref();
308 Poll::Ready(result)
309 } else {
310 cx.waker().wake_by_ref();
311 Poll::Pending
312 }
313 }
314}
315
/// Position of a job within the group of jobs it was scheduled with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct JobContext {
    /// Index of this job's work group.
    pub work_group_index: usize,
    /// Total number of work groups.
    pub work_groups_count: usize,
}

impl std::fmt::Display for JobContext {
    /// Writes the context as `JobContext { work_group_index: .., work_groups_count: .. }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            work_group_index,
            work_groups_count,
        } = self;
        write!(
            f,
            "JobContext {{ work_group_index: {}, work_groups_count: {} }}",
            work_group_index, work_groups_count
        )
    }
}
331
/// Scheduling priority of a job.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum JobPriority {
    /// Regular priority (the default).
    #[default]
    Normal,
    /// Elevated priority.
    High,
}

impl std::fmt::Display for JobPriority {
    /// Writes the variant name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            JobPriority::Normal => "Normal",
            JobPriority::High => "High",
        };
        f.write_str(label)
    }
}
347
/// Queue entry: a job together with everything needed to route and poll it.
struct JobObject {
    /// Identifier assigned when the job is scheduled.
    pub id: ID<Jobs>,
    /// The future to poll.
    pub job: Job,
    /// Work-group position handed to the job.
    pub context: JobContext,
    /// Where the job is allowed to run.
    pub location: JobLocation,
    /// Priority used when the job is (re)inserted into a queue.
    pub priority: JobPriority,
    /// Cancel flag shared with the job's handle.
    pub cancel: Arc<AtomicBool>,
    /// Suspend flag shared with the job's handle; suspended jobs are not polled.
    pub suspend: Arc<AtomicBool>,
    /// Per-job meta values shared with the job's handle.
    pub meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
}
358
/// Shared double-ended queue of jobs; clones refer to the same queue.
#[derive(Default, Clone)]
pub struct JobQueue {
    /// Jobs are taken from the back; high-priority jobs are inserted at the
    /// back and normal-priority jobs at the front.
    queue: Arc<RwLock<VecDeque<JobObject>>>,
}
363
364impl JobQueue {
365 pub fn is_empty(&self) -> bool {
366 self.queue.read().map_or(true, |queue| queue.is_empty())
367 }
368
369 pub fn len(&self) -> usize {
370 self.queue.read().map_or(0, |queue| queue.len())
371 }
372
373 pub fn clear(&self) {
374 if let Ok(mut queue) = self.queue.write() {
375 queue.clear();
376 }
377 }
378
379 pub fn append(&self, other: &Self) {
380 if let Ok(mut other_queue) = other.queue.write() {
381 self.extend(other_queue.drain(..));
382 }
383 }
384
385 pub fn spawn_on<T: Send + 'static>(
386 &self,
387 location: JobLocation,
388 priority: JobPriority,
389 job: impl Future<Output = T> + Send + Sync + 'static,
390 ) -> JobHandle<T> {
391 let handle = JobHandle::<T>::default();
392 let handle2 = handle.clone();
393 let job = Job(Box::pin(async move {
394 handle2.put(job.await);
395 }));
396 self.schedule(location, priority, handle, job)
397 }
398
399 pub fn spawn_on_with_meta<T: Send + 'static>(
400 &self,
401 location: JobLocation,
402 priority: JobPriority,
403 meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
404 job: impl Future<Output = T> + Send + Sync + 'static,
405 ) -> JobHandle<T> {
406 let handle = JobHandle::<T>::default().with_meta(meta);
407 let handle2 = handle.clone();
408 let job = Job(Box::pin(async move {
409 handle2.put(job.await);
410 }));
411 self.schedule(location, priority, handle, job)
412 }
413
414 pub fn queue_on<T: Send + 'static>(
415 &self,
416 location: JobLocation,
417 priority: JobPriority,
418 job: impl FnOnce(JobContext) -> T + Send + Sync + 'static,
419 ) -> JobHandle<T> {
420 let handle = JobHandle::<T>::default();
421 let handle2 = handle.clone();
422 let job = Job(Box::pin(async move {
423 handle2.put(job(context().await));
424 }));
425 self.schedule(location, priority, handle, job)
426 }
427
428 fn schedule<T: Send + 'static>(
429 &self,
430 location: JobLocation,
431 priority: JobPriority,
432 handle: JobHandle<T>,
433 job: Job,
434 ) -> JobHandle<T> {
435 self.enqueue(JobObject {
436 id: ID::new(),
437 job,
438 context: JobContext {
439 work_group_index: 0,
440 work_groups_count: 1,
441 },
442 location,
443 priority,
444 cancel: handle.cancel.clone(),
445 suspend: handle.suspend.clone(),
446 meta: handle.meta.clone(),
447 });
448 handle
449 }
450
451 fn enqueue(&self, object: JobObject) {
452 if let Ok(mut queue) = self.queue.write() {
453 if object.priority == JobPriority::High {
454 queue.push_back(object);
455 } else {
456 queue.push_front(object);
457 }
458 }
459 }
460
461 fn dequeue(&self, target_location: &JobLocation, ignore_location: bool) -> Option<JobObject> {
462 let mut queue = self.queue.write().ok()?;
463 let object = queue.pop_back()?;
464 if ignore_location {
465 return Some(object);
466 }
467 match (&object.location, target_location) {
468 (JobLocation::Local, JobLocation::Local)
469 | (JobLocation::UnnamedWorker, JobLocation::UnnamedWorker) => Some(object),
470 (JobLocation::NamedWorker(a), JobLocation::NamedWorker(b)) if a == b => Some(object),
471 (JobLocation::ExactThread(a), _) => {
472 if *a == std::thread::current().id() {
473 Some(object)
474 } else {
475 queue.push_front(object);
476 None
477 }
478 }
479 (JobLocation::OtherThanThread(a), _) => {
480 if *a != std::thread::current().id() {
481 Some(object)
482 } else {
483 queue.push_front(object);
484 None
485 }
486 }
487 (JobLocation::NonLocal, JobLocation::Local) => {
488 queue.push_front(object);
489 None
490 }
491 (JobLocation::Unknown, _) => Some(object),
492 _ => {
493 queue.push_front(object);
494 None
495 }
496 }
497 }
498
499 fn extend(&self, queue: impl IntoIterator<Item = JobObject>) {
500 if let Ok(mut current_queue) = self.queue.write() {
501 for object in queue {
502 if object.priority == JobPriority::High {
503 current_queue.push_back(object);
504 } else {
505 current_queue.push_front(object);
506 }
507 }
508 }
509 }
510}
511
/// A worker thread together with the data needed to identify and stop it.
struct Worker {
    /// Location this worker serves when dequeuing jobs.
    location: JobLocation,
    /// Join handle of the worker thread; taken out when the thread is joined.
    thread: Option<JoinHandle<()>>,
    /// Flag the worker thread checks to know it should exit.
    terminate: Arc<AtomicBool>,
}
517
impl Worker {
    /// Spawns a worker thread that keeps polling jobs from `queue`.
    ///
    /// * `worker_location` - location passed to `dequeue` to select jobs.
    /// * `queue` - queue the jobs are taken from and pushed back to.
    /// * `global_meta` / `worker_meta` - meta maps handed to every job waker.
    /// * `hash_tokens` - shared token set handed to every job waker.
    /// * `notify` - flag + condvar pair the thread sleeps on when idle.
    fn new(
        worker_location: JobLocation,
        queue: JobQueue,
        global_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
        worker_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
        hash_tokens: Arc<Mutex<HashSet<u64>>>,
        notify: Arc<(Mutex<bool>, Condvar)>,
    ) -> Worker {
        let terminate = Arc::new(AtomicBool::default());
        let terminate2 = terminate.clone();
        let worker_location2 = worker_location.clone();
        let thread = spawn(move || {
            // Jobs that did not finish during this pass; re-queued below.
            let mut pending = vec![];
            loop {
                if terminate2.load(Ordering::Relaxed) {
                    return;
                }
                // Drain jobs until the queue is empty or its back entry is
                // not meant for this worker.
                while let Some(object) = queue.dequeue(&worker_location2, false) {
                    let JobObject {
                        id,
                        job,
                        context,
                        location,
                        mut priority,
                        cancel,
                        suspend,
                        meta,
                    } = object;
                    let mut notify_workers = false;
                    // Suspended jobs are kept without polling; the dummy
                    // receiver has no sender, so it never yields commands.
                    let (poll_result, receiver) = if suspend.load(Ordering::Relaxed) {
                        let (_, rx) = std::sync::mpsc::channel();
                        (Some(job), rx)
                    } else {
                        let (waker, receiver) = JobsWaker::new_waker(
                            queue.clone(),
                            location.clone(),
                            context,
                            priority,
                            global_meta.clone(),
                            worker_meta.clone(),
                            meta.clone(),
                            hash_tokens.clone(),
                            cancel.clone(),
                            suspend.clone(),
                        );
                        let mut cx = Context::from_waker(&waker);
                        #[cfg(feature = "tracing")]
                        let _span = tracing::span!(
                            tracing::Level::TRACE,
                            "Job poll",
                            id = id.to_string(),
                            location = location.to_string(),
                            context = context.to_string(),
                            priority = priority.to_string(),
                            thread_id = format!("{:?}", std::thread::current().id()),
                        )
                        .entered();
                        let poll_result = job.poll(&mut cx);
                        (poll_result, receiver)
                    };
                    // `Some(job)` means the job is unfinished and must be kept.
                    if let Some(job) = poll_result {
                        let mut move_to = None;
                        // Apply the commands the job sent through its waker
                        // during the poll; the last `MoveTo` wins.
                        for command in receiver.try_iter() {
                            notify_workers = true;
                            match command {
                                JobsWakerCommand::MoveTo(location) => {
                                    move_to = Some(location);
                                }
                                JobsWakerCommand::ChangePriority(new_priority) => {
                                    priority = new_priority;
                                }
                            }
                        }
                        // The two branches differ only in `location`: the
                        // first uses the requested target (shadowing), the
                        // second keeps the job's current location.
                        if let Some(location) = move_to {
                            pending.push(JobObject {
                                id,
                                job,
                                context,
                                location,
                                priority,
                                cancel,
                                suspend,
                                meta,
                            });
                        } else {
                            pending.push(JobObject {
                                id,
                                job,
                                context,
                                location,
                                priority,
                                cancel,
                                suspend,
                                meta,
                            });
                        }
                    }
                    // Exiting here drops the jobs collected in `pending`.
                    if terminate2.load(Ordering::Relaxed) {
                        return;
                    }
                    // A job changed its routing: wake the other workers.
                    if notify_workers {
                        let (lock, cvar) = &*notify;
                        if let Ok(mut running) = lock.lock() {
                            *running = true;
                        }
                        cvar.notify_all();
                    }
                }
                queue.extend(pending.drain(..));
                // Keep going while anything is queued, including entries
                // this worker cannot run itself.
                if !queue.is_empty() {
                    continue;
                }
                let (lock, cvar) = &*notify;
                let Ok(mut ready) = lock.lock() else {
                    return;
                };
                // Idle: sleep in slices of up to 10 ms until the shared flag
                // is observed `true`.
                // NOTE(review): nothing in this function sets the flag back to
                // `false` - confirm whether it is meant to stay raised.
                loop {
                    let Ok((new, _)) = cvar.wait_timeout(ready, Duration::from_millis(10)) else {
                        return;
                    };
                    ready = new;
                    if *ready {
                        break;
                    }
                }
            }
        });
        Worker {
            location: worker_location,
            thread: Some(thread),
            terminate,
        }
    }
}
653
/// Command a running job sends to its executor through the waker's channel.
pub(crate) enum JobsWakerCommand {
    /// Re-queue the job with the given location.
    MoveTo(JobLocation),
    /// Re-queue the job with the given priority.
    ChangePriority(JobPriority),
}
658
/// Describes where a job is allowed to run.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum JobLocation {
    /// No location constraint recorded (the default).
    #[default]
    Unknown,
    /// The local (caller-driven) executor.
    Local,
    /// Anything but the local executor.
    NonLocal,
    /// A worker without a name.
    UnnamedWorker,
    /// The worker with the given name.
    NamedWorker(String),
    /// Exactly the given thread.
    ExactThread(ThreadId),
    /// Any thread except the given one.
    OtherThanThread(ThreadId),
}

impl JobLocation {
    /// Builds [`JobLocation::NamedWorker`] from anything printable.
    pub fn named_worker(name: impl ToString) -> Self {
        Self::NamedWorker(name.to_string())
    }

    /// Builds [`JobLocation::ExactThread`] for `thread`.
    pub fn thread(thread: ThreadId) -> Self {
        Self::ExactThread(thread)
    }

    /// Builds [`JobLocation::ExactThread`] for the calling thread.
    pub fn current_thread() -> Self {
        let this_thread = std::thread::current().id();
        Self::ExactThread(this_thread)
    }

    /// Builds [`JobLocation::OtherThanThread`] for the calling thread.
    pub fn other_than_current_thread() -> Self {
        let this_thread = std::thread::current().id();
        Self::OtherThanThread(this_thread)
    }
}

impl std::fmt::Display for JobLocation {
    /// Writes a human-readable description of the location.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unknown => f.write_str("Unknown"),
            Self::Local => f.write_str("Local"),
            Self::NonLocal => f.write_str("Non-local"),
            Self::UnnamedWorker => f.write_str("Unnamed worker"),
            Self::NamedWorker(name) => write!(f, "Named worker: {name}"),
            Self::ExactThread(id) => write!(f, "Exact thread: {id:?}"),
            Self::OtherThanThread(id) => write!(f, "Other than thread: {id:?}"),
        }
    }
}
702
/// Guard for a hash registered in a shared token set; the hash is removed
/// from the set again when the token is dropped.
#[derive(Default)]
pub struct JobToken {
    /// Set the hash is registered in.
    hash_tokens: Arc<Mutex<HashSet<u64>>>,
    /// The registered hash.
    hash: u64,
}

impl Drop for JobToken {
    /// Removes the hash from the shared set.
    ///
    /// A poisoned lock is recovered instead of unwrapped: panicking inside
    /// `drop` aborts the process when it happens during unwinding, and the
    /// token would otherwise stay registered forever.
    fn drop(&mut self) {
        let mut hash_tokens = self
            .hash_tokens
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        hash_tokens.remove(&self.hash);
    }
}
715
/// Data stored behind the waker handed to a job while it is polled; gives
/// the job access to its scheduling state.
pub(crate) struct JobsWaker {
    /// Channel used to send commands to the executor polling the job.
    sender: Sender<JobsWakerCommand>,
    /// Queue the job was taken from.
    queue: JobQueue,
    /// Location of the job at the time of the poll.
    location: JobLocation,
    /// Work-group position of the job.
    context: JobContext,
    /// Priority of the job at the time of the poll.
    priority: JobPriority,
    /// Meta map looked up last by `get_meta`.
    global_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
    /// Meta map looked up second by `get_meta`.
    worker_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
    /// Meta map looked up first by `get_meta`.
    local_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
    /// Shared set of currently held token hashes.
    hash_tokens: Arc<Mutex<HashSet<u64>>>,
    /// Cancel flag of the job.
    cancel: Arc<AtomicBool>,
    /// Suspend flag of the job.
    suspend: Arc<AtomicBool>,
}
729
730impl JobsWaker {
731 const VTABLE: RawWakerVTable =
732 RawWakerVTable::new(Self::vtable_clone, |_| {}, |_| {}, Self::vtable_drop);
733
734 fn vtable_clone(data: *const ()) -> RawWaker {
735 let arc = unsafe { Arc::<Self>::from_raw(data as *const Self) };
736 let cloned = arc.clone();
737 std::mem::forget(arc);
738 RawWaker::new(Arc::into_raw(cloned) as *const (), &Self::VTABLE)
739 }
740
741 fn vtable_drop(data: *const ()) {
742 let _ = unsafe { Arc::from_raw(data as *const Self) };
743 }
744
745 #[allow(clippy::too_many_arguments)]
746 pub fn new_waker(
747 queue: JobQueue,
748 location: JobLocation,
749 context: JobContext,
750 priority: JobPriority,
751 global_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
752 worker_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
753 local_meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
754 hash_tokens: Arc<Mutex<HashSet<u64>>>,
755 cancel: Arc<AtomicBool>,
756 suspend: Arc<AtomicBool>,
757 ) -> (Waker, Receiver<JobsWakerCommand>) {
758 let (sender, receiver) = std::sync::mpsc::channel();
759 let arc = Arc::new(Self {
760 sender,
761 queue,
762 location,
763 context,
764 priority,
765 global_meta,
766 worker_meta,
767 local_meta,
768 hash_tokens,
769 cancel,
770 suspend,
771 });
772 let raw = RawWaker::new(Arc::into_raw(arc) as *const (), &Self::VTABLE);
773 (unsafe { Waker::from_raw(raw) }, receiver)
774 }
775
776 pub fn try_cast(waker: &Waker) -> Option<&Self> {
777 if waker.vtable() == &Self::VTABLE {
778 unsafe { waker.data().cast::<Self>().as_ref() }
779 } else {
780 None
781 }
782 }
783
784 pub fn command(&self, command: JobsWakerCommand) {
785 let _ = self.sender.send(command);
786 }
787
788 pub fn enqueue(&self, object: JobObject) {
789 self.queue.enqueue(object);
790 }
791
792 pub fn queue(&self) -> JobQueue {
793 self.queue.clone()
794 }
795
796 pub fn location(&self) -> JobLocation {
797 self.location.clone()
798 }
799
800 pub fn context(&self) -> JobContext {
801 self.context
802 }
803
804 pub fn priority(&self) -> JobPriority {
805 self.priority
806 }
807
808 pub fn get_meta(&self, name: &str) -> Option<DynamicManagedLazy> {
809 self.local_meta
810 .read()
811 .ok()
812 .and_then(|meta| meta.get(name).cloned())
813 .or_else(|| {
814 self.worker_meta
815 .read()
816 .ok()
817 .and_then(|meta| meta.get(name).cloned())
818 .or_else(|| {
819 self.global_meta
820 .read()
821 .ok()
822 .and_then(|meta| meta.get(name).cloned())
823 })
824 })
825 }
826
827 pub fn local_meta(&self) -> Arc<RwLock<HashMap<String, DynamicManagedLazy>>> {
828 self.local_meta.clone()
829 }
830
831 pub fn cancel(&self) -> Arc<AtomicBool> {
832 self.cancel.clone()
833 }
834
835 pub fn suspend(&self) -> Arc<AtomicBool> {
836 self.suspend.clone()
837 }
838
839 pub fn acquire_token<T: Hash>(&self, subject: &T) -> Option<JobToken> {
840 let mut hasher = DefaultHasher::new();
841 subject.hash(&mut hasher);
842 let hash = hasher.finish();
843 let mut hash_tokens = self.hash_tokens.lock().unwrap();
844 if hash_tokens.contains(&hash) {
845 None
846 } else {
847 hash_tokens.insert(hash);
848 Some(JobToken {
849 hash_tokens: self.hash_tokens.clone(),
850 hash,
851 })
852 }
853 }
854
855 pub fn acquire_token_timeout<T: Hash>(
856 &self,
857 subject: &T,
858 timeout: Duration,
859 ) -> Option<JobToken> {
860 let mut hasher = DefaultHasher::new();
861 subject.hash(&mut hasher);
862 let hash = hasher.finish();
863 let timer = Instant::now();
864 while timer.elapsed() < timeout {
865 let mut hash_tokens = self.hash_tokens.try_lock().unwrap();
866 if hash_tokens.contains(&hash) {
867 traced_spin_loop();
868 continue;
869 } else {
870 hash_tokens.insert(hash);
871 return Some(JobToken {
872 hash_tokens: self.hash_tokens.clone(),
873 hash,
874 });
875 }
876 }
877 None
878 }
879}
880
impl Wake for JobsWaker {
    /// Waking does nothing.
    fn wake(self: Arc<Self>) {}
}
884
/// Jobs system: a set of worker threads fed from one shared queue, plus
/// support for running jobs on the calling thread.
pub struct Jobs {
    /// Worker threads owned by this system.
    workers: Vec<Worker>,
    /// Queue shared with every worker.
    queue: JobQueue,
    /// Global meta values, visible to every job.
    meta: Arc<RwLock<HashMap<String, DynamicManagedLazy>>>,
    /// Shared set of currently held token hashes.
    hash_tokens: Arc<Mutex<HashSet<u64>>>,
    /// Flag + condvar pair used to wake idle workers.
    notify: Arc<(Mutex<bool>, Condvar)>,
}
893
894impl Drop for Jobs {
895 fn drop(&mut self) {
896 for worker in &self.workers {
897 worker.terminate.store(true, Ordering::Relaxed);
898 }
899 let (lock, cvar) = &*self.notify;
900 if let Ok(mut ready) = lock.lock() {
901 *ready = true;
902 };
903 cvar.notify_all();
904 for worker in &mut self.workers {
905 if let Some(thread) = worker.thread.take() {
906 let _ = thread.join();
907 }
908 }
909 }
910}
911
912impl Default for Jobs {
913 fn default() -> Self {
914 Self::new(
915 available_parallelism()
916 .ok()
917 .map(|v| v.get())
918 .unwrap_or_default(),
919 )
920 }
921}
922
923impl Jobs {
924 pub fn new(count: usize) -> Jobs {
925 let queue = JobQueue::default();
926 let notify = Arc::new((Mutex::default(), Condvar::new()));
927 let global_meta = Arc::new(RwLock::new(HashMap::default()));
928 let worker_meta = Arc::new(RwLock::new(HashMap::default()));
929 let hash_tokens = Arc::new(Mutex::new(HashSet::default()));
930 Jobs {
931 workers: (0..count)
932 .map(|_| {
933 Worker::new(
934 JobLocation::UnnamedWorker,
935 queue.clone(),
936 global_meta.clone(),
937 worker_meta.clone(),
938 hash_tokens.clone(),
939 notify.clone(),
940 )
941 })
942 .collect(),
943 queue,
944 meta: global_meta,
945 hash_tokens,
946 notify,
947 }
948 }
949
    /// Builder-style variant of `add_unnamed_worker`: adds one unnamed
    /// worker and returns the jobs system.
    pub fn with_unnamed_worker(mut self) -> Self {
        self.add_unnamed_worker();
        self
    }
954
    /// Builder-style variant of `add_named_worker`: adds one worker named
    /// `name` and returns the jobs system.
    pub fn with_named_worker(mut self, name: impl ToString) -> Self {
        self.add_named_worker(name);
        self
    }
959
    /// Adds one unnamed worker without any worker meta.
    pub fn add_unnamed_worker(&mut self) {
        self.add_unnamed_worker_with_meta([]);
    }
963
964 pub fn add_unnamed_worker_with_meta(
965 &mut self,
966 meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
967 ) {
968 self.workers.push(Worker::new(
969 JobLocation::UnnamedWorker,
970 self.queue.clone(),
971 self.meta.clone(),
972 Arc::new(RwLock::new(meta.into_iter().collect::<HashMap<_, _>>())),
973 self.hash_tokens.clone(),
974 self.notify.clone(),
975 ));
976 }
977
    /// Adds one worker named `name` without any worker meta.
    pub fn add_named_worker(&mut self, name: impl ToString) {
        self.add_named_worker_with_meta(name, []);
    }
981
982 pub fn add_named_worker_with_meta(
983 &mut self,
984 name: impl ToString,
985 meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
986 ) {
987 self.workers.push(Worker::new(
988 JobLocation::named_worker(name),
989 self.queue.clone(),
990 self.meta.clone(),
991 Arc::new(RwLock::new(meta.into_iter().collect::<HashMap<_, _>>())),
992 self.hash_tokens.clone(),
993 self.notify.clone(),
994 ));
995 }
996
997 pub fn remove_named_worker(&mut self, name: &str) {
998 if let Some(index) = self.workers.iter().position(|worker| {
999 if let JobLocation::NamedWorker(worker_name) = &worker.location {
1000 worker_name == name
1001 } else {
1002 false
1003 }
1004 }) {
1005 let mut worker = self.workers.swap_remove(index);
1006 worker.terminate.store(true, Ordering::Relaxed);
1007 let (lock, cvar) = &*self.notify;
1008 if let Ok(mut ready) = lock.lock() {
1009 *ready = true;
1010 };
1011 cvar.notify_all();
1012 if let Some(thread) = worker.thread.take() {
1013 let _ = thread.join();
1014 }
1015 }
1016 }
1017
1018 pub fn unnamed_workers(&self) -> usize {
1019 self.workers
1020 .iter()
1021 .filter(|worker| worker.location == JobLocation::UnnamedWorker)
1022 .count()
1023 }
1024
1025 pub fn named_workers(&self) -> impl Iterator<Item = &str> {
1026 self.workers.iter().filter_map(|worker| {
1027 if let JobLocation::NamedWorker(name) = &worker.location {
1028 Some(name.as_str())
1029 } else {
1030 None
1031 }
1032 })
1033 }
1034
1035 pub fn set_meta(&self, name: impl ToString, value: DynamicManagedLazy) {
1036 let mut meta = self.meta.write().unwrap();
1037 meta.insert(name.to_string(), value);
1038 }
1039
1040 pub fn unset_meta(&self, name: &str) {
1041 let mut meta = self.meta.write().unwrap();
1042 meta.remove(name);
1043 }
1044
1045 pub fn get_meta<T>(&self, name: &str) -> Option<ManagedLazy<T>> {
1046 let meta = self.meta.read().unwrap();
1047 meta.get(name)
1048 .cloned()
1049 .and_then(|value| value.into_typed::<T>().ok())
1050 }
1051
1052 pub fn get_meta_dynamic(&self, name: &str) -> Option<DynamicManagedLazy> {
1053 let meta = self.meta.read().unwrap();
1054 meta.get(name).cloned()
1055 }
1056
    /// Runs jobs on the calling thread via `run_local_inner`, with no time
    /// limit (`Duration::MAX`) and no extra meta.
    pub fn run_local(&self) {
        self.run_local_inner(Duration::MAX, []);
    }
1060
    /// Runs jobs on the calling thread via `run_local_inner`, with no time
    /// limit (`Duration::MAX`); `meta` is offered to the polled jobs as
    /// worker meta.
    pub fn run_local_with_meta(
        &self,
        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
    ) {
        self.run_local_inner(Duration::MAX, meta);
    }
1067
    /// Runs jobs on the calling thread via `run_local_inner`, stopping once
    /// `timeout` has elapsed; no extra meta is provided.
    pub fn run_local_timeout(&self, timeout: Duration) {
        self.run_local_inner(timeout, []);
    }
1071
    /// Runs jobs on the calling thread via `run_local_inner`, stopping once
    /// `timeout` has elapsed; `meta` is offered to the polled jobs as worker
    /// meta.
    pub fn run_local_timeout_with_meta(
        &self,
        timeout: Duration,
        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
    ) {
        self.run_local_inner(timeout, meta);
    }
1079
1080 fn run_local_inner(
1081 &self,
1082 timeout: Duration,
1083 meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
1084 ) {
1085 let timer = Instant::now();
1086 let mut pending = vec![];
1087 let worker_meta = Arc::new(RwLock::new(meta.into_iter().collect::<HashMap<_, _>>()));
1088 while let Some(object) = self
1089 .queue
1090 .dequeue(&JobLocation::Local, self.workers.is_empty())
1091 {
1092 let JobObject {
1093 id,
1094 job,
1095 context,
1096 location,
1097 mut priority,
1098 cancel,
1099 suspend,
1100 meta,
1101 } = object;
1102 let mut notify_workers = false;
1103 let (poll_result, receiver) = if suspend.load(Ordering::Relaxed) {
1104 let (_, rx) = std::sync::mpsc::channel();
1105 (Some(job), rx)
1106 } else {
1107 let (waker, receiver) = JobsWaker::new_waker(
1108 self.queue.clone(),
1109 location.clone(),
1110 context,
1111 priority,
1112 self.meta.clone(),
1113 worker_meta.clone(),
1114 meta.clone(),
1115 self.hash_tokens.clone(),
1116 cancel.clone(),
1117 suspend.clone(),
1118 );
1119 let mut cx = Context::from_waker(&waker);
1120 #[cfg(feature = "tracing")]
1121 let _span = tracing::span!(
1122 tracing::Level::TRACE,
1123 "Job poll",
1124 id = id.to_string(),
1125 location = location.to_string(),
1126 context = context.to_string(),
1127 priority = priority.to_string(),
1128 thread_id = format!("{:?}", std::thread::current().id()),
1129 )
1130 .entered();
1131 let poll_result = job.poll(&mut cx);
1132 (poll_result, receiver)
1133 };
1134 if let Some(job) = poll_result {
1135 let mut move_to = None;
1136 for command in receiver.try_iter() {
1137 notify_workers = true;
1138 match command {
1139 JobsWakerCommand::MoveTo(location) => move_to = Some(location),
1140 JobsWakerCommand::ChangePriority(new_priority) => {
1141 priority = new_priority;
1142 }
1143 }
1144 }
1145 if let Some(location) = move_to {
1146 pending.push(JobObject {
1147 id,
1148 job,
1149 context,
1150 location,
1151 priority,
1152 cancel,
1153 suspend,
1154 meta,
1155 });
1156 } else {
1157 pending.push(JobObject {
1158 id,
1159 job,
1160 context,
1161 location,
1162 priority,
1163 cancel,
1164 suspend,
1165 meta,
1166 });
1167 }
1168 }
1169 if notify_workers {
1170 let (lock, cvar) = &*self.notify;
1171 if let Ok(mut running) = lock.lock() {
1172 *running = true;
1173 }
1174 cvar.notify_all();
1175 }
1176 if timer.elapsed() >= timeout {
1177 break;
1178 }
1179 }
1180 self.queue.extend(pending);
1181 }
1182
1183 pub fn submit_queue(&self, queue: &JobQueue) {
1184 self.queue.append(queue);
1185 let (lock, cvar) = &*self.notify;
1186 if let Ok(mut running) = lock.lock() {
1187 *running = true;
1188 }
1189 cvar.notify_all();
1190 }
1191
    /// Runs the jobs of `queue` on the calling thread via `run_queue_inner`,
    /// with no time limit (`Duration::MAX`) and no extra meta.
    pub fn run_queue(&self, queue: &JobQueue) {
        self.run_queue_inner(queue, Duration::MAX, []);
    }
1195
    /// Runs the jobs of `queue` on the calling thread via `run_queue_inner`,
    /// with no time limit (`Duration::MAX`); `meta` is offered to the polled
    /// jobs as worker meta.
    pub fn run_queue_with_meta(
        &self,
        queue: &JobQueue,
        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
    ) {
        self.run_queue_inner(queue, Duration::MAX, meta);
    }
1203
    /// Runs the jobs of `queue` on the calling thread via `run_queue_inner`,
    /// stopping once `timeout` has elapsed; no extra meta is provided.
    pub fn run_queue_timeout(&self, queue: &JobQueue, timeout: Duration) {
        self.run_queue_inner(queue, timeout, []);
    }
1207
    /// Runs the jobs of `queue` on the calling thread via `run_queue_inner`,
    /// stopping once `timeout` has elapsed; `meta` is offered to the polled
    /// jobs as worker meta.
    pub fn run_queue_timeout_with_meta(
        &self,
        queue: &JobQueue,
        timeout: Duration,
        meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
    ) {
        self.run_queue_inner(queue, timeout, meta);
    }
1216
1217 fn run_queue_inner(
1218 &self,
1219 queue: &JobQueue,
1220 timeout: Duration,
1221 meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
1222 ) {
1223 let timer = Instant::now();
1224 let mut pending = vec![];
1225 let worker_meta = Arc::new(RwLock::new(meta.into_iter().collect::<HashMap<_, _>>()));
1226 while let Some(object) = queue.dequeue(&JobLocation::Unknown, true) {
1227 let JobObject {
1228 id,
1229 job,
1230 context,
1231 location,
1232 mut priority,
1233 cancel,
1234 suspend,
1235 meta,
1236 } = object;
1237 let mut notify_workers = false;
1238 let (poll_result, receiver) = if suspend.load(Ordering::Relaxed) {
1239 let (_, rx) = std::sync::mpsc::channel();
1240 (Some(job), rx)
1241 } else {
1242 let (waker, receiver) = JobsWaker::new_waker(
1243 queue.clone(),
1244 location.clone(),
1245 context,
1246 priority,
1247 self.meta.clone(),
1248 worker_meta.clone(),
1249 meta.clone(),
1250 self.hash_tokens.clone(),
1251 cancel.clone(),
1252 suspend.clone(),
1253 );
1254 let mut cx = Context::from_waker(&waker);
1255 #[cfg(feature = "tracing")]
1256 let _span = tracing::span!(
1257 tracing::Level::TRACE,
1258 "Job poll",
1259 id = id.to_string(),
1260 location = location.to_string(),
1261 context = context.to_string(),
1262 priority = priority.to_string(),
1263 thread_id = format!("{:?}", std::thread::current().id()),
1264 )
1265 .entered();
1266 let poll_result = job.poll(&mut cx);
1267 (poll_result, receiver)
1268 };
1269 if let Some(job) = poll_result {
1270 let mut move_to = None;
1271 for command in receiver.try_iter() {
1272 notify_workers = true;
1273 match command {
1274 JobsWakerCommand::MoveTo(location) => move_to = Some(location),
1275 JobsWakerCommand::ChangePriority(new_priority) => {
1276 priority = new_priority;
1277 }
1278 }
1279 }
1280 if let Some(location) = move_to {
1281 pending.push(JobObject {
1282 id,
1283 job,
1284 context,
1285 location,
1286 priority,
1287 cancel,
1288 suspend,
1289 meta,
1290 });
1291 } else {
1292 pending.push(JobObject {
1293 id,
1294 job,
1295 context,
1296 location,
1297 priority,
1298 cancel,
1299 suspend,
1300 meta,
1301 });
1302 }
1303 }
1304 if notify_workers {
1305 let (lock, cvar) = &*self.notify;
1306 if let Ok(mut running) = lock.lock() {
1307 *running = true;
1308 }
1309 cvar.notify_all();
1310 }
1311 if timer.elapsed() >= timeout {
1312 break;
1313 }
1314 }
1315 self.queue.extend(pending);
1316 }
1317
1318 #[inline]
1319 pub fn is_empty(&self) -> bool {
1320 self.workers.is_empty()
1321 }
1322
1323 #[inline]
1324 pub fn len(&self) -> usize {
1325 self.workers.len()
1326 }
1327
1328 pub fn spawn_on<T: Send + 'static>(
1329 &self,
1330 location: JobLocation,
1331 priority: JobPriority,
1332 job: impl Future<Output = T> + Send + Sync + 'static,
1333 ) -> Result<JobHandle<T>, Box<dyn Error>> {
1334 let handle = self.queue.spawn_on(location, priority, job);
1335 let (lock, cvar) = &*self.notify;
1336 let mut running = lock.lock().map_err(|error| format!("{error}"))?;
1337 *running = true;
1338 cvar.notify_all();
1339 Ok(handle)
1340 }
1341
1342 pub fn spawn_on_with_meta<T: Send + 'static>(
1343 &self,
1344 location: JobLocation,
1345 priority: JobPriority,
1346 meta: impl IntoIterator<Item = (String, DynamicManagedLazy)>,
1347 job: impl Future<Output = T> + Send + Sync + 'static,
1348 ) -> Result<JobHandle<T>, Box<dyn Error>> {
1349 let handle = self.queue.spawn_on_with_meta(location, priority, meta, job);
1350 let (lock, cvar) = &*self.notify;
1351 let mut running = lock.lock().map_err(|error| format!("{error}"))?;
1352 *running = true;
1353 cvar.notify_all();
1354 Ok(handle)
1355 }
1356
1357 pub fn queue_on<T: Send + 'static>(
1358 &self,
1359 location: JobLocation,
1360 priority: JobPriority,
1361 job: impl FnOnce(JobContext) -> T + Send + Sync + 'static,
1362 ) -> Result<JobHandle<T>, Box<dyn Error>> {
1363 let handle = self.queue.queue_on(location, priority, job);
1364 let (lock, cvar) = &*self.notify;
1365 let mut running = lock.lock().map_err(|error| format!("{error}"))?;
1366 *running = true;
1367 cvar.notify_all();
1368 Ok(handle)
1369 }
1370
1371 pub fn broadcast<T: Send + 'static>(
1372 &self,
1373 job: impl Fn(JobContext) -> T + Send + Sync + 'static,
1374 ) -> Result<AllJobsHandle<T>, Box<dyn Error>> {
1375 self.broadcast_n(self.workers.len(), job)
1376 }
1377
1378 pub fn broadcast_n<T: Send + 'static>(
1379 &self,
1380 work_groups: usize,
1381 job: impl Fn(JobContext) -> T + Send + Sync + 'static,
1382 ) -> Result<AllJobsHandle<T>, Box<dyn Error>> {
1383 if self.workers.is_empty() {
1384 return Ok(AllJobsHandle::new(job(JobContext {
1385 work_group_index: 0,
1386 work_groups_count: 1,
1387 })));
1388 }
1389 let job = Arc::new(job);
1390 let handle = AllJobsHandle {
1391 jobs: (0..work_groups)
1392 .map(|group| {
1393 let job = Arc::clone(&job);
1394 let handle = JobHandle::<T>::default();
1395 let handle2 = handle.clone();
1396 self.queue.enqueue(JobObject {
1397 id: ID::new(),
1398 job: Job(Box::pin(async move {
1399 handle2.put(job(context().await));
1400 })),
1401 context: JobContext {
1402 work_group_index: group,
1403 work_groups_count: work_groups,
1404 },
1405 location: JobLocation::other_than_current_thread(),
1406 priority: JobPriority::High,
1407 cancel: handle.cancel.clone(),
1408 suspend: handle.suspend.clone(),
1409 meta: handle.meta.clone(),
1410 });
1411 handle
1412 })
1413 .collect::<Vec<_>>(),
1414 };
1415 let (lock, cvar) = &*self.notify;
1416 let mut running = lock.lock().map_err(|error| format!("{error}"))?;
1417 *running = true;
1418 cvar.notify_all();
1419 Ok(handle)
1420 }
1421}
1422
1423pub struct ScopedJobs<'env, T: Send + 'static> {
1424 jobs: &'env Jobs,
1425 handles: AllJobsHandle<T>,
1426}
1427
1428impl<T: Send + 'static> Drop for ScopedJobs<'_, T> {
1429 fn drop(&mut self) {
1430 self.execute_inner();
1431 }
1432}
1433
1434impl<'env, T: Send + 'static> ScopedJobs<'env, T> {
1435 pub fn new(jobs: &'env Jobs) -> Self {
1436 Self {
1437 jobs,
1438 handles: Default::default(),
1439 }
1440 }
1441
1442 pub fn spawn_on(
1443 &mut self,
1444 location: JobLocation,
1445 priority: JobPriority,
1446 job: impl Future<Output = T> + Send + Sync + 'env,
1447 ) -> Result<(), Box<dyn Error>> {
1448 let job = unsafe {
1449 std::mem::transmute::<
1450 Pin<Box<dyn Future<Output = T> + Send + Sync + 'env>>,
1451 Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>,
1452 >(Box::pin(job))
1453 };
1454 let handle = self.jobs.spawn_on(location, priority, job)?;
1455 self.handles.add(handle);
1456 Ok(())
1457 }
1458
1459 pub fn queue_on(
1460 &mut self,
1461 location: JobLocation,
1462 priority: JobPriority,
1463 job: impl FnOnce(JobContext) -> T + Send + Sync + 'env,
1464 ) -> Result<(), Box<dyn Error>> {
1465 let job = unsafe {
1466 std::mem::transmute::<
1467 Box<dyn FnOnce(JobContext) -> T + Send + Sync + 'env>,
1468 Box<dyn FnOnce(JobContext) -> T + Send + Sync + 'static>,
1469 >(Box::new(job))
1470 };
1471 self.handles
1472 .add(self.jobs.queue_on(location, priority, job)?);
1473 Ok(())
1474 }
1475
1476 pub fn broadcast(
1477 &mut self,
1478 job: impl Fn(JobContext) -> T + Send + Sync + 'env,
1479 ) -> Result<(), Box<dyn Error>> {
1480 let job = unsafe {
1481 std::mem::transmute::<
1482 Box<dyn Fn(JobContext) -> T + Send + Sync + 'env>,
1483 Box<dyn Fn(JobContext) -> T + Send + Sync + 'static>,
1484 >(Box::new(job))
1485 };
1486 self.handles.extend(self.jobs.broadcast(job)?.into_inner());
1487 Ok(())
1488 }
1489
1490 pub fn broadcast_n(
1491 &mut self,
1492 work_groups: usize,
1493 job: impl Fn(JobContext) -> T + Send + Sync + 'env,
1494 ) -> Result<(), Box<dyn Error>> {
1495 let job = unsafe {
1496 std::mem::transmute::<
1497 Box<dyn Fn(JobContext) -> T + Send + Sync + 'env>,
1498 Box<dyn Fn(JobContext) -> T + Send + Sync + 'static>,
1499 >(Box::new(job))
1500 };
1501 self.handles
1502 .extend(self.jobs.broadcast_n(work_groups, job)?.into_inner());
1503 Ok(())
1504 }
1505
1506 pub fn execute(mut self) -> Vec<T> {
1507 self.execute_inner()
1508 }
1509
1510 fn execute_inner(&mut self) -> Vec<T> {
1511 std::mem::take(&mut self.handles).wait().unwrap_or_default()
1512 }
1513}
1514
1515#[cfg(test)]
1516mod tests {
1517 use super::*;
1518 use crate::coroutine::{
1519 acquire_token, block_on, location, meta, move_to, on_exit, queue_on, spawn_on, suspend,
1520 wait_polls, wait_time, with_all, with_any, yield_now,
1521 };
1522 use std::sync::atomic::AtomicUsize;
1523
/// Exercises closure jobs on a default `Jobs`: a blocking wait, a job driven
/// from this thread, and both broadcast flavours.
#[test]
fn test_jobs() {
    // Compile-time check that `Jobs` is `Send + Sync`.
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<Jobs>();

    let jobs = Jobs::default();
    let numbers = (0..100).collect::<Vec<_>>();
    let numbers_for_local = numbers.clone();

    // Job without a fixed location; block until its result is available.
    let handle = jobs
        .queue_on(JobLocation::Unknown, JobPriority::Normal, move |_| {
            numbers.into_iter().sum::<usize>()
        })
        .unwrap();
    assert_eq!(handle.wait().unwrap(), 4950);

    // Local job; this thread has to drive it through `run_local`.
    let handle = jobs
        .queue_on(JobLocation::Local, JobPriority::Normal, move |_| {
            numbers_for_local.into_iter().sum::<usize>()
        })
        .unwrap();
    while !handle.is_done() {
        jobs.run_local();
    }
    assert_eq!(handle.try_take().unwrap().unwrap(), 4950);

    // Broadcast: the group indices must sum to 0 + 1 + ... + (workers - 1).
    let total = jobs
        .broadcast(move |ctx| ctx.work_group_index)
        .unwrap()
        .wait()
        .unwrap()
        .into_iter()
        .sum::<usize>();
    let expected = (0..jobs.workers.len()).sum::<usize>();
    assert_eq!(total, expected);

    // Broadcast over ten explicit work groups: indices 0..10 sum up.
    let total = jobs
        .broadcast_n(10, move |ctx| ctx.work_group_index)
        .unwrap()
        .wait()
        .unwrap()
        .into_iter()
        .sum::<usize>();
    assert_eq!(total, (0..10).sum::<usize>());
}
1571
/// With `Jobs::new(0)` (presumably no worker threads; the test name suggests
/// so) every job, whatever location it asks for, must be completable by
/// driving `run_local` from this thread.
#[test]
fn test_local_thread_only_jobs() {
    let jobs = Jobs::new(0);
    let data = (0..100).collect::<Vec<usize>>();

    // The same summing job is queued once per requested location.
    let locations = [
        JobLocation::Unknown,
        JobLocation::Local,
        JobLocation::UnnamedWorker,
        JobLocation::named_worker("temp"),
        JobLocation::current_thread(),
        JobLocation::other_than_current_thread(),
    ];
    for location in locations {
        let values = data.clone();
        let handle = jobs
            .queue_on(location, JobPriority::Normal, move |_| {
                values.into_iter().sum::<usize>()
            })
            .unwrap();

        while !handle.is_done() {
            jobs.run_local();
        }
        assert_eq!(handle.try_take().unwrap().unwrap(), 4950);
    }

    // Broadcasts yield exactly one result here, hence the sum of 1.
    let handle = jobs.broadcast(move |_| 1).unwrap();
    while !handle.is_done() {
        jobs.run_local();
    }
    let total = handle.wait().unwrap().into_iter().sum::<usize>();
    assert_eq!(total, 1);

    // The requested group count of 10 does not change that.
    let handle = jobs.broadcast_n(10, move |_| 1).unwrap();
    while !handle.is_done() {
        jobs.run_local();
    }
    let total = handle.wait().unwrap().into_iter().sum::<usize>();
    assert_eq!(total, 1);
}
1676
/// A job placed on a standalone `JobQueue` is completed by running that
/// queue through `Jobs::run_queue`.
#[test]
fn test_queue_jobs() {
    let jobs = Jobs::new(0);
    let external_queue = JobQueue::default();
    let numbers = (0..100).collect::<Vec<_>>();

    let handle = external_queue.queue_on(JobLocation::Unknown, JobPriority::Normal, move |_| {
        numbers.into_iter().sum::<usize>()
    });

    loop {
        if handle.is_done() {
            break;
        }
        jobs.run_queue(&external_queue);
    }
    assert_eq!(handle.try_take().unwrap().unwrap(), 4950);
}
1693
/// A scoped job may mutably borrow a local vector; `execute` returns the
/// result of the job.
#[test]
fn test_scoped_jobs() {
    let jobs = Jobs::default();
    let mut numbers = (0..100).collect::<Vec<_>>();

    let mut scope = ScopedJobs::new(&jobs);
    scope
        .queue_on(JobLocation::Unknown, JobPriority::Normal, |_| {
            // Double every element in place, then sum the doubled values.
            numbers.iter_mut().for_each(|value| *value *= 2);
            numbers.iter().copied().sum::<usize>()
        })
        .unwrap();

    let total = scope.execute().into_iter().sum::<usize>();
    assert_eq!(total, 9900);
}
1712
/// Spawns futures: a yielding sum awaited via `block_on`, the same sum
/// driven locally, and the `with_all` / `with_any` combinators.
#[test]
fn test_futures_spawn() {
    let jobs = Jobs::default();
    let numbers = (0..100).collect::<Vec<_>>();
    let numbers_for_local = numbers.clone();

    // Yield after every addition; the final sum must be unaffected.
    let handle = jobs
        .spawn_on(JobLocation::Unknown, JobPriority::Normal, async move {
            let mut total = 0;
            for number in numbers {
                total += number;
                yield_now().await;
            }
            total
        })
        .unwrap();
    assert_eq!(block_on(handle).unwrap(), 4950);

    // Same future, but local and driven from this thread.
    let handle = jobs
        .spawn_on(JobLocation::Local, JobPriority::Normal, async move {
            let mut total = 0;
            for number in numbers_for_local {
                total += number;
                yield_now().await;
            }
            total
        })
        .unwrap();
    while !handle.is_done() {
        jobs.run_local();
    }
    assert_eq!(handle.try_take().unwrap().unwrap(), 4950);

    // `with_all`: both branches must have contributed (1 + 2).
    let handle = jobs
        .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
            let counter = Arc::new(AtomicUsize::new(0));
            let slow_counter = counter.clone();
            let fast_counter = counter.clone();
            with_all(vec![
                Box::pin(async move {
                    wait_time(Duration::from_millis(10)).await;
                    slow_counter.fetch_add(1, Ordering::SeqCst);
                }),
                Box::pin(async move {
                    wait_time(Duration::from_millis(5)).await;
                    fast_counter.fetch_add(2, Ordering::SeqCst);
                }),
            ])
            .await;
            counter.load(Ordering::SeqCst)
        })
        .unwrap();
    assert_eq!(block_on(handle).unwrap(), 3);

    // `with_any`: at least one branch must have stored a non-zero value.
    let handle = jobs
        .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
            let marker = Arc::new(AtomicUsize::new(0));
            let slow_marker = marker.clone();
            let fast_marker = marker.clone();
            with_any(vec![
                Box::pin(async move {
                    wait_polls(10).await;
                    slow_marker.store(1, Ordering::SeqCst);
                }),
                Box::pin(async move {
                    wait_polls(5).await;
                    fast_marker.store(2, Ordering::SeqCst);
                }),
            ])
            .await;
            marker.load(Ordering::SeqCst)
        })
        .unwrap();
    let observed = block_on(handle).unwrap();
    assert!(observed > 0);
}
1794
/// A local future hops between locations with `move_to` and still finishes
/// with its value once it is back on the local thread.
#[test]
fn test_futures_move() {
    let jobs = Jobs::new(1).with_named_worker("foo");

    let handle = jobs
        .spawn_on(JobLocation::Local, JobPriority::Normal, async {
            yield_now().await;
            println!("A: {:?}", location().await);
            // Local -> unspecified location.
            move_to(JobLocation::Unknown).await;
            println!("B: {:?}", location().await);
            // -> the worker named "foo".
            move_to(JobLocation::named_worker("foo")).await;
            println!("C: {:?}", location().await);
            // -> back to the local thread.
            move_to(JobLocation::Local).await;
            println!("D: {:?}", location().await);
            42
        })
        .unwrap();

    loop {
        if handle.is_done() {
            break;
        }
        jobs.run_local();
    }
    assert_eq!(handle.try_take().unwrap().unwrap(), 42);
}
1823
/// From inside a local future, schedules nested futures and closures on the
/// local thread, an unspecified location and the worker named "foo",
/// awaiting each one in turn.
#[test]
fn test_futures_schedule() {
    let jobs = Jobs::new(1).with_named_worker("foo");

    let handle = jobs
        .spawn_on(JobLocation::Local, JobPriority::Normal, async {
            // Nested futures.
            spawn_on(JobLocation::Local, JobPriority::Normal, async {
                println!("A: {:?}", location().await);
            })
            .await;
            spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
                println!("B: {:?}", location().await);
            })
            .await;
            spawn_on(
                JobLocation::named_worker("foo"),
                JobPriority::Normal,
                async {
                    println!("C: {:?}", location().await);
                },
            )
            .await;

            // Nested closures.
            queue_on(JobLocation::Local, JobPriority::Normal, |_| {
                println!("D: Local closure");
            })
            .await;
            queue_on(JobLocation::Unknown, JobPriority::Normal, |_| {
                println!("E: Unnamed worker closure");
            })
            .await;
            queue_on(
                JobLocation::named_worker("foo"),
                JobPriority::Normal,
                |_| {
                    println!("F: Named worker closure");
                },
            )
            .await;

            42
        })
        .unwrap();

    loop {
        if handle.is_done() {
            break;
        }
        jobs.run_local();
    }
    assert_eq!(handle.try_take().unwrap().unwrap(), 42);
}
1872
/// Reads meta values from inside jobs. The values are supplied three ways:
/// on the `Jobs` instance, together with a single spawn, and through
/// `run_local_with_meta`.
#[test]
fn test_futures_meta() {
    let jobs = Jobs::default();

    // Meta registered on the `Jobs` instance itself.
    let mut answer = 42usize;
    let (answer_lazy, _answer_lifetime) = DynamicManagedLazy::make(&mut answer);
    jobs.set_meta("value", answer_lazy);

    let handle = jobs
        .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
            let stored = meta::<usize>("value").await.unwrap();
            *stored.read().unwrap()
        })
        .unwrap();
    assert_eq!(block_on(handle).unwrap(), 42);

    // Meta attached to one spawned job.
    let mut spawn_flag = true;
    let (spawn_flag_lazy, _spawn_flag_lifetime) = DynamicManagedLazy::make(&mut spawn_flag);
    let handle = jobs
        .spawn_on_with_meta(
            JobLocation::Unknown,
            JobPriority::Normal,
            [("flag".to_owned(), spawn_flag_lazy)],
            async {
                let stored = meta::<bool>("flag").await.unwrap();
                *stored.read().unwrap()
            },
        )
        .unwrap();
    let seen = block_on(handle).unwrap();
    assert!(seen);

    // Meta supplied on every local run.
    let mut local_flag = true;
    let (local_flag_lazy, _local_flag_lifetime) = DynamicManagedLazy::make(&mut local_flag);
    let handle = jobs
        .spawn_on(JobLocation::Local, JobPriority::Normal, async {
            let stored = meta::<bool>("flag").await.unwrap();
            *stored.read().unwrap()
        })
        .unwrap();
    while !handle.is_done() {
        jobs.run_local_with_meta([("flag".to_owned(), local_flag_lazy.clone())]);
    }
    let seen = handle.try_take().unwrap().unwrap();
    assert!(seen);
}
1922
/// Two jobs acquire a token for the same key and must both finish; the
/// second part repeats this with file save/load helpers keyed by path.
/// Presumably the token keeps the two jobs from overlapping, but the test
/// only asserts that both complete.
#[test]
fn test_futures_acquire_token() {
    let jobs = Jobs::new(3);

    let first = jobs
        .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
            let _token = acquire_token(&"foo").await;
            std::thread::sleep(Duration::from_millis(50));
            for number in 0..10 {
                println!("{number}");
                std::thread::sleep(Duration::from_millis(10));
            }
        })
        .unwrap();
    let second = jobs
        .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
            let _token = acquire_token(&"foo").await;
            std::thread::sleep(Duration::from_millis(50));
            for number in 10..20 {
                println!("{number}");
                std::thread::sleep(Duration::from_millis(10));
            }
        })
        .unwrap();
    block_on(AllJobsHandle::many([first, second])).unwrap();

    // File-based variant, compiled out under Miri.
    #[cfg(not(miri))]
    {
        use std::path::Path;

        const PATH: &str = "./resources/test.txt";

        /// Reads the file while holding the token keyed by its path.
        async fn load_file(path: impl AsRef<Path>) -> Option<String> {
            let file_path = path.as_ref();
            let _token = acquire_token(&file_path).await;
            std::fs::read_to_string(file_path).ok()
        }

        /// Writes the file while holding the token keyed by its path;
        /// write errors are ignored.
        async fn save_file(path: impl AsRef<Path>, content: &str) {
            let file_path = path.as_ref();
            let _token = acquire_token(&file_path).await;
            let _ = std::fs::write(file_path, content);
        }

        let payload = "Hello, Jobs!".repeat(1000);

        let writer = jobs
            .spawn_on(JobLocation::Unknown, JobPriority::Normal, async move {
                std::thread::sleep(Duration::from_millis(50));
                save_file(PATH, &payload).await;
            })
            .unwrap();
        let reader = jobs
            .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
                std::thread::sleep(Duration::from_millis(50));
                let _ = load_file(PATH).await;
            })
            .unwrap();
        block_on(AllJobsHandle::many([writer, reader])).unwrap();
    }
}
1983
/// Checks the `on_exit` hook in three situations: the guard is kept (hook
/// runs), the guard is invalidated (hook does not run), and the job is
/// cancelled before it is ever driven (no result, hook does not run).
#[test]
fn test_futures_on_exit() {
    let jobs = Jobs::default();

    // Guard kept until the job ends: the flag gets set.
    let fired = Arc::new(AtomicBool::new(false));
    let fired_in_job = fired.clone();
    let handle = jobs
        .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
            let _guard = on_exit(async move {
                fired_in_job.store(true, Ordering::SeqCst);
            })
            .await;
            42
        })
        .unwrap();
    assert_eq!(block_on(handle).unwrap(), 42);
    // Leave the hook some time to run before checking.
    std::thread::sleep(Duration::from_millis(100));
    assert!(fired.load(Ordering::SeqCst));

    // Guard invalidated: the flag stays unset.
    let fired = Arc::new(AtomicBool::new(false));
    let fired_in_job = fired.clone();
    let handle = jobs
        .spawn_on(JobLocation::Unknown, JobPriority::Normal, async {
            let guard = on_exit(async move {
                fired_in_job.store(true, Ordering::SeqCst);
            })
            .await;
            guard.invalidate();
            42
        })
        .unwrap();
    assert_eq!(block_on(handle).unwrap(), 42);
    std::thread::sleep(Duration::from_millis(100));
    assert!(!fired.load(Ordering::SeqCst));

    // Local job cancelled before the first `run_local`.
    let fired = Arc::new(AtomicBool::new(false));
    let fired_in_job = fired.clone();
    let handle = jobs
        .spawn_on(JobLocation::Local, JobPriority::Normal, async {
            let guard = on_exit(async move {
                fired_in_job.store(true, Ordering::SeqCst);
            })
            .await;
            guard.invalidate();
            42
        })
        .unwrap();

    handle.cancel();
    while !handle.is_done() {
        jobs.run_local();
    }
    assert_eq!(handle.try_take(), Some(None));
    assert!(!fired.load(Ordering::SeqCst));
}
2045
/// A job that suspends itself makes no progress under repeated `run_local`
/// calls until its handle is resumed; afterwards it completes with its value.
#[test]
fn test_futures_suspend() {
    let jobs = Jobs::default();

    let handle = jobs
        .spawn_on(JobLocation::Local, JobPriority::Normal, async {
            suspend().await;
            42
        })
        .unwrap();

    // Not done before running, and still not done after ten local runs.
    assert!(!handle.is_done());
    (0..10).for_each(|_| jobs.run_local());
    assert!(!handle.is_done());

    handle.resume();
    loop {
        if handle.is_done() {
            break;
        }
        jobs.run_local();
    }
    assert_eq!(handle.try_take().unwrap().unwrap(), 42);
}
2070}