use std::sync::atomic::{AtomicUsize, Ordering};
use futures::{executor::ThreadPool, pin_mut};
use futures_time::{task::sleep, time};
use orchestra::*;
use rand::prelude::*;
/// Running counter used as the task id: `SubsystemB` bumps it with `fetch_add`
/// each time it receives a signal and puts the previous value into `Message1`.
static COUNTER: AtomicUsize = AtomicUsize::new(0);
/// Signal type the orchestra is configured with (`signal=Signal` on `MyOrchestra`).
#[derive(Debug, Clone)]
pub enum Signal {
/// Periodic tick; `main` sends one to each subsystem roughly once per second.
Heartbeat,
}
/// Error wrapper around [`OrchestraError`].
///
/// NOTE(review): nothing in the visible source references this type; the
/// orchestra and both subsystems use `OrchestraError` directly.
#[derive(thiserror::Error, Debug)]
enum MyError {
/// An error produced by the orchestra, forwarded transparently
/// (the `#[from]` attribute also provides the `From<OrchestraError>` conversion).
#[error(transparent)]
Generated(#[from] OrchestraError),
}
/// Message consumed by `SubsystemA` and sent by `SubsystemB`.
/// The payload is the task id taken from `COUNTER`.
#[derive(Debug)]
pub struct Message1(usize);
/// Message declared as `SubsystemB`'s input and listed in `SubsystemA`'s `sends`.
/// It is never constructed in the visible source.
#[derive(Debug)]
pub struct Message2;
/// Placeholder passed as the `event=` type of `MyOrchestra`; never constructed in the visible source.
struct DummyEvent;
/// Newtype around a `futures` [`ThreadPool`] so it can implement the orchestra's `Spawner` trait.
#[derive(Debug, Clone)]
pub struct MySpawner(pub ThreadPool);
/// Runs every task handed over by the orchestra on the wrapped thread pool.
///
/// The task and subsystem names are ignored, and blocking and non-blocking
/// tasks are treated the same way.
impl Spawner for MySpawner {
    /// Submits `future` to the pool; there is no separate executor for blocking work.
    fn spawn_blocking(
        &self,
        _task_name: &'static str,
        _subsystem_name: Option<&'static str>,
        future: futures::future::BoxFuture<'static, ()>,
    ) {
        let Self(pool) = self;
        pool.spawn_ok(future);
    }

