use async_executor::{Executor, Task};
use async_lock::Semaphore;
use std::{future::Future, sync::Arc, time::Duration};
/// An executor bundled with a semaphore.
///
/// `spawn` acquires a permit from the semaphore before handing a future to
/// the executor, and the permit is held by the spawned task until its future
/// has finished.
struct LimitedExecutor {
    /// The executor that spawned tasks are pushed onto and that `run` drives.
    executor: Executor<'static>,

    /// Semaphore created with the `max` passed to `new`; kept in an `Arc` so
    /// that owned permits can be moved into spawned tasks.
    semaphore: Arc<Semaphore>,
}
impl LimitedExecutor {
fn new(max: usize) -> Self {
Self {
executor: Executor::new(),
semaphore: Semaphore::new(max).into(),
}
}
async fn spawn<F: Future + Send + 'static>(&self, future: F) -> Task<F::Output>
where
F::Output: Send + 'static,
{
let permit = self.semaphore.acquire_arc().await;
let future = async move {
let result = future.await;
drop(permit);
result
};
self.executor.spawn(future)
}
async fn run<F: Future>(&self, future: F) -> F::Output {
self.executor.run(future).await
}
}
fn main() {
futures_lite::future::block_on(async {
let ex = Arc::new(LimitedExecutor::new(10));
ex.run({
let ex = ex.clone();
async move {
for i in 0..15 {
ex.spawn(async move {
async_io::Timer::after(Duration::from_millis(fastrand::u64(1..3))).await;
println!("Waiting task #{i} finished!");
})
.await
.detach();
}
let (start_tx, start_rx) = async_channel::bounded::<()>(1);
let mut current_rx = start_rx;
start_tx.send(()).await.unwrap();
for i in 0..25 {
let (next_tx, next_rx) = async_channel::bounded::<()>(1);
ex.spawn(async move {
current_rx.recv().await.unwrap();
println!("Channel task {i} woken up!");
next_tx.send(()).await.unwrap();
println!("Channel task {i} finished!");
})
.await
.detach();
current_rx = next_rx;
}
current_rx.recv().await.unwrap();
println!("All tasks finished!");
}
})
.await;
});
}