spawn_groups/shared/
runtime.rs1use crate::{async_stream::AsyncStream, shared::priority::Priority, threadpool_impl::ThreadPool};
2use std::{
3 future::Future,
4 sync::{
5 atomic::{AtomicUsize, Ordering},
6 Arc,
7 },
8};
9
10use super::priority_task::PrioritizedTask;
11
12pub(crate) struct RuntimeEngine<ItemType> {
13 stream: AsyncStream<ItemType>,
14 pool: ThreadPool,
15 task_count: Arc<AtomicUsize>,
16}
17
18impl<ItemType> RuntimeEngine<ItemType> {
19 pub(crate) fn new(count: usize) -> Self {
20 Self {
21 pool: ThreadPool::new(count),
22 stream: AsyncStream::new(),
23 task_count: Arc::new(AtomicUsize::default()),
24 }
25 }
26}
27
28impl<ItemType> Default for RuntimeEngine<ItemType> {
29 fn default() -> Self {
30 Self {
31 pool: ThreadPool::default(),
32 stream: AsyncStream::new(),
33 task_count: Arc::new(AtomicUsize::default()),
34 }
35 }
36}
37
38impl<ItemType> RuntimeEngine<ItemType> {
39 pub(crate) fn cancel(&self) {
40 self.pool.clear();
41 self.pool.wait_for_all();
42 self.task_count.store(0, Ordering::Relaxed);
43 }
44}
45
46impl<ItemType> RuntimeEngine<ItemType> {
47 pub(crate) fn stream(&self) -> AsyncStream<ItemType> {
48 self.stream.clone()
49 }
50
51 pub(crate) fn end(&self) {
52 self.pool.clear();
53 self.pool.wait_for_all();
54 self.task_count.store(0, Ordering::Relaxed);
55 self.pool.end()
56 }
57}
58
59impl<ValueType> RuntimeEngine<ValueType> {
60 pub(crate) fn wait_for_all_tasks(&self) {
61 self.poll();
62 self.task_count.store(0, Ordering::Relaxed);
63 }
64}
65
66impl<ItemType> RuntimeEngine<ItemType> {
67 pub(crate) fn write_task(&mut self, priority: Priority, task: impl Future<Output = ItemType>) {
68 let (stream, task_counter) = (self.stream(), self.task_count.clone());
69 stream.increment();
70 task_counter.fetch_add(1, Ordering::Relaxed);
71 self.pool
72 .submit(PrioritizedTask::new(priority.into(), async move {
73 let task_result = task.await;
74 stream.insert_item(task_result).await;
75 task_counter.fetch_sub(1, Ordering::Relaxed);
76 }));
77 }
78}
79
80impl<ItemType> RuntimeEngine<ItemType> {
81 fn poll(&self) {
82 self.pool.wait_for_all();
83 }
84
85 pub(crate) fn task_count(&self) -> usize {
86 self.task_count.load(Ordering::Acquire)
87 }
88}