    /// Submits `future` to the pool.
    fn spawn(
        &self,
        _task_name: &'static str,
        _subsystem_name: Option<&'static str>,
        future: futures::future::BoxFuture<'static, ()>,
    ) {
        let Self(pool) = self;
        pool.spawn_ok(future);
    }
}
/// Stateless subsystem that consumes `Message1` and runs one sleeping task per message
/// (see its `start` implementation).
struct SubsystemA;
/// Stateless subsystem that answers every signal by sending a `Message1`
/// (see its `start` implementation).
struct SubsystemB;
// Orchestra definition wiring the two subsystems together.
// The attribute selects `Signal` as signal type, `DummyEvent` as event type,
// `AllMessages` as the name passed to `gen=`, and `OrchestraError` as error type.
// `message_capacity=5` and `signal_capacity=600` are the configured channel sizes
// (presumably per subsystem - confirm against the orchestra documentation).
#[orchestra(signal=Signal, event=DummyEvent, gen=AllMessages, error=OrchestraError, message_capacity=5, signal_capacity=600)]
struct MyOrchestra {
// Consumes `Message1`, is allowed to send `Message2`.
#[subsystem(Message1, sends: [Message2])]
sub_a: SubsystemA,
// Consumes `Message2`, is allowed to send `Message1`.
#[subsystem(Message2, sends: [Message1])]
sub_b: SubsystemB,
}
// SubsystemA: alternates between two phases.
//   1. Message-processing loop: handles signals and `Message1`; every message
//      starts a sleeping task. Ends once `TASK_LIMIT` tasks are in flight.
//   2. Signal-processing loop: only calls `recv_signal`, so no new message is
//      taken until a task finishes and the in-flight count is below the limit.
// An `Err` from either receive call ends the subsystem with `Ok(())`.
#[orchestra::subsystem(SubsystemA, error=OrchestraError)]
impl<Context> SubsystemA {
// Builds the subsystem future; `ctx` is moved into it and is the only event source.
fn start(self, mut ctx: Context) -> SpawnedSubsystem<OrchestraError> {
SpawnedSubsystem {
name: "SubsystemA",
future: Box::pin(async move {
// Maximum number of tasks allowed in flight before messages stop being accepted.
const TASK_LIMIT: usize = 5;
// All currently running tasks; each resolves to the id of its message.
let mut tasks = FuturesUnordered::new();
'outer: loop {
// Phase 1: accept both signals and messages.
loop {
select! {
from_orchestra = ctx.recv().fuse() => {
match from_orchestra {
Ok(FromOrchestra::Signal(sig)) => {
log::info!(target: "subsystem::A", "received SIGNAL {sig:?} in message-processing loop");
},
Ok(FromOrchestra::Communication { msg }) => {
log::info!(target: "subsystem::A", "received MESSAGE {msg:?}");
let Message1(id) = msg;
// Random task duration of 5 to 29 whole seconds; the range is non-empty,
// so `choose` always yields a value and the `unwrap` cannot fail.
let dur = time::Duration::from_secs((5..30).choose(&mut rand::thread_rng()).unwrap());
tasks.push(async move {
log::info!(target: "task", "[{id}]: sleeping for {dur:?}");
sleep(dur).await;
log::info!(target: "task", "[{id}]: woke up");
id
});
// Saturated: leave the message loop and switch to signals only.
if tasks.len() >= TASK_LIMIT {
break;
}
},
Err(_) => {
// Receiving failed: stop the whole subsystem.
break 'outer;
}
}
},
id = tasks.select_next_some() => {
log::info!(target: "subsystem::A", "task {id} finished in message-processing loop");
}
}
}
log::warn!(target: "subsystem::A", "↓↓↓ saturated, only processing signals ↓↓↓");
// Phase 2: only signals are received here.
// NOTE(review): presumably unread messages stay queued in the bounded message
// channel and thereby slow down the sender - confirm against orchestra docs.
loop {
select! {
signal = ctx.recv_signal().fuse() => {
match signal {
Ok(sig) => log::info!(target: "subsystem::A", "received SIGNAL {sig:?} in signal-processing loop"),
Err(_) => {
// Receiving failed: stop the whole subsystem.
break 'outer;
}
}
},
id = tasks.select_next_some() => {
log::info!(target: "subsystem::A", "task {id} finished in signal-processing loop");
// A slot is free again: go back to accepting messages.
if tasks.len() < TASK_LIMIT {
break;
}
}
}
}
log::warn!(target: "subsystem::A", "↑↑↑ desaturated, processing everything ↑↑↑");
}
Ok(())
}),
}
}
}
#[orchestra::subsystem(SubsystemB, error=OrchestraError)]
impl<Context> SubsystemB {
fn start(self, mut ctx: Context) -> SpawnedSubsystem<OrchestraError> {
let mut sender = ctx.sender().clone();
SpawnedSubsystem {
name: "SubsystemB",
future: Box::pin(async move {
loop {
select! {
from_orchestra = ctx.recv().fuse() => {
match from_orchestra {
Ok(FromOrchestra::Signal(sig)) => {
let id = COUNTER.fetch_add(1, Ordering::AcqRel);
log::info!(target: "subsystem::B", "received SIGNAL {sig:?}, sending task [{id}]");
sender.send_message(Message1(id)).await;
log::info!(target: "subsystem::B", "successfully sent task [{id}]");
},
Ok(FromOrchestra::Communication { msg }) => {
log::info!(target: "subsystem::B", "received MESSAGE {msg:?}");
},
Err(_) => break
}
},
}
}
Ok(())
}),
}
}
}
/// Entry point of the backpressure demo.
///
/// Builds the orchestra with both subsystems on a thread pool, then sends a
/// `Signal::Heartbeat` to each subsystem once per second. The program runs
/// until the first subsystem future terminates and then returns.
///
/// # Panics
///
/// Panics if the thread pool cannot be created or the orchestra fails to build.
fn main() {
    env_logger::builder().filter_level(log::LevelFilter::Info).init();

    let pool = ThreadPool::new().expect("failed to create the thread pool");
    // `_handle` is kept alive for the whole run although it is not used directly.
    let (mut orchestra, _handle) = MyOrchestra::builder()
        .sub_a(SubsystemA)
        .sub_b(SubsystemB)
        .spawner(MySpawner(pool))
        .build()
        .expect("failed to build the orchestra");

    // Resolves as soon as the first of the running subsystem futures finishes.
    let fut = orchestra.running_subsystems.into_future().fuse();
    pin_mut!(fut);

    // Never completes on its own; send errors are deliberately ignored.
    let signal_spammer = async {
        loop {
            sleep(time::Duration::from_secs(1)).await;
            let _ = orchestra.sub_a.send_signal(Signal::Heartbeat).await;
            let _ = orchestra.sub_b.send_signal(Signal::Heartbeat).await;
        }
    };
    pin_mut!(signal_spammer);

    // Race the spammer against the first subsystem exit. Looping around this
    // `select!` would keep the spammer running forever after a subsystem has
    // terminated, because a completed fused future is simply skipped.
    futures::executor::block_on(async {
        select! {
            _ = signal_spammer.fuse() => (),
            _ = fut => {
                log::warn!(target: "main", "a subsystem terminated, shutting down");
            },
        }
    })
}