Struct ockam::Context

pub struct Context { /* private fields */ }

Context contains Node state and references to the runtime.

Implementations

impl Context

pub fn runtime(&self) -> &Handle

Return a reference to the runtime handle

pub fn address(&self) -> Address

Return the primary address of the current worker

pub fn addresses(&self) -> Vec<Address>

Return all addresses of the current worker

pub fn mailboxes(&self) -> &Mailboxes

Return a reference to the mailboxes of this context

pub fn flow_controls(&self) -> &FlowControls

Return a reference to the shared FlowControls instance

pub fn tracing_context(&self) -> OpenTelemetryContext

Return the tracing context

pub fn protocol_version(&self) -> u8

Return the protocol version of the message being processed

pub fn set_tracing_context(&mut self, tracing_context: OpenTelemetryContext)

Set the current tracing context

pub fn set_protocol_version(&mut self, protocol_version: u8)

Set the current protocol version

impl Context

pub async fn set_cluster<S>(&self, label: S) -> Result<(), Error>
where S: Into<String>,

Assign the current worker to a cluster

A cluster is a set of workers that should be stopped together when the node is stopped or parts of the system are reloaded. This is not to be confused with supervisors!

By adding your worker to a cluster you signal to the runtime that your worker may be depended on by other workers that should be stopped first.

Your cluster name MUST NOT start with either of the prefixes "_internals." or "ockam."

Clusters are de-allocated in reverse order of their initialisation when the node is stopped.
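
The two rules above (reserved prefixes and reverse-order de-allocation) can be illustrated with a small standalone sketch. This is a toy model, not the runtime's implementation; the helper names is_allowed_cluster_label and deallocation_order are hypothetical and do not exist in ockam.

```rust
/// Prefixes reserved by the runtime, as stated in the text above.
const RESERVED_PREFIXES: [&str; 2] = ["_internals.", "ockam."];

/// Hypothetical helper: returns true if `label` may be used as a cluster name.
fn is_allowed_cluster_label(label: &str) -> bool {
    !RESERVED_PREFIXES.iter().any(|prefix| label.starts_with(prefix))
}

/// Hypothetical helper: given cluster labels in initialisation order,
/// returns them in the order they would be de-allocated (reverse order).
fn deallocation_order<'a>(initialised: &[&'a str]) -> Vec<&'a str> {
    initialised.iter().rev().copied().collect()
}
```

With clusters initialised as "db", "api", "ui", this model de-allocates "ui" first and "db" last, so workers that others depend on are stopped after their dependants.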

pub async fn list_workers(&self) -> Result<Vec<Address>, Error>

Return a list of all available worker addresses on a node

pub async fn wait_for<A>(&self, addr: A) -> Result<(), Error>
where A: Into<Address>,

Wait for a particular address to become “ready”

pub async fn find_terminal_address( &self, route: impl Into<Vec<Address>> ) -> Result<Option<AddressAndMetadata>, Error>

Finds the terminal address of a route, if present

pub async fn read_metadata( &self, address: impl Into<Address> ) -> Result<Option<AddressMetadata>, Error>

Read metadata for the provided address

impl Context

pub async fn new_detached_with_mailboxes( &self, mailboxes: Mailboxes ) -> Result<Context, Error>

Create a new detached Context from the given Mailboxes

TODO: basically we can just rename Self::new_detached_impl()

pub async fn new_detached( &self, address: impl Into<Address>, incoming: impl IncomingAccessControl, outgoing: impl OutgoingAccessControl ) -> Result<Context, Error>

Create a new detached Context without spawning a full worker

Note: this function is very low-level. For most users start_worker() is the recommended way to create a new worker context.

Approximate flow of starting a detached address:

  1. Create and Spawn AsyncDrop::run
  2. StartWorker message -> Router
  3. First address is considered a primary_addr (main_addr)
  4. Check if router.map.address_records_map already has primary_addr
  5. AddressRecord is created and inserted in router.map
  6. Iterate over metadata:
     - Check if it belongs to that record
     - Set is_terminal true in router.map.address_metadata_map (if address is terminal)
     - Insert attributes one by one
  7. For each address we insert pair (Address, primary_addr) into router.map.alias_map, including (primary_addr, primary_addr itself)

Approximate flow of stopping a detached address:

  1. Context's Drop implementation is called when the Context is dropped (per Rust's RAII principle)
  2. async_drop_sender is used to send the Context address
  3. AsyncDrop sends StopWorker message -> Router
  4. Get AddressRecord
  5. router.map.free_address(main_address) is called (given Router state is running):
     - Remove main_address from router.map.stopping (it’s not there anyway, unless it was a cluster and the node was shutting down)
     - Remove AddressRecord from router.map.address_records_map (return error if not found)
     - Remove all aliases in router.map.alias_map
     - Remove all metadata from router.map.address_metadata
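
The bookkeeping in the two flows above can be sketched with plain hash maps. This is a simplified model under stated assumptions: ToyAddressMap and its methods are hypothetical, addresses are plain strings, and metadata and the stopping set are left out. Only the field names address_records_map and alias_map are taken from the text.

```rust
use std::collections::HashMap;

/// Toy stand-in for the router's address bookkeeping (assumed, simplified types).
#[derive(Default)]
struct ToyAddressMap {
    /// primary address -> all addresses of that record
    address_records_map: HashMap<String, Vec<String>>,
    /// any address (including the primary one) -> primary address
    alias_map: HashMap<String, String>,
}

impl ToyAddressMap {
    /// Register a record; the first address is treated as the primary address.
    fn start(&mut self, addresses: &[&str]) -> Result<(), String> {
        let primary = addresses.first().ok_or("no addresses given")?.to_string();
        if self.address_records_map.contains_key(&primary) {
            return Err(format!("{} is already registered", primary));
        }
        // Every address, including the primary one, maps to the primary address.
        for addr in addresses {
            self.alias_map.insert(addr.to_string(), primary.clone());
        }
        let all: Vec<String> = addresses.iter().map(|a| a.to_string()).collect();
        self.address_records_map.insert(primary, all);
        Ok(())
    }

    /// Remove the record and every alias that points at it.
    fn free_address(&mut self, primary: &str) -> Result<(), String> {
        let record = self
            .address_records_map
            .remove(primary)
            .ok_or_else(|| format!("{} not found", primary))?;
        for addr in record {
            self.alias_map.remove(&addr);
        }
        Ok(())
    }
}
```

In this model, starting the same primary address twice is rejected, and freeing an unknown address returns an error, matching the "return error if not found" step.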

impl Context

pub async fn receive<M>(&mut self) -> Result<Routed<M>, Error>
where M: Message,

Block the current worker to wait for a typed message

This function may return an Err(FailedLoadData) if the underlying worker was shut down, or an Err(Timeout) if the call was waiting for longer than the default timeout.

Use receive_extended() to use a specific timeout period.

Will return an Err if the corresponding worker has been stopped, or the underlying Node has shut down.
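
The difference between the two error cases can be sketched with a standard library channel. This is a synchronous analogy, not how receive() is implemented: ReceiveError and receive_with_timeout are hypothetical names, and std::sync::mpsc stands in for the worker's mailbox.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical error type mirroring the two cases named in the text.
#[derive(Debug, PartialEq)]
enum ReceiveError {
    /// The sending side is gone, e.g. because the worker was shut down.
    FailedLoadData,
    /// No message arrived within the timeout.
    Timeout,
}

/// Wait for one message, mapping channel errors onto the two cases above.
fn receive_with_timeout<M>(rx: &Receiver<M>, timeout: Duration) -> Result<M, ReceiveError> {
    rx.recv_timeout(timeout).map_err(|e| match e {
        RecvTimeoutError::Timeout => ReceiveError::Timeout,
        RecvTimeoutError::Disconnected => ReceiveError::FailedLoadData,
    })
}
```

A caller can retry on Timeout, while FailedLoadData in this model means no further messages can ever arrive.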

pub async fn receive_extended<M>( &mut self, options: MessageReceiveOptions ) -> Result<Routed<M>, Error>
where M: Message,

Wait to receive a typed message

impl Context

pub async fn register<A>( &self, type_: TransportType, addr: A ) -> Result<(), Error>
where A: Into<Address>,

Register a router for a specific address type

impl Context

pub async fn send_and_receive<M>( &self, route: impl Into<Route>, msg: impl Message ) -> Result<M, Error>
where M: Message,

Using a temporary new context, send a message and then receive a message with default timeout and no flow control

This helper function uses new_detached, send, and receive internally. See their documentation for more details.

pub async fn send_and_receive_extended<M>( &self, route: impl Into<Route>, msg: impl Message, options: MessageSendReceiveOptions ) -> Result<Routed<M>, Error>
where M: Message,

Using a temporary new context, send a message and then receive a message

This helper function uses new_detached, send, and receive internally. See their documentation for more details.

pub async fn send_to_self<A, M>( &self, from: A, addr: A, msg: M ) -> Result<(), Error>
where A: Into<Address>, M: Message + Send + 'static,

Send a message to another address associated with this worker

This function is a simple wrapper around Self::send() which validates the address given to it and will reject invalid addresses.
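
The kind of validation described above might look like the following sketch. It assumes that "invalid" means "not one of this worker's own addresses"; the text does not spell this out, and check_self_destination is a hypothetical helper, not part of the library.

```rust
/// Hypothetical check, not the library's implementation: accept `addr` only
/// if it is one of the worker's own addresses.
fn check_self_destination(own_addresses: &[&str], addr: &str) -> Result<(), String> {
    if own_addresses.contains(&addr) {
        Ok(())
    } else {
        Err(format!("{} is not an address of this worker", addr))
    }
}
```

Under this assumption, a worker with addresses "main" and "alias" could send to "alias" but would be rejected when targeting any other address.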

pub async fn send<R, M>(&self, route: R, msg: M) -> Result<(), Error>
where R: Into<Route>, M: Message + Send + 'static,

Send a message to an address or via a fully-qualified route

Routes can be constructed from a set of Addresses, or via the RouteBuilder type. Routes can contain middleware router addresses, which will re-address messages that need to be handled by specific domain workers.

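use ockam_core::{Message, Result};
use ockam_node::Context;
use serde::{Serialize, Deserialize};

#[derive(Message, Serialize, Deserialize)]
struct MyMessage(String);

impl MyMessage {
    fn new(s: &str) -> Self {
        Self(s.into())
    }
}

// Send a typed message to the worker registered at "my-test-worker".
async fn send_greeting(ctx: &Context) -> Result<()> {
    ctx.send("my-test-worker", MyMessage::new("Hello you there :)")).await?;
    Ok(())
}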

pub async fn send_with_local_info<R, M>( &self, route: R, msg: M, local_info: Vec<LocalInfo> ) -> Result<(), Error>
where R: Into<Route>, M: Message + Send + 'static,

Send a message to an address or via a fully-qualified route after attaching the given LocalInfo to the message.

pub async fn send_from_address<R, M>( &self, route: R, msg: M, sending_address: Address ) -> Result<(), Error>
where R: Into<Route>, M: Message + Send + 'static,

Send a message to an address or via a fully-qualified route

Routes can be constructed from a set of Addresses, or via the RouteBuilder type. Routes can contain middleware router addresses, which will re-address messages that need to be handled by specific domain workers.

This function additionally takes the sending address parameter, to specify which of a worker’s (or processor’s) addresses should be used.

pub async fn forward(&self, local_msg: LocalMessage) -> Result<(), Error>

Forward a transport message to its next routing destination

Similar to Context::send, but taking a LocalMessage, which contains the full destination route, and calculated return route for this hop.

Note: you most likely want to use Context::send instead, unless you are writing an external router implementation for an Ockam node.

pub async fn forward_from_address( &self, local_msg: LocalMessage, sending_address: Address ) -> Result<(), Error>

Forward a transport message to its next routing destination

Similar to Context::send, but taking a LocalMessage, which contains the full destination route, and calculated return route for this hop.

Note: you most likely want to use Context::send instead, unless you are writing an external router implementation for an Ockam node.

impl Context

pub async fn stop_now(&self) -> Result<(), Error>

Signal to the local runtime to shut down immediately

WARNING: calling this function may result in data loss. It is recommended to use the much safer Context::stop function instead!

pub async fn stop(&self) -> Result<(), Error>

Signal to the local runtime to shut down

This call will hang until a safe shutdown has been completed. The default timeout for a safe shutdown is 1 second. You can change this behaviour by calling Context::stop_timeout directly.

pub async fn stop_timeout(&self, seconds: u8) -> Result<(), Error>

Signal to the local runtime to shut down

This call will hang until a safe shutdown has been completed or the desired timeout has been reached.

impl Context

pub fn register_transport(&self, transport: Arc<dyn Transport>)

Register the given transport

pub fn is_transport_registered(&self, transport_type: TransportType) -> bool

Return true if a given transport has already been registered

pub async fn resolve_transport_route( &self, route: Route ) -> Result<Route, Error>

For each address in a route that is handled by a given transport, for example (TCP, “127.0.0.1:4000”), create a worker that supports routing messages for this transport, and replace the address in the route with the worker address.

pub async fn resolve_transport_route_static( route: Route, transports: HashMap<TransportType, Arc<dyn Transport>> ) -> Result<(Route, Option<Address>), Error>

For each address in a route that is handled by a given transport, for example (TCP, “127.0.0.1:4000”), create a worker that supports routing messages for this transport, and replace the address in the route with the worker address. Also returns the connection worker address, if that connection was instantiated in this function.
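
The address replacement described above can be sketched on a simplified route type. Everything here is an assumption made for illustration: ToyAddress, the LOCAL and TCP tag values, and the "worker-for-…" naming are hypothetical, and no real worker or connection is created.

```rust
/// Simplified address: a transport type tag plus a value.
#[derive(Clone, Debug, PartialEq)]
struct ToyAddress {
    transport: u8,
    value: String,
}

/// Transport type tags used in this sketch (assumed values).
const LOCAL: u8 = 0;
const TCP: u8 = 1;

/// Replace every non-local address with the address of a (pretend) worker
/// created for it; returns the new route and the created worker addresses.
fn resolve_route(route: &[ToyAddress]) -> (Vec<ToyAddress>, Vec<ToyAddress>) {
    let mut created = Vec::new();
    let resolved: Vec<ToyAddress> = route
        .iter()
        .map(|addr| {
            if addr.transport == LOCAL {
                // Local addresses are kept as they are.
                addr.clone()
            } else {
                let worker = ToyAddress {
                    transport: LOCAL,
                    value: format!("worker-for-{}", addr.value),
                };
                created.push(worker.clone());
                worker
            }
        })
        .collect();
    (resolved, created)
}
```

After resolution, the route in this model contains only local addresses, so the rest of the routing logic never has to deal with transport-specific addresses directly.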

impl Context

pub async fn start_worker<W>( &self, address: impl Into<Address>, worker: W ) -> Result<(), Error>
where W: Worker<Context = Context>,

Start a new worker instance at the given address. Default AccessControl is AllowAll

A worker is an asynchronous piece of code that can send and receive messages of a specific type. This type is encoded via the Worker trait. If your code relies on a manual run-loop you may want to use start_processor() instead!

Each address in the set must be unique and unused on the current node. Workers must implement the Worker trait and be thread-safe. Workers run asynchronously and will be scheduled independently of each other. To wait for the initialisation of your worker to complete you can use wait_for().

use ockam_core::{Result, Worker, worker};
use ockam_node::Context;

struct MyWorker;

#[worker]
impl Worker for MyWorker {
    type Context = Context;
    type Message = String;
}

async fn start_my_worker(ctx: &mut Context) -> Result<()> {
    ctx.start_worker("my-worker-address", MyWorker).await
}

Approximate flow of starting a worker:

  1. StartWorker message -> Router
  2. First address is considered a primary_addr (main_addr)
  3. Check if router.map.address_records_map already has primary_addr
  4. AddressRecord is created and inserted in router.map
  5. Iterate over metadata:
     - Check if it belongs to that record
     - Set is_terminal true in router.map.address_metadata_map (if address is terminal)
     - Insert attributes one by one
  6. For each address we insert pair (Address, primary_addr) into router.map.alias_map, including (primary_addr, primary_addr itself)
  7. WorkerRelay is spawned as a tokio task:
     - WorkerRelay calls initialize
     - WorkerRelay calls Worker::handle_message for each message until either
       - stop signal is received (CtrlSignal::InterruptStop to AddressRecord::ctrl_tx)
       - there are no messages coming to that receiver (the sender side is dropped)
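
The relay loop in the last step can be sketched with a blocking standard library channel. This is a simplified analogy, not the WorkerRelay implementation: ToyWorker, run_relay and Recorder are hypothetical names, the loop is synchronous rather than a tokio task, and only the "sender side is dropped" exit condition is modelled (the stop signal is left out).

```rust
use std::sync::mpsc::Receiver;

/// Minimal stand-in for a worker; not the real `Worker` trait.
trait ToyWorker {
    fn initialize(&mut self) {}
    fn handle_message(&mut self, msg: String);
    fn shutdown(&mut self) {}
}

/// Run the relay loop: initialise once, handle messages until every sender
/// has been dropped, then shut the worker down and hand it back.
fn run_relay<W: ToyWorker>(mut worker: W, rx: Receiver<String>) -> W {
    worker.initialize();
    // `iter()` ends once all senders are dropped and the queue is drained.
    for msg in rx.iter() {
        worker.handle_message(msg);
    }
    worker.shutdown();
    worker
}

/// Example worker that records every message it has handled.
#[derive(Default)]
struct Recorder {
    seen: Vec<String>,
}

impl ToyWorker for Recorder {
    fn handle_message(&mut self, msg: String) {
        self.seen.push(msg);
    }
}
```

Dropping the sender is what ends the loop here, which mirrors the "Drop sender" step in the stop_worker flow.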

pub async fn start_worker_with_access_control<W>( &self, address: impl Into<Address>, worker: W, incoming: impl IncomingAccessControl, outgoing: impl OutgoingAccessControl ) -> Result<(), Error>
where W: Worker<Context = Context>,

Start a new worker instance at the given address

A worker is an asynchronous piece of code that can send and receive messages of a specific type. This type is encoded via the Worker trait. If your code relies on a manual run-loop you may want to use start_processor() instead!

Each address in the set must be unique and unused on the current node. Workers must implement the Worker trait and be thread-safe. Workers run asynchronously and will be scheduled independently of each other. To wait for the initialisation of your worker to complete you can use wait_for().

use ockam_core::{AllowAll, Result, Worker, worker};
use ockam_node::Context;

struct MyWorker;

#[worker]
impl Worker for MyWorker {
    type Context = Context;
    type Message = String;
}

async fn start_my_worker(ctx: &mut Context) -> Result<()> {
    ctx.start_worker_with_access_control("my-worker-address", MyWorker, AllowAll, AllowAll).await
}

pub async fn start_processor<P>( &self, address: impl Into<Address>, processor: P ) -> Result<(), Error>
where P: Processor<Context = Context>,

Start a new processor instance at the given address. Default AccessControl is DenyAll

A processor is an asynchronous piece of code that runs a custom run loop, with access to a worker context to send and receive messages. If your code is built around responding to message events, consider using start_worker() instead!

Approximate flow of starting a processor:

  1. StartProcessor message -> Router
  2. First address is considered a primary_addr (main_addr)
  3. Check if router.map.address_records_map already has primary_addr
  4. AddressRecord is created and inserted in router.map
  5. Iterate over metadata:
     - Check if it belongs to that record
     - Set is_terminal true in router.map.address_metadata_map (if address is terminal)
     - Insert attributes one by one
  6. For each address we insert pair (Address, primary_addr) into router.map.alias_map, including (primary_addr, primary_addr itself)
  7. ProcessorRelay is spawned as a tokio task:
     - ProcessorRelay calls Processor::initialize
     - ProcessorRelay calls Processor::process until either false is returned or stop signal is received (CtrlSignal::InterruptStop to AddressRecord::ctrl_tx)
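
The run loop in the last step can be sketched as follows. This is a synchronous toy model, not the ProcessorRelay implementation: ToyProcessor, run_processor and Countdown are hypothetical names, and an AtomicBool stands in for the CtrlSignal::InterruptStop signal.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Minimal stand-in for a processor; not the real `Processor` trait.
trait ToyProcessor {
    fn initialize(&mut self) {}
    /// Returns `false` to ask the loop to stop.
    fn process(&mut self) -> bool;
    fn shutdown(&mut self) {}
}

/// Call `process` until it returns `false` or `stop` is set; returns the
/// number of `process` calls that were made.
fn run_processor<P: ToyProcessor>(processor: &mut P, stop: &AtomicBool) -> usize {
    processor.initialize();
    let mut calls = 0;
    while !stop.load(Ordering::SeqCst) {
        calls += 1;
        if !processor.process() {
            break;
        }
    }
    processor.shutdown();
    calls
}

/// Example processor that asks to stop once its counter reaches zero.
struct Countdown(u32);

impl ToyProcessor for Countdown {
    fn process(&mut self) -> bool {
        self.0 = self.0.saturating_sub(1);
        self.0 > 0
    }
}
```

The two exit paths are independent: the processor can end the loop itself by returning false, or the loop can be ended from outside by setting the stop flag.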

pub async fn start_processor_with_access_control<P>( &self, address: impl Into<Address>, processor: P, incoming: impl IncomingAccessControl, outgoing: impl OutgoingAccessControl ) -> Result<(), Error>
where P: Processor<Context = Context>,

Start a new processor instance at the given address

A processor is an asynchronous piece of code that runs a custom run loop, with access to a worker context to send and receive messages. If your code is built around responding to message events, consider using start_worker() instead!

pub async fn stop_worker<A>(&self, addr: A) -> Result<(), Error>
where A: Into<Address>,

Shut down a local worker by its primary address

Approximate flow of stopping a worker:

  1. StopWorker message -> Router
  2. Get AddressRecord
  3. Drop sender
  4. WorkerRelay calls Worker::shutdown
  5. StopAck message -> Router (from main_address)
  6. router.map.free_address(main_address) is called (given Router state is running):
     - Remove main_address from router.map.stopping (it’s not there anyway, unless it was a cluster and the node was shutting down)
     - Remove AddressRecord from router.map.address_records_map (return error if not found)
     - Remove all aliases in router.map.alias_map
     - Remove all metadata from router.map.address_metadata

pub async fn stop_processor<A>(&self, addr: A) -> Result<(), Error>
where A: Into<Address>,

Shut down a local processor by its address

Approximate flow of stopping a processor:

  1. StopProcessor message -> Router
  2. Get AddressRecord
  3. Call AddressRecord::stop:
     - Send CtrlSignal::InterruptStop to AddressRecord::ctrl_tx
     - Set AddressRecord::state = AddressState::Stopping
  4. ProcessorRelay calls Processor::shutdown
  5. StopAck message -> Router (from main_address)
  6. router.map.free_address(main_address) is called (given Router state is running):
     - Remove main_address from router.map.stopping (it’s not there anyway, unless it was a cluster and the node was shutting down)
     - Remove AddressRecord from router.map.address_records_map (return error if not found)
     - Remove all aliases in router.map.alias_map
     - Remove all metadata from router.map.address_metadata

Trait Implementations

impl AsyncTryClone for Context

fn async_try_clone<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Context, Error>> + Send + 'async_trait>>
where 'life0: 'async_trait, Context: 'async_trait,

Try cloning an object and return an Err in case of failure.

impl Debug for Context

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter.

impl Drop for Context

fn drop(&mut self)

Executes the destructor for this type.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.