orchestra/lib.rs
1// Copyright (C) 2021 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: Apache-2.0
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! # Orchestra
17//!
//! `orchestra` provides a global information flow system in reference to system-specific
//! process tokens.
20//! The token is arbitrary, but is used to notify all `Subsystem`s of what is relevant
21//! and what is not, leading to a so called _view_ of active tokens.
22//!
23//! An `orchestra` is something that allows spawning/stopping and orchestrating
24//! asynchronous tasks as well as establishing a well-defined and easy to use
25//! protocol that the tasks can use to communicate with each other. It is desired
26//! that this protocol is the only way tasks - called `Subsystem`s in the orchestra
27//! scope - communicate with each other, but that is not enforced and is the responsibility
28//! of the developer.
29//!
30//! The `Orchestra` is instantiated with a pre-defined set of `Subsystems` that
31//! share the same behavior from `Orchestra`'s point of view.
32//!
//! ```text
//!                  +-----------------------------+
//!                  |          Orchestra          |
//!                  +-----------------------------+
//!
//! ................|  Orchestra "holds" these and uses |.............
//! .                  them to (re)start things                      .
//! .                                                                .
//! .  +-------------------+                +---------------------+  .
//! .  |    Subsystem1     |                |     Subsystem2      |  .
//! .  +-------------------+                +---------------------+  .
//! .            |                                     |             .
//! ..................................................................
//!              |                                     |
//!           start()                               start()
//!              |                                     |
//!              V                                     V
//! ..................| Orchestra "runs" these |......................
//! .                                                                .
//! .  +--------------------+               +---------------------+  .
//! .  | SubsystemInstance1 | <-- bidir --> | SubsystemInstance2  |  .
//! .  +--------------------+               +---------------------+  .
//! .                                                                .
//! ..................................................................
//! ```
58#![deny(unused_results)]
59#![deny(missing_docs)]
60// #![deny(unused_crate_dependencies)]
61
62pub use orchestra_proc_macro::{contextbounds, orchestra, subsystem};
63
64#[doc(hidden)]
65pub use metered;
66#[doc(hidden)]
67pub use tracing;
68
69#[doc(hidden)]
70pub use async_trait::async_trait;
71#[doc(hidden)]
72pub use futures::{
73 self,
74 channel::{mpsc, oneshot},
75 future::{BoxFuture, Fuse, Future},
76 poll, select,
77 stream::{self, select, select_with_strategy, FuturesUnordered, PollNext},
78 task::{Context, Poll},
79 FutureExt, StreamExt,
80};
81#[doc(hidden)]
82pub use std::pin::Pin;
83
84use std::sync::{
85 atomic::{self, AtomicUsize},
86 Arc,
87};
88#[doc(hidden)]
89pub use std::time::Duration;
90
91#[doc(hidden)]
92pub use metered::TrySendError;
93
94#[doc(hidden)]
95pub use futures_timer::Delay;
96
97use std::fmt;
98
99#[cfg(test)]
100mod tests;
101
102/// A spawner
103#[dyn_clonable::clonable]
104pub trait Spawner: Clone + Send + Sync {
105 /// Spawn the given blocking future.
106 ///
107 /// The given `group` and `name` is used to identify the future in tracing.
108 fn spawn_blocking(
109 &self,
110 name: &'static str,
111 group: Option<&'static str>,
112 future: futures::future::BoxFuture<'static, ()>,
113 );
114 /// Spawn the given non-blocking future.
115 ///
116 /// The given `group` and `name` is used to identify the future in tracing.
117 fn spawn(
118 &self,
119 name: &'static str,
120 group: Option<&'static str>,
121 future: futures::future::BoxFuture<'static, ()>,
122 );
123}
124
125/// A type of messages that are sent from a [`Subsystem`] to the declared orchestra.
126///
127/// Used to launch jobs.
128pub enum ToOrchestra {
129 /// A message that wraps something the `Subsystem` is desiring to
130 /// spawn on the orchestra and a `oneshot::Sender` to signal the result
131 /// of the spawn.
132 SpawnJob {
133 /// Name of the task to spawn which be shown in jaeger and tracing logs.
134 name: &'static str,
135 /// Subsystem of the task to spawn which be shown in jaeger and tracing logs.
136 subsystem: Option<&'static str>,
137 /// The future to execute.
138 s: BoxFuture<'static, ()>,
139 },
140
141 /// Same as `SpawnJob` but for blocking tasks to be executed on a
142 /// dedicated thread pool.
143 SpawnBlockingJob {
144 /// Name of the task to spawn which be shown in jaeger and tracing logs.
145 name: &'static str,
146 /// Subsystem of the task to spawn which be shown in jaeger and tracing logs.
147 subsystem: Option<&'static str>,
148 /// The future to execute.
149 s: BoxFuture<'static, ()>,
150 },
151}
152
153impl fmt::Debug for ToOrchestra {
154 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155 match self {
156 Self::SpawnJob { name, subsystem, .. } => {
157 writeln!(f, "SpawnJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
158 },
159 Self::SpawnBlockingJob { name, subsystem, .. } => {
160 writeln!(f, "SpawnBlockingJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
161 },
162 }
163 }
164}
165
/// Helper trait describing a mapping that is applied to a subsystem.
pub trait MapSubsystem<T> {
    /// The type produced by the mapping.
    type Output;

    /// Takes ownership of a `T` (one per subsystem) and maps it to `Self::Output`.
    fn map_subsystem(&self, sub: T) -> Self::Output;
}

/// Every `Fn(T) -> U` can be used directly as a mapping.
impl<F, T, U> MapSubsystem<T> for F
where
    F: Fn(T) -> U,
{
    type Output = U;

    fn map_subsystem(&self, sub: T) -> Self::Output {
        self(sub)
    }
}
185
/// An envelope around a message.
///
/// Carries a signal counter next to the message so that signals and
/// messages can be synchronized and inconsistent message sequences
/// are prevented.
#[derive(Debug)]
pub struct MessagePacket<T> {
    /// Signal level at the point of reception.
    ///
    /// Required to ensure that signals are consumed _before_ any
    /// message that relies on a certain signal having been seen.
    pub signals_received: usize,
    /// The message to be sent/consumed.
    pub message: T,
}
201
202/// Create a packet from its parts.
203pub fn make_packet<T>(signals_received: usize, message: T) -> MessagePacket<T> {
204 MessagePacket { signals_received, message }
205}
206
207/// A functor to specify strategy of the channels selection in the `SubsystemIncomingMessages`
208pub fn select_message_channel_strategy(_: &mut ()) -> PollNext {
209 PollNext::Right
210}
211
212/// Incoming messages from both the bounded and unbounded channel.
213pub type SubsystemIncomingMessages<M> = self::stream::SelectWithStrategy<
214 self::metered::MeteredReceiver<MessagePacket<M>>,
215 self::metered::UnboundedMeteredReceiver<MessagePacket<M>>,
216 fn(&mut ()) -> self::stream::PollNext,
217 (),
218>;
219
/// Watermark to track the received signals.
///
/// The counter lives behind an `Arc`, so clones share one value.
#[derive(Debug, Default, Clone)]
pub struct SignalsReceived(Arc<AtomicUsize>);

impl SignalsReceived {
    /// Load the current value of received signals.
    pub fn load(&self) -> usize {
        let Self(counter) = self;
        // A stale value must not be observed here: `Acquire` makes all writes that
        // other threads published with release semantics visible to this thread.
        counter.load(atomic::Ordering::Acquire)
    }

    /// Increase the number of signals by one.
    pub fn inc(&self) {
        let Self(counter) = self;
        // The previous value is of no interest to the caller.
        let _ = counter.fetch_add(1, atomic::Ordering::AcqRel);
    }
}
238
/// Trait for errors that can be annotated with their origin,
/// so that errors crossing subsystems are easier to track.
pub trait AnnotateErrorOrigin: std::error::Error + Send + Sync + 'static {
    /// Annotate the error with an origin `str`.
    ///
    /// Commonly this is used to create nested enum variants.
    ///
    /// ```rust,ignore
    /// E::WithOrigin("I am originally from Cowtown.", E::Variant)
    /// ```
    fn with_origin(self, origin: &'static str) -> Self;
}
251
252/// An asynchronous subsystem task..
253///
254/// In essence it's just a new type wrapping a `BoxFuture`.
255pub struct SpawnedSubsystem<E>
256where
257 E: std::error::Error + Send + Sync + 'static + From<self::OrchestraError>,
258{
259 /// Name of the subsystem being spawned.
260 pub name: &'static str,
261 /// The task of the subsystem being spawned.
262 pub future: BoxFuture<'static, Result<(), E>>,
263}
264
265/// An error type that describes faults that may happen
266///
267/// These are:
268/// * Channels being closed
269/// * Subsystems dying when they are not expected to
270/// * Subsystems not dying when they are told to die
271/// * etc.
272#[derive(thiserror::Error, Debug)]
273#[allow(missing_docs)]
274pub enum OrchestraError {
275 #[error(transparent)]
276 NotifyCancellation(#[from] oneshot::Canceled),
277
278 #[error("Queue error")]
279 QueueError,
280
281 #[error("Failed to spawn task {0}")]
282 TaskSpawn(&'static str),
283
284 #[error(transparent)]
285 Infallible(#[from] std::convert::Infallible),
286
287 #[error("Failed to {0}")]
288 Context(String),
289
290 #[error("Subsystem stalled: {0}, source: {1}, type: {2}")]
291 SubsystemStalled(&'static str, &'static str, &'static str),
292
293 /// Per origin (or subsystem) annotations to wrap an error.
294 #[error("Error originated in {origin}")]
295 FromOrigin {
296 /// An additional annotation tag for the origin of `source`.
297 origin: &'static str,
298 /// The wrapped error. Marked as source for tracking the error chain.
299 #[source]
300 source: Box<dyn 'static + std::error::Error + Send + Sync>,
301 },
302}
303
304impl<T> From<metered::SendError<T>> for OrchestraError {
305 fn from(_err: metered::SendError<T>) -> Self {
306 Self::QueueError
307 }
308}
309
310/// Alias for a result with error type `OrchestraError`.
311pub type OrchestraResult<T> = std::result::Result<T, self::OrchestraError>;
312
313/// Collection of meters related to a subsystem.
314#[derive(Clone)]
315pub struct SubsystemMeters {
316 #[allow(missing_docs)]
317 pub bounded: metered::Meter,
318 #[allow(missing_docs)]
319 pub unbounded: metered::Meter,
320 #[allow(missing_docs)]
321 pub signals: metered::Meter,
322}
323
324impl SubsystemMeters {
325 /// Read the values of all subsystem `Meter`s.
326 pub fn read(&self) -> SubsystemMeterReadouts {
327 SubsystemMeterReadouts {
328 bounded: self.bounded.read(),
329 unbounded: self.unbounded.read(),
330 signals: self.signals.read(),
331 }
332 }
333}
334
335/// Set of readouts of the `Meter`s of a subsystem.
336pub struct SubsystemMeterReadouts {
337 #[allow(missing_docs)]
338 pub bounded: metered::Readout,
339 #[allow(missing_docs)]
340 pub unbounded: metered::Readout,
341 #[allow(missing_docs)]
342 pub signals: metered::Readout,
343}
344
345/// A running instance of some [`Subsystem`].
346///
347/// [`Subsystem`]: trait.Subsystem.html
348///
349/// `M` here is the inner message type, and _not_ the generated `enum AllMessages` or `#message_wrapper` type.
350pub struct SubsystemInstance<Message, Signal> {
351 /// Send sink for `Signal`s to be sent to a subsystem.
352 pub tx_signal: crate::metered::MeteredSender<Signal>,
353 /// Send sink for `Message`s to be sent to a subsystem.
354 pub tx_bounded: crate::metered::MeteredSender<MessagePacket<Message>>,
355 /// All meters of the particular subsystem instance.
356 pub meters: SubsystemMeters,
357 /// The number of signals already received.
358 /// Required to assure messages and signals
359 /// are processed correctly.
360 pub signals_received: usize,
361 /// Name of the subsystem instance.
362 pub name: &'static str,
363}
364
/// A message type that a subsystem receives from an orchestra.
/// It wraps signals from an orchestra and messages that are circulating
/// between subsystems.
///
/// It is generic over the message type `Message` that a particular `Subsystem`
/// may use, and over the `Signal` type.
#[derive(Debug)]
pub enum FromOrchestra<Message, Signal> {
    /// Signal from the `Orchestra`.
    Signal(Signal),

    /// Some other `Subsystem`'s message.
    Communication {
        /// Contained message
        msg: Message,
    },
}

/// A bare signal converts into the `Signal` variant.
impl<Signal, Message> From<Signal> for FromOrchestra<Message, Signal> {
    fn from(signal: Signal) -> Self {
        FromOrchestra::Signal(signal)
    }
}
387
388/// A context type that is given to the [`Subsystem`] upon spawning.
389/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
390/// or spawn jobs.
391///
392/// [`Orchestra`]: struct.Orchestra.html
393/// [`SubsystemJob`]: trait.SubsystemJob.html
394#[async_trait::async_trait]
395pub trait SubsystemContext: Send + 'static {
396 /// The message type of this context. Subsystems launched with this context will expect
397 /// to receive messages of this type. Commonly uses the wrapping `enum` commonly called
398 /// `AllMessages`.
399 type Message: ::std::fmt::Debug + Send + 'static;
400 /// And the same for signals.
401 type Signal: ::std::fmt::Debug + Send + 'static;
402 /// The overarching messages `enum` for this particular subsystem.
403 type OutgoingMessages: ::std::fmt::Debug + Send + 'static;
404
405 // The overarching messages `enum` for this particular subsystem.
406 // type AllMessages: From<Self::OutgoingMessages> + From<Self::Message> + std::fmt::Debug + Send + 'static;
407
408 /// The sender type as provided by `sender()` and underlying.
409 type Sender: Clone + Send + 'static + SubsystemSender<Self::OutgoingMessages>;
410 /// The error type.
411 type Error: ::std::error::Error + ::std::convert::From<OrchestraError> + Sync + Send + 'static;
412
413 /// Try to asynchronously receive a message.
414 ///
415 /// Has to be used with caution, if you loop over this without
416 /// using `pending!()` macro you will end up with a busy loop!
417 async fn try_recv(&mut self) -> Result<Option<FromOrchestra<Self::Message, Self::Signal>>, ()>;
418
419 /// Receive a signal or a message.
420 async fn recv(&mut self) -> Result<FromOrchestra<Self::Message, Self::Signal>, Self::Error>;
421
422 /// Receive a signal.
423 ///
424 /// This method allows the subsystem to process signals while being blocked on processing messages.
425 /// See `examples/backpressure.rs` for an example.
426 async fn recv_signal(&mut self) -> Result<Self::Signal, Self::Error>;
427
428 /// Spawn a child task on the executor.
429 fn spawn(
430 &mut self,
431 name: &'static str,
432 s: ::std::pin::Pin<Box<dyn crate::Future<Output = ()> + Send>>,
433 ) -> Result<(), Self::Error>;
434
435 /// Spawn a blocking child task on the executor's dedicated thread pool.
436 fn spawn_blocking(
437 &mut self,
438 name: &'static str,
439 s: ::std::pin::Pin<Box<dyn crate::Future<Output = ()> + Send>>,
440 ) -> Result<(), Self::Error>;
441
442 /// Send a direct message to some other `Subsystem`, routed based on message type.
443 // #[deprecated(note = "Use `self.sender().send_message(msg) instead, avoid passing around the full context.")]
444 async fn send_message<T>(&mut self, msg: T)
445 where
446 Self::OutgoingMessages: From<T> + Send,
447 T: Send,
448 {
449 self.sender().send_message(<Self::OutgoingMessages>::from(msg)).await
450 }
451
452 /// Send multiple direct messages to other `Subsystem`s, routed based on message type.
453 // #[deprecated(note = "Use `self.sender().send_message(msg) instead, avoid passing around the full context.")]
454 async fn send_messages<T, I>(&mut self, msgs: I)
455 where
456 Self::OutgoingMessages: From<T> + Send,
457 I: IntoIterator<Item = T> + Send,
458 I::IntoIter: Send,
459 T: Send,
460 {
461 self.sender()
462 .send_messages(msgs.into_iter().map(<Self::OutgoingMessages>::from))
463 .await
464 }
465
466 /// Send a message using the unbounded connection.
467 // #[deprecated(note = "Use `self.sender().send_unbounded_message(msg) instead, avoid passing around the full context.")]
468 fn send_unbounded_message<X>(&mut self, msg: X)
469 where
470 Self::OutgoingMessages: From<X> + Send,
471 X: Send,
472 {
473 self.sender().send_unbounded_message(<Self::OutgoingMessages>::from(msg))
474 }
475
476 /// Obtain the sender.
477 fn sender(&mut self) -> &mut Self::Sender;
478}
479
480/// A trait that describes the [`Subsystem`]s that can run on the [`Orchestra`].
481///
482/// It is generic over the message type circulating in the system.
483/// The idea that we want some type containing persistent state that
484/// can spawn actually running subsystems when asked.
485///
486/// [`Orchestra`]: struct.Orchestra.html
487/// [`Subsystem`]: trait.Subsystem.html
488pub trait Subsystem<Ctx, E>
489where
490 Ctx: SubsystemContext,
491 E: std::error::Error + Send + Sync + 'static + From<self::OrchestraError>,
492{
493 /// Start this `Subsystem` and return `SpawnedSubsystem`.
494 fn start(self, ctx: Ctx) -> SpawnedSubsystem<E>;
495}
496
/// Priority of messages sent to the individual subsystems.
/// Only for the bounded channel sender.
///
/// The level is a plain tag, so it is `Copy` and can be compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PriorityLevel {
    /// Normal priority.
    Normal,
    /// High priority.
    High,
}
507/// Normal priority.
508pub struct NormalPriority;
509/// High priority.
510pub struct HighPriority;
511
512/// Describes the priority of the message.
513pub trait Priority {
514 /// The priority level.
515 fn priority() -> PriorityLevel {
516 PriorityLevel::Normal
517 }
518}
519impl Priority for NormalPriority {
520 fn priority() -> PriorityLevel {
521 PriorityLevel::Normal
522 }
523}
524impl Priority for HighPriority {
525 fn priority() -> PriorityLevel {
526 PriorityLevel::High
527 }
528}
529
530/// Sender end of a channel to interface with a subsystem.
531#[async_trait::async_trait]
532pub trait SubsystemSender<OutgoingMessage>: Clone + Send + 'static
533where
534 OutgoingMessage: Send,
535{
536 /// Send a direct message to some other `Subsystem`, routed based on message type.
537 async fn send_message(&mut self, msg: OutgoingMessage);
538
539 /// Send a direct message with defined priority to some other `Subsystem`, routed based on message type.
540 async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage);
541
542 /// Tries to send a direct message to some other `Subsystem`, routed based on message type.
543 /// This method is useful for cases where the message queue is bounded and the message is ok
544 /// to be dropped if the queue is full. If the queue is full, this method will return an error.
545 /// This method is not async and will not block the current task.
546 fn try_send_message(
547 &mut self,
548 msg: OutgoingMessage,
549 ) -> Result<(), metered::TrySendError<OutgoingMessage>>;
550
551 /// Tries to send a direct message with defined priority to some other `Subsystem`, routed based on message type.
552 /// If the queue is full, this method will return an error.
553 /// This method is not async and will not block the current task.
554 fn try_send_message_with_priority<P: Priority>(
555 &mut self,
556 msg: OutgoingMessage,
557 ) -> Result<(), metered::TrySendError<OutgoingMessage>>;
558
559 /// Send multiple direct messages to other `Subsystem`s, routed based on message type.
560 async fn send_messages<I>(&mut self, msgs: I)
561 where
562 I: IntoIterator<Item = OutgoingMessage> + Send,
563 I::IntoIter: Send;
564
565 /// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message
566 /// type.
567 ///
568 /// This function should be used only when there is some other bounding factor on the messages
569 /// sent with it. Otherwise, it risks a memory leak.
570 fn send_unbounded_message(&mut self, msg: OutgoingMessage);
571}
572
573/// A future that wraps another future with a `Delay` allowing for time-limited futures.
574#[pin_project::pin_project]
575pub struct Timeout<F: Future> {
576 #[pin]
577 future: F,
578 #[pin]
579 delay: Delay,
580}
581
582/// Extends `Future` to allow time-limited futures.
583pub trait TimeoutExt: Future {
584 /// Adds a timeout of `duration` to the given `Future`.
585 /// Returns a new `Future`.
586 fn timeout(self, duration: Duration) -> Timeout<Self>
587 where
588 Self: Sized,
589 {
590 Timeout { future: self, delay: Delay::new(duration) }
591 }
592}
593
594impl<F> TimeoutExt for F where F: Future {}
595
596impl<F> Future for Timeout<F>
597where
598 F: Future,
599{
600 type Output = Option<F::Output>;
601
602 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
603 let this = self.project();
604
605 if this.delay.poll(ctx).is_ready() {
606 return Poll::Ready(None)
607 }
608
609 if let Poll::Ready(output) = this.future.poll(ctx) {
610 return Poll::Ready(Some(output))
611 }
612
613 Poll::Pending
614 }
615}