#![deny(unused_results)]
#![deny(missing_docs)]
pub use orchestra_proc_macro::{contextbounds, orchestra, subsystem};
#[doc(hidden)]
pub use metered;
#[doc(hidden)]
pub use tracing;
#[doc(hidden)]
pub use async_trait::async_trait;
#[doc(hidden)]
pub use futures::{
self,
channel::{mpsc, oneshot},
future::{BoxFuture, Fuse, Future},
poll, select,
stream::{self, select, select_with_strategy, FuturesUnordered, PollNext},
task::{Context, Poll},
FutureExt, StreamExt,
};
#[doc(hidden)]
pub use std::pin::Pin;
use std::sync::{
atomic::{self, AtomicUsize},
Arc,
};
#[doc(hidden)]
pub use std::time::Duration;
#[doc(hidden)]
pub use metered::TrySendError;
#[doc(hidden)]
pub use futures_timer::Delay;
use std::fmt;
#[cfg(test)]
mod tests;
/// Executor abstraction used to run boxed, `'static` futures with unit output.
///
/// Implementors must be `Clone + Send + Sync`. Both methods return nothing, so no
/// handle to the spawned work is handed back to the caller.
#[dyn_clonable::clonable]
pub trait Spawner: Clone + Send + Sync {
/// Spawn `future` under the task `name`, optionally tagged with a `group`.
///
/// NOTE(review): the name suggests this variant is meant for futures that may block;
/// how it differs from [`Spawner::spawn`] is decided by the implementor - confirm there.
fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
);
/// Spawn `future` under the task `name`, optionally tagged with a `group`.
fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
);
}
/// Job-spawning requests, each carrying a task name, an optional subsystem label and
/// the boxed future to run.
///
/// NOTE(review): judging by the type name these travel from a subsystem to the
/// orchestra; the sending side is not visible in this section - confirm.
pub enum ToOrchestra {
/// Request to spawn a job.
SpawnJob {
/// Static name of the job.
name: &'static str,
/// Optional static subsystem label attached to the job.
subsystem: Option<&'static str>,
/// The boxed future to run.
s: BoxFuture<'static, ()>,
},
/// Request to spawn a job through the blocking path.
///
/// NOTE(review): presumably forwarded to [`Spawner::spawn_blocking`] - confirm.
SpawnBlockingJob {
/// Static name of the job.
name: &'static str,
/// Optional static subsystem label attached to the job.
subsystem: Option<&'static str>,
/// The boxed future to run.
s: BoxFuture<'static, ()>,
},
}
impl fmt::Debug for ToOrchestra {
    /// Renders the variant name, the job `name` and the subsystem label, substituting
    /// `"default"` when no subsystem is set. The boxed future itself is not printed.
    ///
    /// The output deliberately carries no trailing newline: `Debug` renderings are
    /// routinely embedded in larger `{:?}` output and log lines, where a line break
    /// emitted by the value itself would split the surrounding text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::SpawnJob { name, subsystem, .. } => {
                write!(f, "SpawnJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
            },
            Self::SpawnBlockingJob { name, subsystem, .. } => {
                write!(f, "SpawnBlockingJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
            },
        }
    }
}
/// Maps a value of type `T` (a subsystem, going by the method name) to `Self::Output`.
pub trait MapSubsystem<T> {
/// The type produced by the mapping.
type Output;
/// Take `sub` by value and return the mapped result.
fn map_subsystem(&self, sub: T) -> Self::Output;
}
/// Blanket implementation: any `Fn(In) -> Out` callable is a [`MapSubsystem<In>`]
/// whose `Output` is the callable's return type.
impl<Func, In, Out> MapSubsystem<In> for Func
where
    Func: Fn(In) -> Out,
{
    type Output = Out;

    /// Invoke the callable with `sub` and hand back its result unchanged.
    fn map_subsystem(&self, sub: In) -> Self::Output {
        self(sub)
    }
}
/// A message of type `T` paired with a `signals_received` count.
///
/// NOTE(review): presumably the count is the number of signals the sender had seen
/// when the message was dispatched - confirm against the sending side.
#[derive(Debug)]
pub struct MessagePacket<T> {
/// Signal count carried alongside the message.
pub signals_received: usize,
/// The wrapped message.
pub message: T,
}
/// Bundle `message` together with the given `signals_received` count into a
/// [`MessagePacket`]. Both values are stored as passed, without modification.
pub fn make_packet<T>(signals_received: usize, message: T) -> MessagePacket<T> {
    MessagePacket::<T> { message, signals_received }
}
/// Selection strategy that unconditionally answers [`PollNext::Right`].
///
/// The `&mut ()` state argument is ignored. The signature matches the
/// `fn(&mut ()) -> PollNext` strategy slot of [`SubsystemIncomingMessages`], whose
/// right-hand stream is the unbounded receiver.
pub fn select_message_channel_strategy(_state: &mut ()) -> PollNext {
    PollNext::Right
}
/// Stream type merging a bounded (left) and an unbounded (right) metered receiver of
/// [`MessagePacket`]s through `SelectWithStrategy`, using a plain `fn` pointer as the
/// strategy and `()` as the strategy state.
pub type SubsystemIncomingMessages<M> = self::stream::SelectWithStrategy<
self::metered::MeteredReceiver<MessagePacket<M>>,
self::metered::UnboundedMeteredReceiver<MessagePacket<M>>,
fn(&mut ()) -> self::stream::PollNext,
(),
>;
/// Counter of received signals backed by an atomic behind an `Arc`.
///
/// Cloning yields another handle onto the same underlying counter, so increments made
/// through one clone are observable through all others.
#[derive(Debug, Default, Clone)]
pub struct SignalsReceived(Arc<AtomicUsize>);

