1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
// SPDX-License-Identifier: MIT OR Apache-2.0
use std::net::{Ipv4Addr, Ipv6Addr};
use p2panda_core::SigningKey;
use p2panda_net::addrs::TrustedTransportInfo;
use p2panda_net::discovery::DiscoveryConfig;
use p2panda_net::gossip::GossipConfig;
use p2panda_net::iroh_endpoint::{EndpointAddr, RelayUrl};
use p2panda_net::utils::from_verifying_key;
use p2panda_net::{NetworkId, NodeId};
use p2panda_store::SqliteStore;
use p2panda_store::sqlite::{SqlitePool, SqliteStoreBuilder};
use crate::Node;
use crate::forge::OperationForge;
use crate::network::MdnsDiscoveryMode;
use crate::node::{AckPolicy, Config, SpawnError};
use crate::processor::{Pipeline, TaskTracker};
/// Builder for `Node`.
///
/// Collects an optional signing key, the node configuration and the store options through its
/// setter methods.
///
/// To create the `Node` call `NodeBuilder::spawn()`.
#[derive(Default)]
pub struct NodeBuilder {
/// Signing key of the node; when `None`, `spawn()` falls back to `SigningKey::default()`.
signing_key: Option<SigningKey>,
/// Configuration adjusted by the setter methods and handed to the node on spawn.
config: Config,
/// Where the store's database comes from: in-memory, a database URL or an existing pool.
store_options: StoreBuilderOptions,
}
impl NodeBuilder {
    /// Returns a `NodeBuilder` populated with default configuration values.
    ///
    /// Equivalent to `NodeBuilder::default()`: no signing key, `Config::default()` and the
    /// default (in-memory) store option.
    pub fn new() -> Self {
        Self::default()
    }

    /// Provides the signing key for the node.
    ///
    /// The node is identified in the network by the public key derived from this private key;
    /// that public key can, for example, be used to connect to the node directly. The private
    /// key authenticates the node during the connection handshake (using TLS 1.3) and signs
    /// operations to ensure data integrity and authenticity.
    ///
    /// When no key is provided, a new one is randomly generated.
    pub fn signing_key(self, signing_key: SigningKey) -> Self {
        Self {
            signing_key: Some(signing_key),
            ..self
        }
    }

    /// Configures the store to open the database at the given URL.
    ///
    /// The URL has to be a SQLite database URI, see <https://sqlite.org/uri.html>.
    ///
    /// Database migrations are applied automatically. If migrations need to be managed
    /// manually, use `NodeBuilder::database_pool()` together with `sqlx` instead.
    ///
    /// Calling this replaces a pool previously set with `NodeBuilder::database_pool()`. When
    /// neither a URL nor a pool is given, the node uses an ephemeral in-memory database.
    pub fn database_url(self, url: &str) -> Self {
        Self {
            store_options: StoreBuilderOptions::Url(url.to_owned()),
            ..self
        }
    }

    /// Configures the store to use an existing connection pool.
    ///
    /// Calling this replaces a URL previously set with `NodeBuilder::database_url()`.
    ///
    /// When no pool is provided, a new one is created with the default maximum of 16
    /// connections for the database URL (either the default in-memory URL or the URL given to
    /// `NodeBuilder::database_url()`).
    pub fn database_pool(self, pool: SqlitePool) -> Self {
        Self {
            store_options: StoreBuilderOptions::Pool(pool),
            ..self
        }
    }

    /// Chooses the acknowledgement policy.
    ///
    /// The default policy is `Automatic`, in which case every message emitted from topic
    /// streams is acknowledged automatically.
    ///
    /// See the `Node::stream(topic)` documentation for further information.
    pub fn ack_policy(self, value: AckPolicy) -> Self {
        let mut config = self.config;
        config.ack_policy = value;
        Self { config, ..self }
    }

    /// Chooses the network identifier.
    ///
    /// The network identifier separates distinct networks and prevents interoperability
    /// between them. It is the most global identifier for grouping nodes into networks.
    /// Different applications may share the same underlying network infrastructure by using
    /// the same network identifier.
    ///
    /// **WARNING:** The network identifier is _not_ confidentially exchanged with a remote node
    /// and can not be treated as a secret value. See:
    /// <https://github.com/p2panda/p2panda/issues/965>
    ///
    /// The default network ID is the byte representation of the BLAKE3 hash of the string
    /// "p2panda".
    pub fn network_id(mut self, network_id: NetworkId) -> Self {
        let network = &mut self.config.network;
        network.network_id = network_id;
        self
    }

    /// Registers a relay server URL that assists in establishing direct connections.
    ///
    /// Multiple relays can be added; a single "home relay" is selected automatically based on
    /// latency.
    ///
    /// Relays fulfil several functions:
    ///
    /// 1. They help establish connections by temporarily routing encrypted traffic until a
    ///    direct, P2P connection is feasible. This allows nodes to get started immediately,
    ///    without first waiting for holepunching / STUN to complete.
    /// 2. They handle learning a node's public addresses (via QUIC address discovery),
    ///    signalling and hole-punching to establish direct connections between two nodes. This
    ///    set of methods is also understood as STUN. Afterwards the relay is no longer required.
    /// 3. They provide a relayed and encrypted fallback through the server when establishing a
    ///    direct connection failed (TURN).
    ///
    /// Without any relay, other nodes can only connect to this node if a directly-reachable IP
    /// address is available and known to them.
    pub fn relay_url(mut self, url: RelayUrl) -> Self {
        let relays = &mut self.config.network.relay_urls;
        relays.insert(url);
        self
    }

