ockam_node/context/worker_lifecycle.rs
1use crate::Context;
2use crate::{ProcessorBuilder, WorkerBuilder};
3use ockam_core::{
4 Address, IncomingAccessControl, OutgoingAccessControl, Processor, Result, Worker,
5};
6
7impl Context {
8 /// Start a new worker instance at the given address. Default AccessControl is AllowAll
9 ///
10 /// A worker is an asynchronous piece of code that can send and
11 /// receive messages of a specific type. This type is encoded via
12 /// the [`Worker`](ockam_core::Worker) trait. If your code relies
13 /// on a manual run-loop you may want to use
14 /// [`start_processor()`](Self::start_processor) instead!
15 ///
16 /// Each address in the set must be unique and unused on the
17 /// current node. Workers must implement the Worker trait and be
18 /// thread-safe. Workers run asynchronously and will be scheduled
19 /// independently of each other. To wait for the initialisation
20 /// of your worker to complete you can use
21 /// [`wait_for()`](Self::wait_for).
22 ///
23 /// ```rust
24 /// use ockam_core::{Result, Worker, worker};
25 /// use ockam_node::Context;
26 ///
27 /// struct MyWorker;
28 ///
29 /// #[worker]
30 /// impl Worker for MyWorker {
31 /// type Context = Context;
32 /// type Message = String;
33 /// }
34 ///
35 /// fn start_my_worker(ctx: &mut Context) -> Result<()> {
36 /// ctx.start_worker("my-worker-address", MyWorker)
37 /// }
38 /// ```
39 ///
40 /// Approximate flow of starting a worker:
41 ///
42 /// 1. StartWorker message -> Router
43 /// 2. First address is considered a primary_addr (main_addr)
44 /// 3. Check if router.map.address_records_map already has primary_addr
45 /// 4. AddressRecord is created and inserted in router.map
46 /// 5. Iterate over metadata:
47 /// Check if it belongs to that record
48 /// Set is_terminal true in router.map.address_metadata_map (if address is terminal)
49 /// Insert attributes one by one
50 /// 6. For each address we insert pair (Address, primary_addr) into router.map.alias_map, including (primary_addr, primary_addr itself)
51 /// 7. WorkerRelay is spawned as a tokio task:
52 /// WorkerRelay calls initialize
53 /// WorkerRelay calls Worker::handle_message for each message until either
54 /// stop signal is received (CtrlSignal::InterruptStop to AddressRecord::ctrl_tx)
55 /// there are no messages coming to that receiver (the sender side is dropped)
56 pub fn start_worker<W>(&self, address: impl Into<Address>, worker: W) -> Result<()>
57 where
58 W: Worker<Context = Context>,
59 {
60 WorkerBuilder::new(worker)
61 .with_address(address)
62 .start(self)?;
63
64 Ok(())
65 }
66
67 /// Start a new worker instance at the given address
68 ///
69 /// A worker is an asynchronous piece of code that can send and
70 /// receive messages of a specific type. This type is encoded via
71 /// the [`Worker`](ockam_core::Worker) trait. If your code relies
72 /// on a manual run-loop you may want to use
73 /// [`start_processor()`](Self::start_processor) instead!
74 ///
75 /// Each address in the set must be unique and unused on the
76 /// current node. Workers must implement the Worker trait and be
77 /// thread-safe. Workers run asynchronously and will be scheduled
78 /// independently of each other.
79 ///
80 /// ```rust
81 /// use ockam_core::{AllowAll, Result, Worker, worker};
82 /// use ockam_node::Context;
83 ///
84 /// struct MyWorker;
85 ///
86 /// #[worker]
87 /// impl Worker for MyWorker {
88 /// type Context = Context;
89 /// type Message = String;
90 /// }
91 ///
92 /// fn start_my_worker(ctx: &mut Context) -> Result<()> {
93 /// ctx.start_worker_with_access_control("my-worker-address", MyWorker, AllowAll, AllowAll)
94 /// }
95 /// ```
96 pub fn start_worker_with_access_control<W>(
97 &self,
98 address: impl Into<Address>,
99 worker: W,
100 incoming: impl IncomingAccessControl,
101 outgoing: impl OutgoingAccessControl,
102 ) -> Result<()>
103 where
104 W: Worker<Context = Context>,
105 {
106 WorkerBuilder::new(worker)
107 .with_address(address)
108 .with_incoming_access_control(incoming)
109 .with_outgoing_access_control(outgoing)
110 .start(self)?;
111
112 Ok(())
113 }
114
115 /// Start a new processor instance at the given address. Default AccessControl is DenyAll
116 ///
117 /// A processor is an asynchronous piece of code that runs a
118 /// custom run loop, with access to a worker context to send and
119 /// receive messages. If your code is built around responding to
120 /// message events, consider using
121 /// [`start_worker()`](Self::start_worker) instead!
122 ///
123 /// Approximate flow of starting a processor:
124 ///
125 /// 1. StartProcessor message -> Router
126 /// 2. First address is considered a primary_addr (main_addr)
127 /// 3. Check if router.map.address_records_map already has primary_addr
128 /// 4. AddressRecord is created and inserted in router.map
129 /// 5. Iterate over metadata:
130 /// Check if it belongs to that record
131 /// Set is_terminal true in router.map.address_metadata_map (if address is terminal)
132 /// Insert attributes one by one
133 /// 6. For each address we insert pair (Address, primary_addr) into router.map.alias_map, including (primary_addr, primary_addr itself)
134 /// 7. ProcessorRelay is spawned as a tokio task:
135 /// ProcessorRelay calls Processor::initialize
136 /// ProcessorRelay calls Processor::process until either false is returned or stop signal is received (CtrlSignal::InterruptStop to AddressRecord::ctrl_tx)
137 pub fn start_processor<P>(&self, address: impl Into<Address>, processor: P) -> Result<()>
138 where
139 P: Processor<Context = Context>,
140 {
141 ProcessorBuilder::new(processor)
142 .with_address(address.into())
143 .start(self)?;
144
145 Ok(())
146 }
147
148 /// Start a new processor instance at the given address
149 ///
150 /// A processor is an asynchronous piece of code that runs a
151 /// custom run loop, with access to a worker context to send and
152 /// receive messages. If your code is built around responding to
153 /// message events, consider using
154 /// [`start_worker()`](Self::start_worker) instead!
155 ///
156 pub fn start_processor_with_access_control<P>(
157 &self,
158 address: impl Into<Address>,
159 processor: P,
160 incoming: impl IncomingAccessControl,
161 outgoing: impl OutgoingAccessControl,
162 ) -> Result<()>
163 where
164 P: Processor<Context = Context>,
165 {
166 ProcessorBuilder::new(processor)
167 .with_address(address)
168 .with_incoming_access_control(incoming)
169 .with_outgoing_access_control(outgoing)
170 .start(self)?;
171
172 Ok(())
173 }
174
175 /// Stop a Worker or a Processor running on given Address
176 pub fn stop_address(&self, address: &Address) -> Result<()> {
177 self.router()?.stop_address(address, false)
178 }
179
180 /// Stop a Worker or a Processor running on the context primary address
181 pub fn stop_primary_address(&self) -> Result<()> {
182 self.stop_address(self.primary_address())
183 }
184}