pub struct Runtime { /* private fields */ }
The core of the distributed task management system.
Fundamentally, the Runtime is responsible for managing the channels that
Tasks and Directives use to communicate, and provides a simple interface for
interacting with these channels.
This runtime should be used in an orchestration context, where a single node
is responsible for managing the execution of tasks. This is in contrast to
the WorkerRuntime, which is used by worker nodes that execute tasks.
The primary purpose of this runtime is to facilitate the instantiation of
new coordination channels for dispatching Tasks and receiving their
associated TaskResults. It takes care of namespacing new result channels
such that users can consume them in an isolated manner. Directives will use
this to create a siloed channel upon which they can listen to only the
results of the Tasks they manage. It also takes care of synchronizing
disparate sender and receiver channels such that closing the sender
terminates the receiver.
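The close-propagation behaviour described above can be illustrated with a minimal, standard-library-only sketch. It is not paladin's implementation: `dispatch_and_collect` is a hypothetical helper, `std::sync::mpsc` channels stand in for the runtime's task and result channels, and doubling a number stands in for executing an operation. It only shows why closing the sender lets the receiving side terminate.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for dispatching tasks and draining their results.
/// Not part of paladin; std channels replace the runtime's channels.
pub fn dispatch_and_collect(inputs: Vec<u32>) -> Vec<u32> {
    // Task channel: orchestrator -> worker.
    let (task_tx, task_rx) = mpsc::channel::<u32>();
    // Result channel: worker -> orchestrator.
    let (result_tx, result_rx) = mpsc::channel::<u32>();

    // The worker drains tasks until the task sender is closed. When it
    // returns, `result_tx` is dropped, which ends the result stream.
    let worker = thread::spawn(move || {
        for task in task_rx {
            // Doubling stands in for executing an operation.
            if result_tx.send(task * 2).is_err() {
                break;
            }
        }
    });

    for input in inputs {
        task_tx.send(input).expect("worker hung up");
    }
    // Closing (dropping) the sender is what allows the loop below to end.
    drop(task_tx);

    // This iterator finishes only once the result sender has been dropped.
    let results: Vec<u32> = result_rx.iter().collect();
    worker.join().expect("worker panicked");
    results
}
```

If `drop(task_tx)` were omitted, the worker would never leave its loop and collecting the results would block forever.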
§Emulation
The main Runtime provides emulation functionality for the worker. This
can be enabled by passing the config::Runtime::InMemory field as part of the
configuration to Runtime::from_config. This will spin up a multi-threaded
worker emulator that will execute multiple WorkerRuntimes concurrently. This
can be used to simulate a distributed environment in-memory, and finds
immediate practical use in writing tests.
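To give a rough feel for what a multi-threaded in-memory worker pool looks like, here is a standard-library-only sketch. It makes no claim about how paladin's emulator is built: `emulate_workers` is a hypothetical function, plain threads stand in for WorkerRuntimes, and squaring a number stands in for executing a task.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Hypothetical in-memory worker pool; not paladin's emulator.
/// Returns the results sorted, since completion order is not deterministic.
pub fn emulate_workers(num_workers: usize, inputs: Vec<u64>) -> Vec<u64> {
    let (task_tx, task_rx) = mpsc::channel::<u64>();
    let (result_tx, result_rx) = mpsc::channel::<u64>();
    // All workers pull from one shared task queue.
    let task_rx = Arc::new(Mutex::new(task_rx));

    let handles: Vec<_> = (0..num_workers)
        .map(|_| {
            let task_rx = Arc::clone(&task_rx);
            let result_tx = result_tx.clone();
            thread::spawn(move || loop {
                // The lock is held only while taking the next task.
                let next = task_rx.lock().expect("task queue poisoned").recv();
                match next {
                    // Squaring stands in for executing a task.
                    Ok(input) => {
                        if result_tx.send(input * input).is_err() {
                            break;
                        }
                    }
                    // The task sender was closed and the queue is empty.
                    Err(_) => break,
                }
            })
        })
        .collect();
    // Drop the original result sender so the result stream ends once
    // every worker has exited.
    drop(result_tx);

    for input in inputs {
        task_tx.send(input).expect("task queue closed");
    }
    drop(task_tx);

    let mut results: Vec<u64> = result_rx.iter().collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    results.sort_unstable();
    results
}
```

Because everything runs in one process, a sketch like this needs no external broker, which is the same property that makes an in-memory runtime convenient for tests.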
See the runtime module documentation for more information on runtime semantics.
Implementations§
impl Runtime
pub async fn from_config(config: &Config, marker: Marker) -> Result<Self>
pub async fn close(&self) -> Result<()>
pub async fn lease_coordinated_task_channel<'a, Op: Operation + 'a, Metadata: Serializable + 'a>(
    &self,
) -> Result<(
    String,
    Box<dyn Publisher<Task<'a, Op, Metadata>> + Send + Unpin + Sync + 'a>,
    LeaseGuard<DynamicChannel, Box<dyn Stream<Item = (TaskResult<Op, Metadata>, Box<dyn Acker>)> + Send + Unpin + 'a>>,
)>
Leases a new discrete coordinated channel for asynchronously dispatching
Tasks and receiving their TaskResults.
§Design notes
- The returned receiver is wrapped in a LeaseGuard that will release the leased Channel once dropped.
- A unique identifier is returned for the newly leased Channel that is bound to the receiver. This can be used to route TaskResults back to that channel such that they are propagated to the provided receiver.
- Even though the sender and receiver interact with two distinct channels, they are bound by coordinated_channel, which ensures that open/closed state and pending message state are synchronized between them.
- The provided receiver automatically deserializes AnyTaskResults into typed TaskResults on behalf of the caller.
Users should take care to close the sender once all tasks have been dispatched. This ensures that the receiver terminates once all results have been received; otherwise the receiver will block indefinitely. Returning early without explicitly closing the sender is also fine, as the sender is closed when it is dropped.
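The identifier-based routing described in the design notes can be pictured with a small standard-library sketch. Everything here is hypothetical and is not paladin's API: `ResultRouter`, `RoutedResult`, and the `channel-N` identifier format are invented for illustration, and `std::sync::mpsc` replaces the real channel types. `release` mimics the effect of dropping a lease guard.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

/// A result tagged with the identifier of the channel it should return to.
/// Hypothetical type, for illustration only.
pub struct RoutedResult {
    pub routing_key: String,
    pub output: u64,
}

/// Hypothetical registry of leased result channels, keyed by identifier.
#[derive(Default)]
pub struct ResultRouter {
    next_id: u64,
    channels: HashMap<String, Sender<u64>>,
}

impl ResultRouter {
    /// Leases a fresh result channel and returns its unique identifier
    /// together with the receiver bound to it.
    pub fn lease(&mut self) -> (String, Receiver<u64>) {
        let id = format!("channel-{}", self.next_id);
        self.next_id += 1;
        let (tx, rx) = mpsc::channel();
        self.channels.insert(id.clone(), tx);
        (id, rx)
    }

    /// Delivers a result to the channel named by its routing key.
    /// Returns false if no such lease exists or its receiver is gone.
    pub fn route(&self, result: RoutedResult) -> bool {
        match self.channels.get(&result.routing_key) {
            Some(tx) => tx.send(result.output).is_ok(),
            None => false,
        }
    }

    /// Releases a lease. Dropping the stored sender ends the receiver's stream.
    pub fn release(&mut self, id: &str) {
        self.channels.remove(id);
    }
}
```

Each receiver only ever sees results whose routing key matches its own identifier, which is the isolation property the unique identifier exists to provide.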
§Example
use paladin::{
    RemoteExecute,
    runtime::Runtime,
    task::Task,
    operation::{Operation, Result},
};
use serde::{Deserialize, Serialize};
use futures::StreamExt;

#[derive(Serialize, Deserialize)]
struct Metadata {
    id: usize,
}

async fn run_task<Op: Operation>(op: &Op, runtime: &Runtime, input: Op::Input) -> anyhow::Result<()> {
    let (identifier, mut sender, mut receiver) = runtime.lease_coordinated_task_channel::<Op, Metadata>().await?;

    // Issue a task with the identifier of the receiver
    sender.publish(&Task {
        routing_key: identifier,
        metadata: Metadata { id: 0 },
        op,
        input,
    })
    .await?;
    // Close the sender so the receiver terminates once all results arrive
    sender.close().await?;

    while let Some((result, acker)) = receiver.next().await {
        // ... handle result
    }

    Ok(())
}
§Metadata
Metadata is a generic parameter that implements Serializable, used to encode
additional information into a Task that is relevant to the particular
Directive orchestrating it. This can be useful in cases where a Directive
needs to coordinate the results of multiple Tasks.
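As one illustration of coordinating multiple results, the sketch below uses an `id` carried in the metadata to restore dispatch order when results arrive out of order. The types are hypothetical stand-ins rather than paladin's: `TaggedResult` and `reassemble` are invented here, and `Metadata` mirrors the struct from the example above without the serde derives so that the snippet depends only on the standard library.

```rust
/// Mirrors the example's metadata; serde derives omitted to stay std-only.
pub struct Metadata {
    pub id: usize,
}

/// Hypothetical pairing of a task's output with the metadata it carried.
pub struct TaggedResult {
    pub metadata: Metadata,
    pub output: String,
}

/// Restores dispatch order by sorting on the id stored in each result's
/// metadata, then strips the metadata and returns the outputs.
pub fn reassemble(mut results: Vec<TaggedResult>) -> Vec<String> {
    results.sort_by_key(|r| r.metadata.id);
    results.into_iter().map(|r| r.output).collect()
}
```

A Directive that splits its input into indexed chunks could tag each Task with the chunk index in this way and recombine the outputs once the receiver terminates.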