Struct ockam_node::Context
source · pub struct Context { /* private fields */ }
Expand description
Context contains Node state and references to the runtime.
Implementations§
source§impl Context
impl Context
sourcepub fn flow_controls(&self) -> &FlowControls
pub fn flow_controls(&self) -> &FlowControls
Shared FlowControls
instance
source§impl Context
impl Context
sourcepub async fn set_cluster<S: Into<String>>(&self, label: S) -> Result<()>
pub async fn set_cluster<S: Into<String>>(&self, label: S) -> Result<()>
Assign the current worker to a cluster
A cluster is a set of workers that should be stopped together when the node is stopped or parts of the system are reloaded. This is not to be confused with supervisors!
By adding your worker to a cluster you signal to the runtime that your worker may be depended on by other workers that should be stopped first.
Your cluster name MUST NOT start with _internals.
or
ockam.
!
Clusters are de-allocated in reverse order of their initialisation when the node is stopped.
sourcepub async fn list_workers(&self) -> Result<Vec<Address>>
pub async fn list_workers(&self) -> Result<Vec<Address>>
Return a list of all available worker addresses on a node
source§impl Context
impl Context
sourcepub async fn new_detached_with_mailboxes(
&self,
mailboxes: Mailboxes
) -> Result<DetachedContext>
pub async fn new_detached_with_mailboxes( &self, mailboxes: Mailboxes ) -> Result<DetachedContext>
TODO basically we can just rename Self::new_detached_impl()
sourcepub async fn new_detached(
&self,
address: impl Into<Address>,
incoming: impl IncomingAccessControl,
outgoing: impl OutgoingAccessControl
) -> Result<DetachedContext>
pub async fn new_detached( &self, address: impl Into<Address>, incoming: impl IncomingAccessControl, outgoing: impl OutgoingAccessControl ) -> Result<DetachedContext>
Create a new detached Context
without spawning a full worker
Note: this function is very low-level. For most users
start_worker()
is the recommended way
to create a new worker context.
source§impl Context
impl Context
sourcepub async fn receive<M: Message>(&mut self) -> Result<Routed<M>>
pub async fn receive<M: Message>(&mut self) -> Result<Routed<M>>
Block the current worker to wait for a typed message
This function may return a Err(FailedLoadData)
if the
underlying worker was shut down, or Err(Timeout)
if the call
was waiting for longer than the default timeout
.
Use receive_extended()
to use a specific timeout period.
Will return None
if the corresponding worker has been
stopped, or the underlying Node has shut down.
sourcepub async fn receive_extended<M: Message>(
&mut self,
options: MessageReceiveOptions
) -> Result<Routed<M>>
pub async fn receive_extended<M: Message>( &mut self, options: MessageReceiveOptions ) -> Result<Routed<M>>
Wait to receive a typed message
source§impl Context
impl Context
sourcepub async fn send_and_receive<M>(
&self,
route: impl Into<Route>,
msg: impl Message
) -> Result<M>where
M: Message,
pub async fn send_and_receive<M>( &self, route: impl Into<Route>, msg: impl Message ) -> Result<M>where M: Message,
Using a temporary new context, send a message and then receive a message with default timeout and no flow control
This helper function uses new_detached
, send
, and
receive
internally. See their documentation for more
details.
sourcepub async fn send_and_receive_extended<M>(
&self,
route: impl Into<Route>,
msg: impl Message,
options: MessageSendReceiveOptions
) -> Result<Routed<M>>where
M: Message,
pub async fn send_and_receive_extended<M>( &self, route: impl Into<Route>, msg: impl Message, options: MessageSendReceiveOptions ) -> Result<Routed<M>>where M: Message,
Using a temporary new context, send a message and then receive a message
This helper function uses new_detached
, send
, and
receive
internally. See their documentation for more
details.
sourcepub async fn send_to_self<A, M>(&self, from: A, addr: A, msg: M) -> Result<()>where
A: Into<Address>,
M: Message + Send + 'static,
pub async fn send_to_self<A, M>(&self, from: A, addr: A, msg: M) -> Result<()>where A: Into<Address>, M: Message + Send + 'static,
Send a message to another address associated with this worker
This function is a simple wrapper around Self::send()
which
validates the address given to it and will reject invalid
addresses.
sourcepub async fn send<R, M>(&self, route: R, msg: M) -> Result<()>where
R: Into<Route>,
M: Message + Send + 'static,
pub async fn send<R, M>(&self, route: R, msg: M) -> Result<()>where R: Into<Route>, M: Message + Send + 'static,
Send a message to an address or via a fully-qualified route
Routes can be constructed from a set of Address
es, or via
the RouteBuilder
type. Routes can contain middleware
router addresses, which will re-address messages that need to
be handled by specific domain workers.
use ockam_core::Message;
use serde::{Serialize, Deserialize};
#[derive(Message, Serialize, Deserialize)]
struct MyMessage(String);
impl MyMessage {
fn new(s: &str) -> Self {
Self(s.into())
}
}
ctx.send("my-test-worker", MyMessage::new("Hello you there :)")).await?;
Ok(())
sourcepub async fn send_with_local_info<R, M>(
&self,
route: R,
msg: M,
local_info: Vec<LocalInfo>
) -> Result<()>where
R: Into<Route>,
M: Message + Send + 'static,
pub async fn send_with_local_info<R, M>( &self, route: R, msg: M, local_info: Vec<LocalInfo> ) -> Result<()>where R: Into<Route>, M: Message + Send + 'static,
Send a message to an address or via a fully-qualified route
after attaching the given LocalInfo
to the message.
sourcepub async fn send_from_address<R, M>(
&self,
route: R,
msg: M,
sending_address: Address
) -> Result<()>where
R: Into<Route>,
M: Message + Send + 'static,
pub async fn send_from_address<R, M>( &self, route: R, msg: M, sending_address: Address ) -> Result<()>where R: Into<Route>, M: Message + Send + 'static,
Send a message to an address or via a fully-qualified route
Routes can be constructed from a set of Address
es, or via
the RouteBuilder
type. Routes can contain middleware
router addresses, which will re-address messages that need to
be handled by specific domain workers.
This function additionally takes the sending address parameter, to specify which of a worker’s (or processor’s) addresses should be used.
sourcepub async fn forward(&self, local_msg: LocalMessage) -> Result<()>
pub async fn forward(&self, local_msg: LocalMessage) -> Result<()>
Forward a transport message to its next routing destination
Similar to Context::send
, but taking a
TransportMessage
, which contains the full destination
route, and calculated return route for this hop.
Note: you most likely want to use
Context::send
instead, unless you are writing an
external router implementation for ockam node.
sourcepub async fn forward_from_address(
&self,
local_msg: LocalMessage,
sending_address: Address
) -> Result<()>
pub async fn forward_from_address( &self, local_msg: LocalMessage, sending_address: Address ) -> Result<()>
Forward a transport message to its next routing destination
Similar to Context::send
, but taking a
TransportMessage
, which contains the full destination
route, and calculated return route for this hop.
Note: you most likely want to use
Context::send
instead, unless you are writing an
external router implementation for ockam node.
source§impl Context
impl Context
sourcepub async fn stop_now(&self) -> Result<()>
pub async fn stop_now(&self) -> Result<()>
Signal to the local runtime to shut down immediately
WARNING: calling this function may result in data loss.
It is recommended to use the much safer
Context::stop
function instead!
sourcepub async fn stop(&self) -> Result<()>
pub async fn stop(&self) -> Result<()>
Signal to the local runtime to shut down
This call will hang until a safe shutdown has been completed.
The default timeout for a safe shutdown is 1 second. You can
change this behaviour by calling
Context::stop_timeout
directly.
sourcepub async fn stop_timeout(&self, seconds: u8) -> Result<()>
pub async fn stop_timeout(&self, seconds: u8) -> Result<()>
Signal to the local runtime to shut down
This call will hang until a safe shutdown has been completed or the desired timeout has been reached.
source§impl Context
impl Context
sourcepub fn register_transport(&self, transport: Arc<dyn Transport>)
pub fn register_transport(&self, transport: Arc<dyn Transport>)
Return the list of supported transports
sourcepub fn is_transport_registered(&self, transport_type: TransportType) -> bool
pub fn is_transport_registered(&self, transport_type: TransportType) -> bool
Return true if a given transport has already been registered
sourcepub async fn resolve_transport_route(&self, route: Route) -> Result<Route>
pub async fn resolve_transport_route(&self, route: Route) -> Result<Route>
For each address handled by a given transport in a route, for example, (TCP, “127.0.0.1:4000”) Create a worker supporting the routing of messages for this transport and replace the address in the route with the worker address
source§impl Context
impl Context
sourcepub async fn start_worker<W>(
&self,
address: impl Into<Address>,
worker: W
) -> Result<()>where
W: Worker<Context = Context>,
pub async fn start_worker<W>( &self, address: impl Into<Address>, worker: W ) -> Result<()>where W: Worker<Context = Context>,
Start a new worker instance at the given address. Default AccessControl is AllowAll
A worker is an asynchronous piece of code that can send and
receive messages of a specific type. This type is encoded via
the Worker
trait. If your code relies
on a manual run-loop you may want to use
start_processor()
instead!
Each address in the set must be unique and unused on the
current node. Workers must implement the Worker trait and be
thread-safe. Workers run asynchronously and will be scheduled
independently of each other. To wait for the initialisation
of your worker to complete you can use
wait_for()
.
use ockam_core::{Result, Worker, worker};
use ockam_node::Context;
struct MyWorker;
#[worker]
impl Worker for MyWorker {
type Context = Context;
type Message = String;
}
async fn start_my_worker(ctx: &mut Context) -> Result<()> {
ctx.start_worker("my-worker-address", MyWorker).await
}
sourcepub async fn start_worker_with_access_control<W>(
&self,
address: impl Into<Address>,
worker: W,
incoming: impl IncomingAccessControl,
outgoing: impl OutgoingAccessControl
) -> Result<()>where
W: Worker<Context = Context>,
pub async fn start_worker_with_access_control<W>( &self, address: impl Into<Address>, worker: W, incoming: impl IncomingAccessControl, outgoing: impl OutgoingAccessControl ) -> Result<()>where W: Worker<Context = Context>,
Start a new worker instance at the given address
A worker is an asynchronous piece of code that can send and
receive messages of a specific type. This type is encoded via
the Worker
trait. If your code relies
on a manual run-loop you may want to use
start_processor()
instead!
Each address in the set must be unique and unused on the
current node. Workers must implement the Worker trait and be
thread-safe. Workers run asynchronously and will be scheduled
independently of each other. To wait for the initialisation
of your worker to complete you can use
wait_for()
.
use ockam_core::{AllowAll, Result, Worker, worker};
use ockam_node::Context;
struct MyWorker;
#[worker]
impl Worker for MyWorker {
type Context = Context;
type Message = String;
}
async fn start_my_worker(ctx: &mut Context) -> Result<()> {
ctx.start_worker_with_access_control("my-worker-address", MyWorker, AllowAll, AllowAll).await
}
sourcepub async fn start_processor<P>(
&self,
address: impl Into<Address>,
processor: P
) -> Result<()>where
P: Processor<Context = Context>,
pub async fn start_processor<P>( &self, address: impl Into<Address>, processor: P ) -> Result<()>where P: Processor<Context = Context>,
Start a new processor instance at the given address. Default AccessControl is DenyAll
A processor is an asynchronous piece of code that runs a
custom run loop, with access to a worker context to send and
receive messages. If your code is built around responding to
message events, consider using
start_worker()
instead!
sourcepub async fn start_processor_with_access_control<P>(
&self,
address: impl Into<Address>,
processor: P,
incoming: impl IncomingAccessControl,
outgoing: impl OutgoingAccessControl
) -> Result<()>where
P: Processor<Context = Context>,
pub async fn start_processor_with_access_control<P>( &self, address: impl Into<Address>, processor: P, incoming: impl IncomingAccessControl, outgoing: impl OutgoingAccessControl ) -> Result<()>where P: Processor<Context = Context>,
Start a new processor instance at the given address
A processor is an asynchronous piece of code that runs a
custom run loop, with access to a worker context to send and
receive messages. If your code is built around responding to
message events, consider using
start_worker()
instead!