pub struct Runtime { /* private fields */ }Expand description
The core of the distributed task management system.
Fundamentally, the Runtime is responsible for managing the channels that
Tasks and Directives use to
communicate, and provides a simple interface for interacting with these
channels.
This runtime should be used in an orchestration context, where a single node
is responsible for managing the execution of tasks. This is in contrast to
the WorkerRuntime, which is used by worker nodes that execute tasks.
The primary purpose of this runtime is to facilitate the instantiation of
new coordination channels for dispatching Tasks and receiving their
associated TaskResults. It takes care of namespacing new result channels
such that users can consume them in an isolated manner.
Directives will use this to create a siloed
channel upon which they can listen to only the results of the Tasks they
manage. It also takes care of synchronizing disparate sender and receiver
channels such that closing the sender terminates the receiver.
§Emulation
The main Runtime provides emulation functionality for the worker. This
can be enabled by passing the
config::Runtime::InMemory field as
part of the configuration to Runtime::from_config. This will spin up a
multi-threaded worker emulator that will execute multiple WorkerRuntimes
concurrently. This can be used to simulate a distributed environment
in-memory, and finds immediate practical use in writing tests.
See the runtime module documentation for more information on runtime semantics.
Implementations§
Source§impl Runtime
impl Runtime
Sourcepub async fn from_config(config: &Config, marker: Marker) -> Result<Self>
pub async fn from_config(config: &Config, marker: Marker) -> Result<Self>
pub async fn close(&self) -> Result<()>
Sourcepub async fn lease_coordinated_task_channel<'a, Op: Operation + 'a, Metadata: Serializable + 'a>(
&self,
) -> Result<(String, Box<dyn Publisher<Task<'a, Op, Metadata>> + Send + Unpin + Sync + 'a>, LeaseGuard<DynamicChannel, Box<dyn Stream<Item = (TaskResult<Op, Metadata>, Box<dyn Acker>)> + Send + Unpin + 'a>>)>
pub async fn lease_coordinated_task_channel<'a, Op: Operation + 'a, Metadata: Serializable + 'a>( &self, ) -> Result<(String, Box<dyn Publisher<Task<'a, Op, Metadata>> + Send + Unpin + Sync + 'a>, LeaseGuard<DynamicChannel, Box<dyn Stream<Item = (TaskResult<Op, Metadata>, Box<dyn Acker>)> + Send + Unpin + 'a>>)>
Leases a new discrete
coordinated
channel for asynchronously dispatching Tasks and receiving their
TaskResults.
§Design notes
- The returned receiver is wrapped in a
LeaseGuardthat will release the leasedChannelonce dropped. - A unique identifier is returned for the newly leased
Channelthat is bound to the receiver. This can be used to routeTaskResults back to that channel such that they’re propagated to provided receiver. - Even though the
senderandreceiverinteract with two distinct channels, they are bound bycoordinated_channel, which ensures that open/closed state and pending message state are synchronized between them. - The provided receiver automatically deserializes
AnyTaskResults into typedTaskResults on behalf of the caller.
Users should take care to close the sender once all tasks have been dispatched. This will ensure that the receiver is terminated once all results have been received. Without this, the receiver will block indefinitely. Alternatively, returning early without closing the sender will will be fine too, as the sender will be closed once dropped.
§Example
use paladin::{
RemoteExecute,
runtime::Runtime,
task::Task,
operation::{Operation, Result},
};
use serde::{Deserialize, Serialize};
use futures::StreamExt;
#[derive(Serialize, Deserialize)]
struct Metadata {
id: usize,
}
async fn run_task<Op: Operation>(op: &Op, runtime: &Runtime, input: Op::Input) -> anyhow::Result<()> {
let (identifier, mut sender, mut receiver) = runtime.lease_coordinated_task_channel::<Op, Metadata>().await?;
// Issue a task with the identifier of the receiver
sender.publish(&Task {
routing_key: identifier,
metadata: Metadata { id: 0 },
op,
input,
})
.await?;
sender.close().await?;
while let Some((result, acker)) = receiver.next().await {
// ... handle result
}
Ok(())
}§Metadata
Metadata is a generic parameter that implements Serializable, used
to encode additional information into a Task that is relevant to
the particular Directive orchestrating
it. This can be useful in cases where a
Directive needs to coordinate the
results of multiple Tasks.