spawn_groups/shared/
runtime.rs

1use crate::{async_stream::AsyncStream, shared::priority::Priority, threadpool_impl::ThreadPool};
2use std::{
3    future::Future,
4    sync::{
5        atomic::{AtomicUsize, Ordering},
6        Arc,
7    },
8};
9
10use super::priority_task::PrioritizedTask;
11
12pub(crate) struct RuntimeEngine<ItemType> {
13    stream: AsyncStream<ItemType>,
14    pool: ThreadPool,
15    task_count: Arc<AtomicUsize>,
16}
17
18impl<ItemType> RuntimeEngine<ItemType> {
19    pub(crate) fn new(count: usize) -> Self {
20        Self {
21            pool: ThreadPool::new(count),
22            stream: AsyncStream::new(),
23            task_count: Arc::new(AtomicUsize::default()),
24        }
25    }
26}
27
28impl<ItemType> Default for RuntimeEngine<ItemType> {
29    fn default() -> Self {
30        Self {
31            pool: ThreadPool::default(),
32            stream: AsyncStream::new(),
33            task_count: Arc::new(AtomicUsize::default()),
34        }
35    }
36}
37
38impl<ItemType> RuntimeEngine<ItemType> {
39    pub(crate) fn cancel(&self) {
40        self.pool.clear();
41        self.pool.wait_for_all();
42        self.task_count.store(0, Ordering::Relaxed);
43    }
44}
45
46impl<ItemType> RuntimeEngine<ItemType> {
47    pub(crate) fn stream(&self) -> AsyncStream<ItemType> {
48        self.stream.clone()
49    }
50
51    pub(crate) fn end(&self) {
52        self.pool.clear();
53        self.pool.wait_for_all();
54        self.task_count.store(0, Ordering::Relaxed);
55        self.pool.end()
56    }
57}
58
59impl<ValueType> RuntimeEngine<ValueType> {
60    pub(crate) fn wait_for_all_tasks(&self) {
61        self.poll();
62        self.task_count.store(0, Ordering::Relaxed);
63    }
64}
65
66impl<ItemType> RuntimeEngine<ItemType> {
67    pub(crate) fn write_task(&mut self, priority: Priority, task: impl Future<Output = ItemType>) {
68        let (stream, task_counter) = (self.stream(), self.task_count.clone());
69        stream.increment();
70        task_counter.fetch_add(1, Ordering::Relaxed);
71        self.pool
72            .submit(PrioritizedTask::new(priority.into(), async move {
73                let task_result = task.await;
74                stream.insert_item(task_result).await;
75                task_counter.fetch_sub(1, Ordering::Relaxed);
76            }));
77    }
78}
79
80impl<ItemType> RuntimeEngine<ItemType> {
81    fn poll(&self) {
82        self.pool.wait_for_all();
83    }
84
85    pub(crate) fn task_count(&self) -> usize {
86        self.task_count.load(Ordering::Acquire)
87    }
88}