1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
use crate::identity::models::Identifier;
use crate::identity::storage::Storage;
use crate::identity::{
    secure_channels, Credentials, CredentialsServer, Identities, IdentitiesCreation,
    IdentitiesKeys, IdentitiesRepository, SecureChannel, SecureChannelListener,
    SecureChannelRegistry, SecureChannels, SecureChannelsBuilder,
};
use crate::identity::{Identity, SecureChannelListenerOptions, SecureChannelOptions};
use ockam_core::compat::string::String;
use ockam_core::compat::sync::Arc;
use ockam_core::flow_control::FlowControls;
use ockam_core::{
    Address, AsyncTryClone, IncomingAccessControl, Message, OutgoingAccessControl, Processor,
    Result, Route, Routed, Worker,
};
use ockam_identity::{PurposeKeys, Vault, VaultStorage};
use ockam_node::{Context, HasContext, MessageReceiveOptions, MessageSendReceiveOptions};
use ockam_vault::SigningSecretKeyHandle;

use crate::remote::{RemoteRelay, RemoteRelayInfo, RemoteRelayOptions};
use crate::stream::Stream;
use crate::OckamError;

/// This struct supports all the ockam services for managing identities
/// and creating secure channels
///
/// It pairs a node [`Context`] with the [`SecureChannels`] services; the
/// identity, credential and vault accessors of `Node` are all reached
/// through the latter.
pub struct Node {
    /// Context used to start workers/processors and to send and receive messages
    context: Context,
    /// Secure channel services, shared behind an [`Arc`]
    secure_channels: Arc<SecureChannels>,
}

/// Create a default [`Node`] (with no persistence) around the given context.
///
/// The node takes ownership of `ctx` and uses the services returned by
/// `secure_channels()`. Persistent implementations are available through
/// [`Node::builder`].
///
/// For example, you can use a FileStorage backend to support the node vault:
/// ```rust
/// use std::path::Path;
/// use std::sync::Arc;
/// use ockam::{Node, Result};
/// use ockam_node::Context;
/// use ockam_vault::storage::PersistentStorage;
///
/// async fn make_node(ctx: Context) -> Result<Node> {
///   let node = Node::builder().with_vault_storage(PersistentStorage::create(Path::new("vault")).await?).build(&ctx).await?;
///   Ok(node)
/// }
/// ```
/// Here is another example where we specify a local LMDB database to store identity attributes:
/// ```rust
/// use std::sync::Arc;
/// use ockam::{Node, Result};
/// use ockam::LmdbStorage;
/// use ockam_node::Context;
///
/// async fn make_node(ctx: Context) -> Result<Node> {
///    let lmdb_storage = Arc::new(LmdbStorage::new("identities").await?);
///    let node = Node::builder().with_identities_storage(lmdb_storage).build(&ctx).await?;
///    Ok(node)
/// }
/// ```
pub fn node(ctx: Context) -> Node {
    // Default secure channel services, as opposed to the ones configured via `NodeBuilder`
    let default_channels = secure_channels();
    Node {
        context: ctx,
        secure_channels: default_channels,
    }
}

impl Node {
    /// Return the node's [`FlowControls`]
    pub fn flow_controls(&self) -> &FlowControls {
        self.context.flow_controls()
    }

    /// Return the current context
    pub fn context(&self) -> &Context {
        &self.context
    }

    /// Create a new stream
    pub async fn create_stream(&self) -> Result<Stream> {
        Stream::new(&self.context).await
    }

    /// Create a new relay
    pub async fn create_relay(
        &self,
        orchestrator_route: impl Into<Route>,
        options: RemoteRelayOptions,
    ) -> Result<RemoteRelayInfo> {
        RemoteRelay::create(&self.context, orchestrator_route, options).await
    }

    /// Create a new static relay registered under the given `alias`
    pub async fn create_static_relay(
        &self,
        orchestrator_route: impl Into<Route>,
        alias: impl Into<String>,
        options: RemoteRelayOptions,
    ) -> Result<RemoteRelayInfo> {
        RemoteRelay::create_static(&self.context, orchestrator_route, alias, options).await
    }

    /// Create an Identity and return its [`Identifier`]
    pub async fn create_identity(&self) -> Result<Identifier> {
        Ok(self
            .identities_creation()
            .create_identity()
            .await?
            .identifier()
            .clone())
    }

