ockam_node/context/
worker_lifecycle.rs

1use crate::Context;
2use crate::{ProcessorBuilder, WorkerBuilder};
3use ockam_core::{
4    Address, IncomingAccessControl, OutgoingAccessControl, Processor, Result, Worker,
5};
6
7impl Context {
8    /// Start a new worker instance at the given address. Default AccessControl is AllowAll
9    ///
10    /// A worker is an asynchronous piece of code that can send and
11    /// receive messages of a specific type.  This type is encoded via
12    /// the [`Worker`](ockam_core::Worker) trait.  If your code relies
13    /// on a manual run-loop you may want to use
14    /// [`start_processor()`](Self::start_processor) instead!
15    ///
16    /// Each address in the set must be unique and unused on the
17    /// current node.  Workers must implement the Worker trait and be
18    /// thread-safe.  Workers run asynchronously and will be scheduled
19    /// independently of each other.  To wait for the initialisation
20    /// of your worker to complete you can use
21    /// [`wait_for()`](Self::wait_for).
22    ///
23    /// ```rust
24    /// use ockam_core::{Result, Worker, worker};
25    /// use ockam_node::Context;
26    ///
27    /// struct MyWorker;
28    ///
29    /// #[worker]
30    /// impl Worker for MyWorker {
31    ///     type Context = Context;
32    ///     type Message = String;
33    /// }
34    ///
35    /// fn start_my_worker(ctx: &mut Context) -> Result<()> {
36    ///     ctx.start_worker("my-worker-address", MyWorker)
37    /// }
38    /// ```
39    ///
40    /// Approximate flow of starting a worker:
41    ///
42    /// 1. StartWorker message -> Router
43    /// 2. First address is considered a primary_addr (main_addr)
44    /// 3. Check if router.map.address_records_map already has primary_addr
45    /// 4. AddressRecord is created and inserted in router.map
46    /// 5. Iterate over metadata:
47    ///     Check if it belongs to that record
48    ///     Set is_terminal true in router.map.address_metadata_map (if address is terminal)
49    ///     Insert attributes one by one
50    /// 6. For each address we insert pair (Address, primary_addr) into router.map.alias_map, including (primary_addr, primary_addr itself)
51    /// 7. WorkerRelay is spawned as a tokio task:
52    ///     WorkerRelay calls initialize
53    ///     WorkerRelay calls Worker::handle_message for each message until either
54    ///         stop signal is received (CtrlSignal::InterruptStop to AddressRecord::ctrl_tx)
55    ///         there are no messages coming to that receiver (the sender side is dropped)
56    pub fn start_worker<W>(&self, address: impl Into<Address>, worker: W) -> Result<()>
57    where
58        W: Worker<Context = Context>,
59    {
60        WorkerBuilder::new(worker)
61            .with_address(address)
62            .start(self)?;
63
64        Ok(())
65    }
66
67    /// Start a new worker instance at the given address
68    ///
69    /// A worker is an asynchronous piece of code that can send and
70    /// receive messages of a specific type.  This type is encoded via
71    /// the [`Worker`](ockam_core::Worker) trait.  If your code relies
72    /// on a manual run-loop you may want to use
73    /// [`start_processor()`](Self::start_processor) instead!
74    ///
75    /// Each address in the set must be unique and unused on the
76    /// current node.  Workers must implement the Worker trait and be
77    /// thread-safe.  Workers run asynchronously and will be scheduled
78    /// independently of each other.
79    ///
80    /// ```rust
81    /// use ockam_core::{AllowAll, Result, Worker, worker};
82    /// use ockam_node::Context;
83    ///
84    /// struct MyWorker;
85    ///
86    /// #[worker]
87    /// impl Worker for MyWorker {
88    ///     type Context = Context;
89    ///     type Message = String;
90    /// }
91    ///
92    ///  fn start_my_worker(ctx: &mut Context) -> Result<()> {
93    ///     ctx.start_worker_with_access_control("my-worker-address", MyWorker, AllowAll, AllowAll)
94    /// }
95    /// ```
96    pub fn start_worker_with_access_control<W>(
97        &self,
98        address: impl Into<Address>,
99        worker: W,
100        incoming: impl IncomingAccessControl,
101        outgoing: impl OutgoingAccessControl,
102    ) -> Result<()>
103    where
104        W: Worker<Context = Context>,
105    {
106        WorkerBuilder::new(worker)
107            .with_address(address)
108            .with_incoming_access_control(incoming)
109            .with_outgoing_access_control(outgoing)
110            .start(self)?;
111
112        Ok(())
113    }
114
115    /// Start a new processor instance at the given address. Default AccessControl is DenyAll
116    ///
117    /// A processor is an asynchronous piece of code that runs a
118    /// custom run loop, with access to a worker context to send and
119    /// receive messages.  If your code is built around responding to
120    /// message events, consider using
121    /// [`start_worker()`](Self::start_worker) instead!
122    ///
123    /// Approximate flow of starting a processor:
124    ///
125    /// 1. StartProcessor message -> Router
126    /// 2. First address is considered a primary_addr (main_addr)
127    /// 3. Check if router.map.address_records_map already has primary_addr
128    /// 4. AddressRecord is created and inserted in router.map
129    /// 5. Iterate over metadata:
130    ///     Check if it belongs to that record
131    ///     Set is_terminal true in router.map.address_metadata_map (if address is terminal)
132    ///     Insert attributes one by one
133    /// 6. For each address we insert pair (Address, primary_addr) into router.map.alias_map, including (primary_addr, primary_addr itself)
134    /// 7. ProcessorRelay is spawned as a tokio task:
135    ///     ProcessorRelay calls Processor::initialize
136    ///     ProcessorRelay calls Processor::process until either false is returned or stop signal is received (CtrlSignal::InterruptStop to AddressRecord::ctrl_tx)
137    pub fn start_processor<P>(&self, address: impl Into<Address>, processor: P) -> Result<()>
138    where
139        P: Processor<Context = Context>,
140    {
141        ProcessorBuilder::new(processor)
142            .with_address(address.into())
143            .start(self)?;
144
145        Ok(())
146    }
147
148    /// Start a new processor instance at the given address
149    ///
150    /// A processor is an asynchronous piece of code that runs a
151    /// custom run loop, with access to a worker context to send and
152    /// receive messages.  If your code is built around responding to
153    /// message events, consider using
154    /// [`start_worker()`](Self::start_worker) instead!
155    ///
156    pub fn start_processor_with_access_control<P>(
157        &self,
158        address: impl Into<Address>,
159        processor: P,
160        incoming: impl IncomingAccessControl,
161        outgoing: impl OutgoingAccessControl,
162    ) -> Result<()>
163    where
164        P: Processor<Context = Context>,
165    {
166        ProcessorBuilder::new(processor)
167            .with_address(address)
168            .with_incoming_access_control(incoming)
169            .with_outgoing_access_control(outgoing)
170            .start(self)?;
171
172        Ok(())
173    }
174
175    /// Stop a Worker or a Processor running on given Address
176    pub fn stop_address(&self, address: &Address) -> Result<()> {
177        self.router()?.stop_address(address, false)
178    }
179
180    /// Stop a Worker or a Processor running on the context primary address
181    pub fn stop_primary_address(&self) -> Result<()> {
182        self.stop_address(self.primary_address())
183    }
184}