    /// Adds a bootstrap node to the local address book.
    ///
    /// Bootstrap nodes are the starting point of the random-walk discovery algorithm, which
    /// finds other nodes in the network without the need for any centralised registry. Any node
    /// can serve as a bootstrap into the network. The URL of the relay used by the bootstrap
    /// node is required to assist with connectivity (via relaying of traffic and negotiation of
    /// hole-punching for direct connections).
    ///
    /// Multiple bootstrap nodes can be registered. Each iteration of the discovery algorithm
    /// begins by picking a random node from the set of known bootstrap nodes. Registering
    /// several bootstrap nodes is recommended, especially if they are not highly-available;
    /// this offers redundancy in case any of them goes offline or is otherwise unavailable.
    ///
    /// Consult the documentation of the `p2panda-discovery` crate for further details
    /// concerning the discovery protocol.
    pub fn bootstrap(mut self, node_id: NodeId, relay_url: RelayUrl) -> Self {
        let transport_info = TrustedTransportInfo::from(
            EndpointAddr::new(from_verifying_key(node_id)).with_relay_url(relay_url),
        );
        let bootstraps = &mut self.config.network.bootstraps;
        bootstraps.insert((node_id, transport_info));
        self
    }

    /// Chooses the mDNS discovery mode.
    ///
    /// mDNS can run in active, passive or disabled mode.
    ///
    /// The default mode is active, in which case this node actively advertises its endpoint
    /// address on the local area network.
    pub fn mdns_mode(mut self, mode: MdnsDiscoveryMode) -> Self {
        let network = &mut self.config.network;
        network.mdns_mode = mode;
        self
    }

    /// Chooses the address the IPv4 socket is bound to.
    ///
    /// The default address is `0.0.0.0`.
    pub fn bind_ip_v4(mut self, ip: Ipv4Addr) -> Self {
        let iroh = &mut self.config.network.iroh;
        iroh.bind_ip_v4 = ip;
        self
    }

    /// Chooses the port of the IPv4 socket.
    ///
    /// The default port is `0`, which results in a random free port being chosen. If the given
    /// port is already in use, a random port is chosen as a fallback.
    pub fn bind_port_v4(mut self, port: u16) -> Self {
        let iroh = &mut self.config.network.iroh;
        iroh.bind_port_v4 = port;
        self
    }

    /// Chooses the address the IPv6 socket is bound to.
    ///
    /// The default address is `[::]`.
    pub fn bind_ip_v6(mut self, ip: Ipv6Addr) -> Self {
        let iroh = &mut self.config.network.iroh;
        iroh.bind_ip_v6 = ip;
        self
    }

    /// Chooses the port of the IPv6 socket.
    ///
    /// The default port is `0`, which results in a random free port being chosen. If the given
    /// port is already in use, a random port is chosen as a fallback.
    pub fn bind_port_v6(mut self, port: u16) -> Self {
        let iroh = &mut self.config.network.iroh;
        iroh.bind_port_v6 = port;
        self
    }

    /// Replaces the discovery configuration with custom parameters.
    ///
    /// This allows fine-tuning of the random walk protocol, including the number of walkers and
    /// their reset probability.
    pub fn discovery_config(mut self, config: DiscoveryConfig) -> Self {
        let network = &mut self.config.network;
        network.discovery = config;
        self
    }

    /// Replaces the gossip configuration with custom parameters.
    ///
    /// This allows fine-tuning of swarm membership and gossip broadcast parameters, as well as
    /// the maximum message size for broadcast. The default maximum message size is 4096 bytes.
    pub fn gossip_config(mut self, config: GossipConfig) -> Self {
        let network = &mut self.config.network;
        network.gossip = config;
        self
    }

    /// Consumes the builder and spawns the `Node`.
    ///
    /// A default signing key is used when none was set. Depending on the configured store
    /// option, the store is built with the store builder's defaults, built from the database
    /// URL, or created from the provided connection pool.
    ///
    /// # Errors
    ///
    /// Returns a `SpawnError` if building the store or spawning the node fails.
    pub async fn spawn(self) -> Result<Node, SpawnError> {
        let Self {
            signing_key,
            config,
            store_options,
        } = self;

        let signing_key = signing_key.unwrap_or_default();

        let store = match store_options {
            StoreBuilderOptions::Pool(pool) => SqliteStore::from_pool(pool),
            StoreBuilderOptions::Url(url) => {
                SqliteStoreBuilder::new().database_url(&url).build().await?
            }
            StoreBuilderOptions::Memory => SqliteStoreBuilder::new().build().await?,
        };

        // The forge and the pipeline each work on their own handle to the store.
        let forge = OperationForge::from_signing_key(signing_key, store.clone());
        let tracker = TaskTracker::new();
        let pipeline = Pipeline::new::<SqliteStore>(store.clone(), tracker);

        let spawned = Node::spawn_inner(config, store, forge, pipeline).await?;
        Ok(spawned)
    }
}
/// Source of the SQLite store created in `NodeBuilder::spawn()`.
#[derive(Default)]
enum StoreBuilderOptions {
/// Build the store with `SqliteStoreBuilder` without setting a database URL (the default).
#[default]
Memory,
/// Build the store with `SqliteStoreBuilder` using the given database URL.
Url(String),
/// Create the store from an already existing connection pool via `SqliteStore::from_pool`.
Pool(SqlitePool),
}