orchestra/
lib.rs

1// Copyright (C) 2021 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: Apache-2.0
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! # Orchestra
17//!
//! `orchestra` provides a global information flow system in reference to system specific
19//! process tokens.
20//! The token is arbitrary, but is used to notify all `Subsystem`s of what is relevant
21//! and what is not, leading to a so called _view_ of active tokens.
22//!
23//! An `orchestra` is something that allows spawning/stopping and orchestrating
24//! asynchronous tasks as well as establishing a well-defined and easy to use
25//! protocol that the tasks can use to communicate with each other. It is desired
26//! that this protocol is the only way tasks - called `Subsystem`s in the orchestra
27//! scope - communicate with each other, but that is not enforced and is the responsibility
28//! of the developer.
29//!
30//! The `Orchestra` is instantiated with a pre-defined set of `Subsystems` that
31//! share the same behavior from `Orchestra`'s point of view.
32//!
33//! ```text
34//!                              +-----------------------------+
//!                              |         Orchestra           |
36//!                              +-----------------------------+
37//!
38//!             ................|  Orchestra "holds" these and uses |.............
39//!             .                  them to (re)start things                      .
40//!             .                                                                .
41//!             .  +-------------------+                +---------------------+  .
42//!             .  |   Subsystem1      |                |   Subsystem2        |  .
43//!             .  +-------------------+                +---------------------+  .
44//!             .           |                                       |            .
45//!             ..................................................................
46//!                         |                                       |
47//!                       start()                                 start()
48//!                         |                                       |
49//!                         V                                       V
50//!             ..................| Orchestra "runs" these |......................
51//!             .                                                                .
52//!             .  +--------------------+               +---------------------+  .
53//!             .  | SubsystemInstance1 | <-- bidir --> | SubsystemInstance2  |  .
54//!             .  +--------------------+               +---------------------+  .
55//!             .                                                                .
56//!             ..................................................................
57//! ```
58#![deny(unused_results)]
59#![deny(missing_docs)]
60// #![deny(unused_crate_dependencies)]
61
62pub use orchestra_proc_macro::{contextbounds, orchestra, subsystem};
63
64#[doc(hidden)]
65pub use metered;
66#[doc(hidden)]
67pub use tracing;
68
69#[doc(hidden)]
70pub use async_trait::async_trait;
71#[doc(hidden)]
72pub use futures::{
73	self,
74	channel::{mpsc, oneshot},
75	future::{BoxFuture, Fuse, Future},
76	poll, select,
77	stream::{self, select, select_with_strategy, FuturesUnordered, PollNext},
78	task::{Context, Poll},
79	FutureExt, StreamExt,
80};
81#[doc(hidden)]
82pub use std::pin::Pin;
83
84use std::sync::{
85	atomic::{self, AtomicUsize},
86	Arc,
87};
88#[doc(hidden)]
89pub use std::time::Duration;
90
91#[doc(hidden)]
92pub use metered::TrySendError;
93
94#[doc(hidden)]
95pub use futures_timer::Delay;
96
97use std::fmt;
98
99#[cfg(test)]
100mod tests;
101
102/// A spawner
103#[dyn_clonable::clonable]
104pub trait Spawner: Clone + Send + Sync {
105	/// Spawn the given blocking future.
106	///
107	/// The given `group` and `name` is used to identify the future in tracing.
108	fn spawn_blocking(
109		&self,
110		name: &'static str,
111		group: Option<&'static str>,
112		future: futures::future::BoxFuture<'static, ()>,
113	);
114	/// Spawn the given non-blocking future.
115	///
116	/// The given `group` and `name` is used to identify the future in tracing.
117	fn spawn(
118		&self,
119		name: &'static str,
120		group: Option<&'static str>,
121		future: futures::future::BoxFuture<'static, ()>,
122	);
123}
124
125/// A type of messages that are sent from a [`Subsystem`] to the declared orchestra.
126///
127/// Used to launch jobs.
128pub enum ToOrchestra {
129	/// A message that wraps something the `Subsystem` is desiring to
130	/// spawn on the orchestra and a `oneshot::Sender` to signal the result
131	/// of the spawn.
132	SpawnJob {
133		/// Name of the task to spawn which be shown in jaeger and tracing logs.
134		name: &'static str,
135		/// Subsystem of the task to spawn which be shown in jaeger and tracing logs.
136		subsystem: Option<&'static str>,
137		/// The future to execute.
138		s: BoxFuture<'static, ()>,
139	},
140
141	/// Same as `SpawnJob` but for blocking tasks to be executed on a
142	/// dedicated thread pool.
143	SpawnBlockingJob {
144		/// Name of the task to spawn which be shown in jaeger and tracing logs.
145		name: &'static str,
146		/// Subsystem of the task to spawn which be shown in jaeger and tracing logs.
147		subsystem: Option<&'static str>,
148		/// The future to execute.
149		s: BoxFuture<'static, ()>,
150	},
151}
152
153impl fmt::Debug for ToOrchestra {
154	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155		match self {
156			Self::SpawnJob { name, subsystem, .. } => {
157				writeln!(f, "SpawnJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
158			},
159			Self::SpawnBlockingJob { name, subsystem, .. } => {
160				writeln!(f, "SpawnBlockingJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
161			},
162		}
163	}
164}
165
/// Helper abstraction for turning a per-subsystem value into something else.
///
/// Any `Fn(T) -> U` (closure or plain function) implements this trait through
/// the blanket implementation below.
pub trait MapSubsystem<T> {
	/// Result type produced by the mapping.
	type Output;

	/// Takes ownership of `sub`, one `T` per subsystem, and converts it into `Self::Output`.
	fn map_subsystem(&self, sub: T) -> Self::Output;
}

/// Every callable taking a `T` is a mapper whose output is the callable's return type.
impl<F: Fn(T) -> U, T, U> MapSubsystem<T> for F {
	type Output = U;

	fn map_subsystem(&self, sub: T) -> Self::Output {
		self(sub)
	}
}
185
/// Envelope around a message.
///
/// Carries a signal counter next to the payload in order to synchronize
/// signals with messages, such that inconsistent message sequences are prevented.
#[derive(Debug)]
pub struct MessagePacket<T> {
	/// Signal level at the point of reception.
	///
	/// Needed to make sure the signals were consumed _before_ consuming
	/// messages which rely on a certain signal having been seen already.
	pub signals_received: usize,
	/// The message to be sent/consumed.
	pub message: T,
}

/// Assemble a [`MessagePacket`] from the signal count and the message.
pub fn make_packet<T>(signals_received: usize, message: T) -> MessagePacket<T> {
	let packet: MessagePacket<T> = MessagePacket { signals_received, message };
	packet
}
206
207/// A functor to specify strategy of the channels selection in the `SubsystemIncomingMessages`
208pub fn select_message_channel_strategy(_: &mut ()) -> PollNext {
209	PollNext::Right
210}
211
212/// Incoming messages from both the bounded and unbounded channel.
213pub type SubsystemIncomingMessages<M> = self::stream::SelectWithStrategy<
214	self::metered::MeteredReceiver<MessagePacket<M>>,
215	self::metered::UnboundedMeteredReceiver<MessagePacket<M>>,
216	fn(&mut ()) -> self::stream::PollNext,
217	(),
218>;
219
/// Watermark counting the signals received so far.
///
/// The counter lives behind an `Arc`, hence every clone observes and updates
/// the very same value.
#[derive(Debug, Default, Clone)]
pub struct SignalsReceived(Arc<AtomicUsize>);

impl SignalsReceived {
	/// Load the current value of received signals.
	pub fn load(&self) -> usize {
		let Self(counter) = self;
		// Reading a stale value must be avoided. `Acquire` pairs with the release half
		// of the increment in `inc`: whatever another thread wrote before the increment
		// observed here is visible afterwards, and no read or write of the current
		// thread that follows this load can be reordered before it.
		counter.load(atomic::Ordering::Acquire)
	}

	/// Increase the number of signals by one.
	pub fn inc(&self) {
		let Self(counter) = self;
		// The previous value is of no interest, bind it to satisfy `unused_results`.
		let _prior = counter.fetch_add(1, atomic::Ordering::AcqRel);
	}
}
238
/// Support for annotating an error with its origin,
/// which makes errors crossing subsystem boundaries easier to track.
pub trait AnnotateErrorOrigin: std::error::Error + Send + Sync + 'static {
	/// Annotate the error with an origin `str`.
	///
	/// A common way to implement this is a nested enum variant:
	///
	/// ```rust,ignore
	/// E::WithOrigin("I am originally from Cowtown.", E::Variant)
	/// ```
	fn with_origin(self, origin: &'static str) -> Self;
}
251
252/// An asynchronous subsystem task..
253///
254/// In essence it's just a new type wrapping a `BoxFuture`.
255pub struct SpawnedSubsystem<E>
256where
257	E: std::error::Error + Send + Sync + 'static + From<self::OrchestraError>,
258{
259	/// Name of the subsystem being spawned.
260	pub name: &'static str,
261	/// The task of the subsystem being spawned.
262	pub future: BoxFuture<'static, Result<(), E>>,
263}
264
265/// An error type that describes faults that may happen
266///
267/// These are:
268///   * Channels being closed
269///   * Subsystems dying when they are not expected to
270///   * Subsystems not dying when they are told to die
271///   * etc.
272#[derive(thiserror::Error, Debug)]
273#[allow(missing_docs)]
274pub enum OrchestraError {
275	#[error(transparent)]
276	NotifyCancellation(#[from] oneshot::Canceled),
277
278	#[error("Queue error")]
279	QueueError,
280
281	#[error("Failed to spawn task {0}")]
282	TaskSpawn(&'static str),
283
284	#[error(transparent)]
285	Infallible(#[from] std::convert::Infallible),
286
287	#[error("Failed to {0}")]
288	Context(String),
289
290	#[error("Subsystem stalled: {0}, source: {1}, type: {2}")]
291	SubsystemStalled(&'static str, &'static str, &'static str),
292
293	/// Per origin (or subsystem) annotations to wrap an error.
294	#[error("Error originated in {origin}")]
295	FromOrigin {
296		/// An additional annotation tag for the origin of `source`.
297		origin: &'static str,
298		/// The wrapped error. Marked as source for tracking the error chain.
299		#[source]
300		source: Box<dyn 'static + std::error::Error + Send + Sync>,
301	},
302}
303
304impl<T> From<metered::SendError<T>> for OrchestraError {
305	fn from(_err: metered::SendError<T>) -> Self {
306		Self::QueueError
307	}
308}
309
310/// Alias for a result with error type `OrchestraError`.
311pub type OrchestraResult<T> = std::result::Result<T, self::OrchestraError>;
312
313/// Collection of meters related to a subsystem.
314#[derive(Clone)]
315pub struct SubsystemMeters {
316	#[allow(missing_docs)]
317	pub bounded: metered::Meter,
318	#[allow(missing_docs)]
319	pub unbounded: metered::Meter,
320	#[allow(missing_docs)]
321	pub signals: metered::Meter,
322}
323
324impl SubsystemMeters {
325	/// Read the values of all subsystem `Meter`s.
326	pub fn read(&self) -> SubsystemMeterReadouts {
327		SubsystemMeterReadouts {
328			bounded: self.bounded.read(),
329			unbounded: self.unbounded.read(),
330			signals: self.signals.read(),
331		}
332	}
333}
334
335/// Set of readouts of the `Meter`s of a subsystem.
336pub struct SubsystemMeterReadouts {
337	#[allow(missing_docs)]
338	pub bounded: metered::Readout,
339	#[allow(missing_docs)]
340	pub unbounded: metered::Readout,
341	#[allow(missing_docs)]
342	pub signals: metered::Readout,
343}
344
345/// A running instance of some [`Subsystem`].
346///
347/// [`Subsystem`]: trait.Subsystem.html
348///
349/// `M` here is the inner message type, and _not_ the generated `enum AllMessages` or `#message_wrapper` type.
350pub struct SubsystemInstance<Message, Signal> {
351	/// Send sink for `Signal`s to be sent to a subsystem.
352	pub tx_signal: crate::metered::MeteredSender<Signal>,
353	/// Send sink for `Message`s to be sent to a subsystem.
354	pub tx_bounded: crate::metered::MeteredSender<MessagePacket<Message>>,
355	/// All meters of the particular subsystem instance.
356	pub meters: SubsystemMeters,
357	/// The number of signals already received.
358	/// Required to assure messages and signals
359	/// are processed correctly.
360	pub signals_received: usize,
361	/// Name of the subsystem instance.
362	pub name: &'static str,
363}
364
/// What a subsystem receives from an orchestra.
///
/// Either a signal emitted by the orchestra itself, or a message circulating
/// between subsystems.
///
/// It is generic over the message type `Message` that a particular `Subsystem`
/// may use, as well as over the `Signal` type.
#[derive(Debug)]
pub enum FromOrchestra<Message, Signal> {
	/// Signal from the `Orchestra`.
	Signal(Signal),

	/// Some other `Subsystem`'s message.
	Communication {
		/// Contained message
		msg: Message,
	},
}

/// A bare signal converts into the [`FromOrchestra::Signal`] variant.
impl<Signal, Message> From<Signal> for FromOrchestra<Message, Signal> {
	fn from(signal: Signal) -> Self {
		FromOrchestra::Signal(signal)
	}
}
387
388/// A context type that is given to the [`Subsystem`] upon spawning.
389/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
390/// or spawn jobs.
391///
392/// [`Orchestra`]: struct.Orchestra.html
393/// [`SubsystemJob`]: trait.SubsystemJob.html
394#[async_trait::async_trait]
395pub trait SubsystemContext: Send + 'static {
396	/// The message type of this context. Subsystems launched with this context will expect
397	/// to receive messages of this type. Commonly uses the wrapping `enum` commonly called
398	/// `AllMessages`.
399	type Message: ::std::fmt::Debug + Send + 'static;
400	/// And the same for signals.
401	type Signal: ::std::fmt::Debug + Send + 'static;
402	/// The overarching messages `enum` for this particular subsystem.
403	type OutgoingMessages: ::std::fmt::Debug + Send + 'static;
404
405	// The overarching messages `enum` for this particular subsystem.
406	// type AllMessages: From<Self::OutgoingMessages> + From<Self::Message> + std::fmt::Debug + Send + 'static;
407
408	/// The sender type as provided by `sender()` and underlying.
409	type Sender: Clone + Send + 'static + SubsystemSender<Self::OutgoingMessages>;
410	/// The error type.
411	type Error: ::std::error::Error + ::std::convert::From<OrchestraError> + Sync + Send + 'static;
412
413	/// Try to asynchronously receive a message.
414	///
415	/// Has to be used with caution, if you loop over this without
416	/// using `pending!()` macro you will end up with a busy loop!
417	async fn try_recv(&mut self) -> Result<Option<FromOrchestra<Self::Message, Self::Signal>>, ()>;
418
419	/// Receive a signal or a message.
420	async fn recv(&mut self) -> Result<FromOrchestra<Self::Message, Self::Signal>, Self::Error>;
421
422	/// Receive a signal.
423	///
424	/// This method allows the subsystem to process signals while being blocked on processing messages.
425	/// See `examples/backpressure.rs` for an example.
426	async fn recv_signal(&mut self) -> Result<Self::Signal, Self::Error>;
427
428	/// Spawn a child task on the executor.
429	fn spawn(
430		&mut self,
431		name: &'static str,
432		s: ::std::pin::Pin<Box<dyn crate::Future<Output = ()> + Send>>,
433	) -> Result<(), Self::Error>;
434
435	/// Spawn a blocking child task on the executor's dedicated thread pool.
436	fn spawn_blocking(
437		&mut self,
438		name: &'static str,
439		s: ::std::pin::Pin<Box<dyn crate::Future<Output = ()> + Send>>,
440	) -> Result<(), Self::Error>;
441
442	/// Send a direct message to some other `Subsystem`, routed based on message type.
443	// #[deprecated(note = "Use `self.sender().send_message(msg) instead, avoid passing around the full context.")]
444	async fn send_message<T>(&mut self, msg: T)
445	where
446		Self::OutgoingMessages: From<T> + Send,
447		T: Send,
448	{
449		self.sender().send_message(<Self::OutgoingMessages>::from(msg)).await
450	}
451
452	/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
453	// #[deprecated(note = "Use `self.sender().send_message(msg) instead, avoid passing around the full context.")]
454	async fn send_messages<T, I>(&mut self, msgs: I)
455	where
456		Self::OutgoingMessages: From<T> + Send,
457		I: IntoIterator<Item = T> + Send,
458		I::IntoIter: Send,
459		T: Send,
460	{
461		self.sender()
462			.send_messages(msgs.into_iter().map(<Self::OutgoingMessages>::from))
463			.await
464	}
465
466	/// Send a message using the unbounded connection.
467	// #[deprecated(note = "Use `self.sender().send_unbounded_message(msg) instead, avoid passing around the full context.")]
468	fn send_unbounded_message<X>(&mut self, msg: X)
469	where
470		Self::OutgoingMessages: From<X> + Send,
471		X: Send,
472	{
473		self.sender().send_unbounded_message(<Self::OutgoingMessages>::from(msg))
474	}
475
476	/// Obtain the sender.
477	fn sender(&mut self) -> &mut Self::Sender;
478}
479
480/// A trait that describes the [`Subsystem`]s that can run on the [`Orchestra`].
481///
482/// It is generic over the message type circulating in the system.
483/// The idea that we want some type containing persistent state that
484/// can spawn actually running subsystems when asked.
485///
486/// [`Orchestra`]: struct.Orchestra.html
487/// [`Subsystem`]: trait.Subsystem.html
488pub trait Subsystem<Ctx, E>
489where
490	Ctx: SubsystemContext,
491	E: std::error::Error + Send + Sync + 'static + From<self::OrchestraError>,
492{
493	/// Start this `Subsystem` and return `SpawnedSubsystem`.
494	fn start(self, ctx: Ctx) -> SpawnedSubsystem<E>;
495}
496
/// Priority with which messages are sent to the individual subsystems.
///
/// Only applies to the bounded channel sender.
#[derive(Debug)]
pub enum PriorityLevel {
	/// Normal priority.
	Normal,
	/// High priority.
	High,
}

/// Marker type standing for [`PriorityLevel::Normal`].
pub struct NormalPriority;

/// Marker type standing for [`PriorityLevel::High`].
pub struct HighPriority;

/// Describes the priority of the message.
pub trait Priority {
	/// The priority level; [`PriorityLevel::Normal`] unless an implementor overrides it.
	fn priority() -> PriorityLevel {
		PriorityLevel::Normal
	}
}

impl Priority for NormalPriority {
	fn priority() -> PriorityLevel {
		PriorityLevel::Normal
	}
}

impl Priority for HighPriority {
	fn priority() -> PriorityLevel {
		PriorityLevel::High
	}
}
529
530/// Sender end of a channel to interface with a subsystem.
531#[async_trait::async_trait]
532pub trait SubsystemSender<OutgoingMessage>: Clone + Send + 'static
533where
534	OutgoingMessage: Send,
535{
536	/// Send a direct message to some other `Subsystem`, routed based on message type.
537	async fn send_message(&mut self, msg: OutgoingMessage);
538
539	/// Send a direct message with defined priority to some other `Subsystem`, routed based on message type.
540	async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage);
541
542	/// Tries to send a direct message to some other `Subsystem`, routed based on message type.
543	/// This method is useful for cases where the message queue is bounded and the message is ok
544	/// to be dropped if the queue is full. If the queue is full, this method will return an error.
545	/// This method is not async and will not block the current task.
546	fn try_send_message(
547		&mut self,
548		msg: OutgoingMessage,
549	) -> Result<(), metered::TrySendError<OutgoingMessage>>;
550
551	/// Tries to send a direct message with defined priority to some other `Subsystem`, routed based on message type.
552	/// If the queue is full, this method will return an error.
553	/// This method is not async and will not block the current task.
554	fn try_send_message_with_priority<P: Priority>(
555		&mut self,
556		msg: OutgoingMessage,
557	) -> Result<(), metered::TrySendError<OutgoingMessage>>;
558
559	/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
560	async fn send_messages<I>(&mut self, msgs: I)
561	where
562		I: IntoIterator<Item = OutgoingMessage> + Send,
563		I::IntoIter: Send;
564
565	/// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message
566	/// type.
567	///
568	/// This function should be used only when there is some other bounding factor on the messages
569	/// sent with it. Otherwise, it risks a memory leak.
570	fn send_unbounded_message(&mut self, msg: OutgoingMessage);
571}
572
573/// A future that wraps another future with a `Delay` allowing for time-limited futures.
574#[pin_project::pin_project]
575pub struct Timeout<F: Future> {
576	#[pin]
577	future: F,
578	#[pin]
579	delay: Delay,
580}
581
582/// Extends `Future` to allow time-limited futures.
583pub trait TimeoutExt: Future {
584	/// Adds a timeout of `duration` to the given `Future`.
585	/// Returns a new `Future`.
586	fn timeout(self, duration: Duration) -> Timeout<Self>
587	where
588		Self: Sized,
589	{
590		Timeout { future: self, delay: Delay::new(duration) }
591	}
592}
593
594impl<F> TimeoutExt for F where F: Future {}
595
596impl<F> Future for Timeout<F>
597where
598	F: Future,
599{
600	type Output = Option<F::Output>;
601
602	fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
603		let this = self.project();
604
605		if this.delay.poll(ctx).is_ready() {
606			return Poll::Ready(None)
607		}
608
609		if let Poll::Ready(output) = this.future.poll(ctx) {
610			return Poll::Ready(Some(output))
611		}
612
613		Poll::Pending
614	}
615}