use async_executor::{Executor, Task};
use event_listener::{Event, EventListener};
use futures_lite::pin;
use std::{
future::Future,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
/// An executor that only admits a bounded number of tasks at a time.
struct LimitedExecutor {
/// Inner executor that admitted tasks are spawned onto and run by.
executor: Executor<'static>,
/// Slot accounting, shared with every task spawned through this executor.
shared: Arc<SharedState>,
}
/// State shared between the executor handle and the tasks it has spawned.
struct SharedState {
/// Upper bound that `active` is allowed to reach.
max: usize,
/// Number of slots currently claimed.
active: AtomicUsize,
/// Notified whenever a slot is claimed or released, so waiting spawners re-check `active`.
slot_available: Event,
}
impl LimitedExecutor {
    /// Creates an executor that admits at most `max` tasks at once.
    ///
    /// With `max == 0` no slot can ever be claimed, so [`spawn`](Self::spawn)
    /// would wait forever.
    fn new(max: usize) -> Self {
        Self {
            executor: Executor::new(),
            shared: Arc::new(SharedState {
                max,
                active: AtomicUsize::new(0),
                slot_available: Event::new(),
            }),
        }
    }

    /// Spawns `future` onto the inner executor, waiting until a slot is free.
    ///
    /// The claimed slot is released when the wrapped future is dropped,
    /// whether it ran to completion, was cancelled part-way, or was dropped
    /// before it was ever polled.
    async fn spawn<F: Future + Send + 'static>(&self, future: F) -> Task<F::Output>
    where
        F::Output: Send + 'static,
    {
        /// Releases one slot and wakes every waiting spawner when dropped.
        struct DecOnDrop(Arc<SharedState>);

        impl Drop for DecOnDrop {
            fn drop(&mut self) {
                self.0.active.fetch_sub(1, Ordering::SeqCst);
                self.0.slot_available.notify(usize::MAX);
            }
        }

        let listener = EventListener::new(&self.shared.slot_available);
        pin!(listener);

        let mut active = self.shared.active.load(Ordering::Acquire);
        loop {
            if active < self.shared.max {
                // Try to claim a slot by bumping the counter from the value we observed.
                match self.shared.active.compare_exchange(
                    active,
                    active + 1,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                ) {
                    Ok(_) => {
                        // Build the guard now, not inside the async block: a future
                        // that is dropped before its first poll never runs any of
                        // its body, so a guard created there would never exist and
                        // the slot would leak.
                        let guard = DecOnDrop(Arc::clone(&self.shared));
                        let future = async move {
                            let _guard = guard;
                            future.await
                        };

                        // Nudge one more waiter so it re-checks the counter; further
                        // slots may still be free.
                        self.shared.slot_available.notify(1);
                        return self.executor.spawn(future);
                    }
                    // Lost the race; retry with the value that is actually stored.
                    Err(actual) => active = actual,
                }
            } else {
                if listener.as_ref().is_listening() {
                    // Already registered: wait for a notification.
                    listener.as_mut().await;
                } else {
                    // Register first, then fall through to re-check the counter
                    // before waiting, so a slot released in between is not missed.
                    listener.as_mut().listen();
                }
                active = self.shared.active.load(Ordering::Acquire);
            }
        }
    }

    /// Drives the inner executor until `future` completes, returning its output.
    async fn run<F: Future>(&self, future: F) -> F::Output {
        self.executor.run(future).await
    }
}
fn main() {
futures_lite::future::block_on(async {
let ex = Arc::new(LimitedExecutor::new(10));
ex.run({
let ex = ex.clone();
async move {
for i in 0..15 {
ex.spawn(async move {
async_io::Timer::after(Duration::from_millis(fastrand::u64(1..3))).await;
println!("Waiting task #{i} finished!");
})
.await
.detach();
}
let (start_tx, start_rx) = async_channel::bounded::<()>(1);
let mut current_rx = start_rx;
start_tx.send(()).await.unwrap();
for i in 0..25 {
let (next_tx, next_rx) = async_channel::bounded::<()>(1);
ex.spawn(async move {
current_rx.recv().await.unwrap();
println!("Channel task {i} woken up!");
next_tx.send(()).await.unwrap();
println!("Channel task {i} finished!");
})
.await
.detach();
current_rx = next_rx;
}
current_rx.recv().await.unwrap();
println!("All tasks finished!");
}
})
.await;
});
}