ockam_node/
worker_builder.rs

1use crate::{debugger, ContextMode, WorkerShutdownPriority};
2use crate::{relay::WorkerRelay, Context};
3use ockam_core::compat::string::String;
4use ockam_core::compat::sync::Arc;
5use ockam_core::{
6    Address, AddressMetadata, AllowAll, IncomingAccessControl, Mailbox, Mailboxes,
7    OutgoingAccessControl, Result, Worker,
8};
9
10/// Start a [`Worker`] with a custom configuration
11///
12/// Varying use-cases should use the builder API to customise the
13/// underlying worker that is created.
14pub struct WorkerBuilder<W>
15where
16    W: Worker<Context = Context>,
17{
18    worker: W,
19}
20
21impl<W> WorkerBuilder<W>
22where
23    W: Worker<Context = Context>,
24{
25    /// Create a new builder for a given Worker. Default AccessControl is AllowAll
26    pub fn new(worker: W) -> Self {
27        Self { worker }
28    }
29}
30
31impl<W> WorkerBuilder<W>
32where
33    W: Worker<Context = Context>,
34{
35    /// Worker with only one [`Address`]
36    pub fn with_address(self, address: impl Into<Address>) -> WorkerBuilderOneAddress<W> {
37        self.with_address_and_metadata_impl(address, None)
38    }
39
40    /// Worker with single terminal [`Address`]
41    pub fn with_terminal_address(self, address: impl Into<Address>) -> WorkerBuilderOneAddress<W> {
42        self.with_address_and_metadata(
43            address,
44            AddressMetadata {
45                is_terminal: true,
46                attributes: vec![],
47            },
48        )
49    }
50
51    /// Worker with single terminal [`Address`] and metadata
52    pub fn with_address_and_metadata(
53        self,
54        address: impl Into<Address>,
55        metadata: AddressMetadata,
56    ) -> WorkerBuilderOneAddress<W> {
57        self.with_address_and_metadata_impl(address, Some(metadata))
58    }
59
60    /// Worker with single terminal [`Address`] and metadata
61    pub fn with_address_and_metadata_impl(
62        self,
63        address: impl Into<Address>,
64        metadata: Option<AddressMetadata>,
65    ) -> WorkerBuilderOneAddress<W> {
66        WorkerBuilderOneAddress {
67            incoming_ac: Arc::new(AllowAll),
68            outgoing_ac: Arc::new(AllowAll),
69            worker: self.worker,
70            address: address.into(),
71            metadata,
72            shutdown_priority: Default::default(),
73        }
74    }
75
76    /// Worker with multiple [`Address`]es
77    pub fn with_mailboxes(self, mailboxes: Mailboxes) -> WorkerBuilderMultipleAddresses<W> {
78        WorkerBuilderMultipleAddresses {
79            mailboxes,
80            shutdown_priority: Default::default(),
81            worker: self.worker,
82        }
83    }
84}
85
86pub struct WorkerBuilderMultipleAddresses<W>
87where
88    W: Worker<Context = Context>,
89{
90    mailboxes: Mailboxes,
91    shutdown_priority: WorkerShutdownPriority,
92    worker: W,
93}
94
95impl<W> WorkerBuilderMultipleAddresses<W>
96where
97    W: Worker<Context = Context>,
98{
99    /// Consume this builder and start a new Ockam [`Worker`] from the given context
100    pub fn start(self, context: &Context) -> Result<()> {
101        start(context, self.mailboxes, self.shutdown_priority, self.worker)
102    }
103
104    pub fn with_shutdown_priority(mut self, shutdown_priority: WorkerShutdownPriority) -> Self {
105        self.shutdown_priority = shutdown_priority;
106        self
107    }
108}
109
110pub struct WorkerBuilderOneAddress<W>
111where
112    W: Worker<Context = Context>,
113{
114    incoming_ac: Arc<dyn IncomingAccessControl>,
115    outgoing_ac: Arc<dyn OutgoingAccessControl>,
116    address: Address,
117    worker: W,
118    metadata: Option<AddressMetadata>,
119    shutdown_priority: WorkerShutdownPriority,
120}
121
122impl<W> WorkerBuilderOneAddress<W>
123where
124    W: Worker<Context = Context>,
125{
126    /// Mark the provided address as terminal
127    pub fn terminal(mut self) -> Self {
128        self.metadata
129            .get_or_insert(AddressMetadata {
130                is_terminal: false,
131                attributes: vec![],
132            })
133            .is_terminal = true;
134        self
135    }
136
137    /// Adds metadata attribute for the provided address
138    pub fn with_metadata_attribute(
139        mut self,
140        key: impl Into<String>,
141        value: impl Into<String>,
142    ) -> Self {
143        self.metadata
144            .get_or_insert(AddressMetadata {
145                is_terminal: false,
146                attributes: vec![],
147            })
148            .attributes
149            .push((key.into(), value.into()));
150
151        self
152    }
153
154    pub fn with_shutdown_priority(mut self, shutdown_priority: WorkerShutdownPriority) -> Self {
155        self.shutdown_priority = shutdown_priority;
156        self
157    }
158
159    /// Consume this builder and start a new Ockam [`Worker`] from the given context
160    pub fn start(self, context: &Context) -> Result<()> {
161        start(
162            context,
163            Mailboxes::new(
164                Mailbox::new(
165                    self.address,
166                    self.metadata,
167                    self.incoming_ac,
168                    self.outgoing_ac,
169                ),
170                vec![],
171            ),
172            self.shutdown_priority,
173            self.worker,
174        )
175    }
176}
177
178impl<W> WorkerBuilderOneAddress<W>
179where
180    W: Worker<Context = Context>,
181{
182    /// Set [`IncomingAccessControl`]
183    pub fn with_incoming_access_control(
184        mut self,
185        incoming_access_control: impl IncomingAccessControl,
186    ) -> Self {
187        self.incoming_ac = Arc::new(incoming_access_control);
188        self
189    }
190
191    /// Set [`IncomingAccessControl`]
192    pub fn with_incoming_access_control_arc(
193        mut self,
194        incoming_access_control: Arc<dyn IncomingAccessControl>,
195    ) -> Self {
196        self.incoming_ac = incoming_access_control.clone();
197        self
198    }
199
200    /// Set [`OutgoingAccessControl`]
201    pub fn with_outgoing_access_control(
202        mut self,
203        outgoing_access_control: impl OutgoingAccessControl,
204    ) -> Self {
205        self.outgoing_ac = Arc::new(outgoing_access_control);
206        self
207    }
208
209    /// Set [`OutgoingAccessControl`]
210    pub fn with_outgoing_access_control_arc(
211        mut self,
212        outgoing_access_control: Arc<dyn OutgoingAccessControl>,
213    ) -> Self {
214        self.outgoing_ac = outgoing_access_control.clone();
215        self
216    }
217}
218
219/// Consume this builder and start a new Ockam [`Worker`] from the given context
220fn start<W>(
221    context: &Context,
222    mailboxes: Mailboxes,
223    shutdown_priority: WorkerShutdownPriority,
224    worker: W,
225) -> Result<()>
226where
227    W: Worker<Context = Context>,
228{
229    debug!(
230        "Initializing ockam worker '{}' with access control in:{:?} out:{:?}",
231        mailboxes.primary_address(),
232        mailboxes.primary_mailbox().incoming_access_control(),
233        mailboxes.primary_mailbox().outgoing_access_control(),
234    );
235
236    // Pass it to the context
237    let (ctx, sender, ctrl_rx) = context.new_with_mailboxes(mailboxes, ContextMode::Attached);
238
239    debugger::log_inherit_context("WORKER", context, &ctx);
240
241    let router = context.router()?;
242    router.add_worker(
243        ctx.mailboxes(),
244        sender,
245        false,
246        shutdown_priority,
247        context.mailbox_count(),
248    )?;
249
250    // Then initialise the worker message relay
251    WorkerRelay::init(context.runtime(), worker, ctx, ctrl_rx);
252
253    Ok(())
254}