    /// Create the secure channel purpose key for the given identifier
    ///
    /// The created key is not returned to the caller; only success or failure is reported.
    pub async fn create_secure_channel_key(&self, identifier: &Identifier) -> Result<()> {
        let _ = self
            .identities()
            .purpose_keys()
            .purpose_keys_creation()
            .create_secure_channel_purpose_key(identifier)
            .await?;

        Ok(())
    }

    /// Import an Identity given its private key and change history
    /// Note: the data is not persisted!
    pub async fn import_private_identity(
        &self,
        identity_change_history: &[u8],
        key: &SigningSecretKeyHandle,
    ) -> Result<Identity> {
        self.identities_creation()
            .import_private_identity(identity_change_history, key)
            .await
    }

    /// Import an Identity that was exported as a hex-encoded string
    ///
    /// Fails with [`OckamError::InvalidHex`] when `data` is not valid hexadecimal.
    pub async fn import_identity_hex(&self, data: &str) -> Result<Identity> {
        // Decode up front so that malformed input is rejected before any import is attempted
        let decoded = hex::decode(data).map_err(|_| OckamError::InvalidHex)?;
        self.identities_creation().import(None, &decoded).await
    }

    /// Spawns a SecureChannel listener at given `Address` with given [`SecureChannelListenerOptions`]
    pub async fn create_secure_channel_listener(
        &self,
        identifier: &Identifier,
        address: impl Into<Address>,
        options: impl Into<SecureChannelListenerOptions>,
    ) -> Result<SecureChannelListener> {
        self.secure_channels()
            .create_secure_channel_listener(&self.context, identifier, address, options)
            .await
    }

    /// Initiate a SecureChannel using `Route` to the SecureChannel listener and [`SecureChannelOptions`]
    pub async fn create_secure_channel(
        &self,
        identifier: &Identifier,
        route: impl Into<Route>,
        options: impl Into<SecureChannelOptions>,
    ) -> Result<SecureChannel> {
        self.secure_channels()
            .create_secure_channel(&self.context, identifier, route, options)
            .await
    }

    /// Start a new worker instance at the given address. Default Access Control is AllowAll
    pub async fn start_worker<W>(&self, address: impl Into<Address>, worker: W) -> Result<()>
    where
        W: Worker<Context = Context>,
    {
        self.context.start_worker(address, worker).await
    }

    /// Start a new worker instance at the given address with given Access Controls
    pub async fn start_worker_with_access_control<W>(
        &self,
        address: impl Into<Address>,
        worker: W,
        incoming: impl IncomingAccessControl,
        outgoing: impl OutgoingAccessControl,
    ) -> Result<()>
    where
        W: Worker<Context = Context>,
    {
        self.context
            .start_worker_with_access_control(address, worker, incoming, outgoing)
            .await
    }

    /// Start a new processor instance at the given address. Default Access Control is DenyAll
    pub async fn start_processor<P>(&self, address: impl Into<Address>, processor: P) -> Result<()>
    where
        P: Processor<Context = Context>,
    {
        self.context.start_processor(address, processor).await
    }

    /// Start a new processor instance at the given address with given Access Controls
    pub async fn start_processor_with_access_control<P>(
        &self,
        address: impl Into<Address>,
        processor: P,
        incoming: impl IncomingAccessControl,
        outgoing: impl OutgoingAccessControl,
    ) -> Result<()>
    where
        P: Processor<Context = Context>,
    {
        self.context
            .start_processor_with_access_control(address, processor, incoming, outgoing)
            .await
    }

    /// Signal to the local runtime to shut down
    pub async fn stop(&mut self) -> Result<()> {
        self.context.stop().await
    }

    /// Send a message to an address or via a fully-qualified route
    pub async fn send<R, M>(&self, route: R, msg: M) -> Result<()>
    where
        R: Into<Route>,
        M: Message + Send + 'static,
    {
        self.context.send(route, msg).await
    }

    /// Send a message to an address or via a fully-qualified route and receive a response
    pub async fn send_and_receive<M>(&self, route: impl Into<Route>, msg: impl Message) -> Result<M>
    where
        M: Message,
    {
        self.context.send_and_receive(route, msg).await
    }

    /// Send a message to an address or via a fully-qualified route and receive a response,
    /// using the given [`MessageSendReceiveOptions`]
    ///
    /// Unlike [`Node::send_and_receive`], the response is returned wrapped in [`Routed`].
    pub async fn send_and_receive_extended<M>(
        &self,
        route: impl Into<Route>,
        msg: impl Message,
        options: MessageSendReceiveOptions,
    ) -> Result<Routed<M>>
    where
        M: Message,
    {
        self.context
            .send_and_receive_extended(route, msg, options)
            .await
    }