impl SignalsReceived {
    /// Read the current count using `Acquire` ordering.
    pub fn load(&self) -> usize {
        let Self(counter) = self;
        counter.load(atomic::Ordering::Acquire)
    }

    /// Increase the count by one using `AcqRel` ordering.
    pub fn inc(&self) {
        let Self(counter) = self;
        // The previous value is of no interest; bind it away explicitly because the
        // crate denies `unused_results`.
        let _ = counter.fetch_add(1, atomic::Ordering::AcqRel);
    }
}
/// Error types that can be tagged with a static `origin` string.
pub trait AnnotateErrorOrigin: 'static + Send + Sync + std::error::Error {
/// Consume the error and return it with `origin` attached; how the origin is recorded
/// is up to the implementor.
fn with_origin(self, origin: &'static str) -> Self;
}
/// A named, boxed future yielding `Result<(), E>`; this is what [`Subsystem::start`]
/// returns.
pub struct SpawnedSubsystem<E>
where
E: std::error::Error + Send + Sync + 'static + From<self::OrchestraError>,
{
/// Static name identifying the subsystem.
pub name: &'static str,
/// The future to drive; it resolves to `Ok(())` or to an error of type `E`.
pub future: BoxFuture<'static, Result<(), E>>,
}
/// Error type of this crate; `Display` text and `From` conversions are generated by
/// `thiserror` from the attributes on each variant.
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum OrchestraError {
// Wraps a cancelled oneshot; displayed transparently as the inner error.
#[error(transparent)]
NotifyCancellation(#[from] oneshot::Canceled),
// Produced, among others, by the `From<metered::SendError<T>>` conversion.
#[error("Queue error")]
QueueError,
// Carries the static name of the task that could not be spawned.
#[error("Failed to spawn task {0}")]
TaskSpawn(&'static str),
// Conversion target for `Infallible`, so infallible results can use `?`.
#[error(transparent)]
Infallible(#[from] std::convert::Infallible),
// Free-form description; rendered as "Failed to <description>".
#[error("Failed to {0}")]
Context(String),
// Three static strings rendered as subsystem, source and type, in that order.
#[error("Subsystem stalled: {0}, source: {1}, type: {2}")]
SubsystemStalled(&'static str, &'static str, &'static str),
// An arbitrary boxed error annotated with the static origin it came from.
#[error("Error originated in {origin}")]
FromOrigin {
origin: &'static str,
#[source]
source: Box<dyn 'static + std::error::Error + Send + Sync>,
},
}
impl<T> From<metered::SendError<T>> for OrchestraError {
fn from(_err: metered::SendError<T>) -> Self {
Self::QueueError
}
}
/// Shorthand for a `Result` whose error type is [`OrchestraError`].
pub type OrchestraResult<T> = std::result::Result<T, self::OrchestraError>;
/// The three meters kept per subsystem; [`SubsystemMeters::read`] snapshots all of
/// them at once.
///
/// NOTE(review): going by the field names these track the bounded message channel, the
/// unbounded message channel and the signal channel - confirm where they are created.
#[derive(Clone)]
pub struct SubsystemMeters {
#[allow(missing_docs)]
pub bounded: metered::Meter,
#[allow(missing_docs)]
pub unbounded: metered::Meter,
#[allow(missing_docs)]
pub signals: metered::Meter,
}
impl SubsystemMeters {
    /// Take a readout of every meter, in the order bounded, unbounded, signals, and
    /// return them bundled as [`SubsystemMeterReadouts`].
    pub fn read(&self) -> SubsystemMeterReadouts {
        let Self { bounded, unbounded, signals } = self;
        let bounded = bounded.read();
        let unbounded = unbounded.read();
        let signals = signals.read();
        SubsystemMeterReadouts { bounded, unbounded, signals }
    }
}
/// One readout per meter of a [`SubsystemMeters`], as produced by
/// [`SubsystemMeters::read`].
pub struct SubsystemMeterReadouts {
#[allow(missing_docs)]
pub bounded: metered::Readout,
#[allow(missing_docs)]
pub unbounded: metered::Readout,
#[allow(missing_docs)]
pub signals: metered::Readout,
}
/// Per-subsystem bookkeeping: the sending halves of its signal and bounded message
/// channels, its meters, a signal counter and its name.
///
/// NOTE(review): presumably this is the orchestra-side handle for one running
/// subsystem - confirm against the generated orchestra code.
pub struct SubsystemInstance<Message, Signal> {
/// Sender for signals addressed to the subsystem.
pub tx_signal: crate::metered::MeteredSender<Signal>,
/// Sender for bounded-channel messages, each wrapped in a [`MessagePacket`].
pub tx_bounded: crate::metered::MeteredSender<MessagePacket<Message>>,
/// Meters associated with this subsystem.
pub meters: SubsystemMeters,
/// Signal count kept for this subsystem.
pub signals_received: usize,
/// Static name of the subsystem.
pub name: &'static str,
}
/// What a subsystem receives through its context: either a signal or a message.
#[derive(Debug)]
pub enum FromOrchestra<Message, Signal> {
/// A signal.
Signal(Signal),
/// A message.
Communication {
/// The message payload.
msg: Message,
},
}
impl<Signal, Message> From<Signal> for FromOrchestra<Message, Signal> {
fn from(signal: Signal) -> Self {
Self::Signal(signal)
}
}
/// The interface a subsystem uses to talk to the rest of the system: receiving
/// messages and signals, spawning jobs, and sending outgoing messages.
///
/// The `send_*` methods have default implementations that convert the argument into
/// `Self::OutgoingMessages` via `From` and forward it to [`SubsystemContext::sender`].
#[async_trait::async_trait]
pub trait SubsystemContext: Send + 'static {
/// Type of the messages this subsystem receives.
type Message: ::std::fmt::Debug + Send + 'static;
/// Type of the signals this subsystem receives.
type Signal: ::std::fmt::Debug + Send + 'static;
/// Type of the messages this subsystem can send out.
type OutgoingMessages: ::std::fmt::Debug + Send + 'static;
/// Sender handle capable of delivering `Self::OutgoingMessages`.
type Sender: Clone + Send + 'static + SubsystemSender<Self::OutgoingMessages>;
/// Error type of the fallible operations; must be constructible from [`OrchestraError`].
type Error: ::std::error::Error + ::std::convert::From<OrchestraError> + Sync + Send + 'static;
/// Attempt to receive a message or signal.
///
/// NOTE(review): presumably `Ok(None)` means nothing was available; what `Err(())`
/// stands for is not visible here - confirm with the implementors.
async fn try_recv(&mut self) -> Result<Option<FromOrchestra<Self::Message, Self::Signal>>, ()>;
/// Receive the next message or signal.
async fn recv(&mut self) -> Result<FromOrchestra<Self::Message, Self::Signal>, Self::Error>;
/// Receive the next signal.
///
/// NOTE(review): how pending non-signal messages are treated meanwhile is decided by
/// the implementor - confirm.
async fn recv_signal(&mut self) -> Result<Self::Signal, Self::Error>;
/// Spawn the boxed future `s` as a job called `name`.
fn spawn(
&mut self,
name: &'static str,
s: ::std::pin::Pin<Box<dyn crate::Future<Output = ()> + Send>>,
) -> Result<(), Self::Error>;
/// Spawn the boxed future `s` as a job called `name` through the blocking path.
fn spawn_blocking(
&mut self,
name: &'static str,
s: ::std::pin::Pin<Box<dyn crate::Future<Output = ()> + Send>>,
) -> Result<(), Self::Error>;
/// Convert `msg` into `Self::OutgoingMessages` and send it through the sender's
/// `send_message`.
async fn send_message<T>(&mut self, msg: T)
where
Self::OutgoingMessages: From<T> + Send,
T: Send,
{
self.sender().send_message(<Self::OutgoingMessages>::from(msg)).await
}
/// Convert every item of `msgs` into `Self::OutgoingMessages` and pass the resulting
/// iterator to the sender's `send_messages`.
async fn send_messages<T, I>(&mut self, msgs: I)
where
Self::OutgoingMessages: From<T> + Send,
I: IntoIterator<Item = T> + Send,
I::IntoIter: Send,
T: Send,
{
self.sender()
.send_messages(msgs.into_iter().map(<Self::OutgoingMessages>::from))
.await
}
/// Convert `msg` into `Self::OutgoingMessages` and send it through the sender's
/// `send_unbounded_message`; unlike the other send methods this one is not `async`.
fn send_unbounded_message<X>(&mut self, msg: X)
where
Self::OutgoingMessages: From<X> + Send,
X: Send,
{
self.sender().send_unbounded_message(<Self::OutgoingMessages>::from(msg))
}
/// Mutable access to the underlying sender.
fn sender(&mut self) -> &mut Self::Sender;
}
/// Something that can be started with a context `Ctx` and turned into a
/// [`SpawnedSubsystem`] whose future fails with `E`.
pub trait Subsystem<Ctx, E>
where
Ctx: SubsystemContext,
E: std::error::Error + Send + Sync + 'static + From<self::OrchestraError>,
{
/// Consume the subsystem and its context, returning the named future to run.
fn start(self, ctx: Ctx) -> SpawnedSubsystem<E>;
}
/// Priority levels reported by [`Priority::priority`].
#[derive(Debug)]
pub enum PriorityLevel {
/// Normal priority; also what the default `Priority::priority` body returns.
Normal,
/// High priority.
High,
}
/// Marker type whose [`Priority`] implementation reports [`PriorityLevel::Normal`].
pub struct NormalPriority;
/// Marker type whose [`Priority`] implementation reports [`PriorityLevel::High`].
pub struct HighPriority;
/// Type-level selector of a [`PriorityLevel`], used as the `P` type parameter of the
/// `*_with_priority` methods on [`SubsystemSender`].
pub trait Priority {
/// The level this type stands for; defaults to [`PriorityLevel::Normal`].
fn priority() -> PriorityLevel {
PriorityLevel::Normal
}
}
// Spelled out explicitly even though it matches the trait's default body.
impl Priority for NormalPriority {
fn priority() -> PriorityLevel {
PriorityLevel::Normal
}
}
// Overrides the trait's default (`Normal`) to report `High`.
impl Priority for HighPriority {
fn priority() -> PriorityLevel {
PriorityLevel::High
}
}
/// Clonable handle for sending messages of type `OutgoingMessage`.
///
/// Offers awaiting (`send_*`), non-awaiting fallible (`try_send_*`) and unbounded
/// variants; the `*_with_priority` methods select a level through a [`Priority`] type
/// parameter.
#[async_trait::async_trait]
pub trait SubsystemSender<OutgoingMessage>: Clone + Send + 'static
where
OutgoingMessage: Send,
{
/// Send `msg`, awaiting until the send completes.
async fn send_message(&mut self, msg: OutgoingMessage);
/// Like `send_message`, with the priority level chosen by `P`.
async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage);
/// Try to send `msg` without awaiting; on failure the returned
/// `metered::TrySendError` is parameterised by the message type.
fn try_send_message(
&mut self,
msg: OutgoingMessage,
) -> Result<(), metered::TrySendError<OutgoingMessage>>;
/// Like `try_send_message`, with the priority level chosen by `P`.
fn try_send_message_with_priority<P: Priority>(
&mut self,
msg: OutgoingMessage,
) -> Result<(), metered::TrySendError<OutgoingMessage>>;
/// Send every message yielded by `msgs`.
///
/// NOTE(review): whether the messages go out sequentially or concurrently is up to
/// the implementor - confirm.
async fn send_messages<I>(&mut self, msgs: I)
where
I: IntoIterator<Item = OutgoingMessage> + Send,
I::IntoIter: Send;
/// Send `msg` without awaiting and without reporting a result.
///
/// NOTE(review): presumably routed through an unbounded channel, going by the name.
fn send_unbounded_message(&mut self, msg: OutgoingMessage);
}
/// Future returned by [`TimeoutExt::timeout`]: the wrapped future paired with a
/// [`Delay`].
///
/// Resolves to `None` when the delay is found ready on a poll, otherwise to
/// `Some(output)` once the wrapped future completes.
#[pin_project::pin_project]
pub struct Timeout<F: Future> {
// The wrapped future; structurally pinned so it can be polled in place.
#[pin]
future: F,
// The timer bounding the wrapped future; structurally pinned as well.
#[pin]
delay: Delay,
}
/// Extension trait adding a `timeout` combinator to every future.
pub trait TimeoutExt: Future {
    /// Wrap `self` in a [`Timeout`] bounded by a [`Delay`] of `duration`.
    ///
    /// The resulting future yields `Some(output)` if `self` finishes first and `None`
    /// if the delay is observed ready first.
    fn timeout(self, duration: Duration) -> Timeout<Self>
    where
        Self: Sized,
    {
        let delay = Delay::new(duration);
        Timeout { future: self, delay }
    }
}

/// Blanket implementation so that every `Future` gets `.timeout(..)`.
impl<F: Future> TimeoutExt for F {}
/// Polls the delay first and the wrapped future second, so an elapsed delay wins even
/// when the wrapped future would also be ready on the same poll.
impl<F> Future for Timeout<F>
where
    F: Future,
{
    type Output = Option<F::Output>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let projected = self.project();
        match projected.delay.poll(ctx) {
            // Deadline reached: report the timeout without touching the inner future.
            Poll::Ready(_) => Poll::Ready(None),
            // Still within the deadline: forward the inner future's state, wrapping a
            // finished output in `Some`.
            Poll::Pending => projected.future.poll(ctx).map(Some),
        }
    }
}