    /// Receive the next message of type `M` on the node's context
    ///
    /// This only receives; nothing is sent. Waiting and timeout behaviour is the one
    /// of the underlying `Context::receive`.
    pub async fn receive<M: Message>(&mut self) -> Result<Routed<M>> {
        self.context.receive::<M>().await
    }

    /// Receive the next message of type `M` on the node's context, using the given
    /// [`MessageReceiveOptions`]
    pub async fn receive_extended<M>(&mut self, options: MessageReceiveOptions) -> Result<Routed<M>>
    where
        M: Message,
    {
        self.context.receive_extended(options).await
    }

    /// Return secure channel services
    pub fn secure_channels(&self) -> Arc<SecureChannels> {
        self.secure_channels.clone()
    }

    /// Return services to manage identities
    pub fn identities(&self) -> Arc<Identities> {
        self.secure_channels.identities()
    }

    /// Return services to create and import identities
    pub fn identities_creation(&self) -> Arc<IdentitiesCreation> {
        self.secure_channels.identities().identities_creation()
    }

    /// Return services to manage identities keys
    pub fn identities_keys(&self) -> Arc<IdentitiesKeys> {
        self.secure_channels.identities().identities_keys()
    }

    /// Return services to manage credentials
    pub fn credentials(&self) -> Arc<Credentials> {
        self.secure_channels.identities().credentials()
    }

    /// Return the [`Vault`]
    pub fn vault(&self) -> Vault {
        self.secure_channels.vault()
    }

    /// Return services to manage purpose keys
    pub fn purpose_keys(&self) -> Arc<PurposeKeys> {
        self.secure_channels.identities().purpose_keys()
    }

    /// Return services to serve credentials
    pub fn credentials_server(&self) -> Arc<dyn CredentialsServer> {
        self.secure_channels.identities().credentials_server()
    }

    /// Return the repository used to store identities data
    pub fn identities_repository(&self) -> Arc<dyn IdentitiesRepository> {
        self.secure_channels.identities().repository()
    }

    /// Return a new builder for top-level services
    pub fn builder() -> NodeBuilder {
        NodeBuilder::new()
    }
}

/// Implementing [`HasContext`] lets transports be integrated into a node
impl HasContext for Node {
    /// Borrow the context owned by this node
    fn get_context(&self) -> &Context {
        &self.context
    }
}

/// Builder for top level services
/// It merely encapsulates a secure channel builder for now
#[derive(Clone)]
pub struct NodeBuilder {
    /// Wrapped secure channels builder; every `with_*` setter of `NodeBuilder` is forwarded to it
    builder: SecureChannelsBuilder,
}

impl NodeBuilder {
    /// Start from the default secure channels builder
    fn new() -> Self {
        let builder = SecureChannels::builder();
        Self { builder }
    }

    /// Use the given [`Vault`]
    pub fn with_vault(self, vault: Vault) -> Self {
        Self {
            builder: self.builder.with_vault(vault),
        }
    }

    /// Use a Software Vault backed by the given storage
    pub fn with_vault_storage(self, storage: VaultStorage) -> Self {
        Self {
            builder: self.builder.with_vault_storage(storage),
        }
    }

    /// Use a specific storage for identities
    pub fn with_identities_storage(self, storage: Arc<dyn Storage>) -> Self {
        Self {
            builder: self.builder.with_identities_storage(storage),
        }
    }

    /// Use a specific identities repository
    pub fn with_identities_repository(self, repository: Arc<dyn IdentitiesRepository>) -> Self {
        Self {
            builder: self.builder.with_identities_repository(repository),
        }
    }

    /// Use a specific secure channels registry
    pub fn with_secure_channels_registry(self, registry: SecureChannelRegistry) -> Self {
        Self {
            builder: self.builder.with_secure_channels_registry(registry),
        }
    }

    /// Build the top level services into a [`Node`]
    ///
    /// The node gets its own clone of `ctx`; an error from cloning the context is
    /// returned before the secure channel services are built.
    pub async fn build(self, ctx: &Context) -> Result<Node> {
        let context = ctx.async_try_clone().await?;
        let secure_channels = self.builder.build();
        Ok(Node {
            context,
            secure_channels,
        })
    }
}