1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
// SPDX-License-Identifier: MIT OR Apache-2.0
use std::fmt::Debug;
use futures_util::Stream;
use p2panda_core::Topic;
use p2panda_net::iroh_endpoint::RelayUrl;
use p2panda_net::{NetworkId, NodeId};
use p2panda_store::sqlite::{SqliteError, SqliteStore, SqliteStoreBuilder};
use serde::{Deserialize, Serialize};
use thiserror::Error;
pub use crate::builder::NodeBuilder;
use crate::forge::{Forge, OperationForge};
use crate::network::{Network, NetworkConfig, NetworkError};
use crate::operation::{Extensions, LogId};
use crate::processor::{Pipeline, TaskTracker};
use crate::streams::{
EphemeralStreamPublisher, EphemeralStreamSubscription, StreamFrom, StreamPublisher,
StreamSubscription, SystemEvent, ephemeral_stream, event_stream, processed_stream,
};
/// Node API with methods to establish ephemeral and eventually consistent topic streams.
#[derive(Debug)]
pub struct Node {
    /// Configuration the node was spawned with (ack policy and network settings).
    config: Config,

    /// SQLite store shared with the forge, the processing pipeline and the network.
    store: SqliteStore,

    /// Creates and signs operations with this node's key.
    forge: OperationForge,

    // NOTE: A single pipeline currently handles _all_ incoming operations, regardless of how many
    // streams exist. This is sufficient for most applications for now, but we might want to make
    // the number of processors configurable to avoid head-of-line blocking.
    /// Processing pipeline for incoming and locally created operations.
    pipeline: Pipeline<LogId, Extensions, Topic>,

    /// Networking layer (log sync, gossip and discovery).
    network: Network,
}
impl Node {
    /// Returns the builder for a `Node`.
    pub fn builder() -> NodeBuilder {
        NodeBuilder::new()
    }

    /// Spawns a `Node` using default configuration parameters.
    ///
    /// The node uses an in-memory SQLite database and an internally generated private key. The
    /// default configuration does _not_ include a bootstrap node or relay, which limits the
    /// functionality of p2panda to local-area networks.
    ///
    /// # Errors
    ///
    /// A [`SpawnError`] is returned if spawning is unsuccessful due to a network or store-related
    /// failure.
    pub async fn spawn() -> Result<Self, SpawnError> {
        // Initialises an in-memory SQLite database.
        let store = SqliteStoreBuilder::default().build().await?;

        // Create a forge with a new internally-generated private key.
        let forge = OperationForge::new(store.clone());

        // Use the default config. It does _not_ include a bootstrap node or relay, which limits
        // the functionality of p2panda to local-area networks.
        let config = Config::default();

        // Prepare the pipeline which orchestrates processing of incoming operations.
        let tasks = TaskTracker::new();
        let pipeline = Pipeline::new::<SqliteStore>(store.clone(), tasks);

        Node::spawn_inner(config, store, forge, pipeline)
            .await
            .map_err(SpawnError::from)
    }

    /// Spawns the networking layer for the given components and assembles the `Node`.
    ///
    /// The network is started with the configured network settings, the forge's signing key and a
    /// clone of the store.
    ///
    /// # Errors
    ///
    /// Returns a [`NetworkError`] if the networking layer fails to spawn.
    pub(crate) async fn spawn_inner(
        config: Config,
        store: SqliteStore,
        forge: OperationForge,
        pipeline: Pipeline<LogId, Extensions, Topic>,
    ) -> Result<Self, NetworkError> {
        let network = Network::spawn(
            config.network.clone(),
            forge.signing_key().clone(),
            store.clone(),
        )
        .await?;

        Ok(Node {
            config,
            store,
            forge,
            pipeline,
            network,
        })
    }

    /// Returns a publisher and stateful subscriber for an eventually consistent event stream of
    /// messages over the given topic.
    ///
    /// This API is inspired by the principles of "event streaming", combined with eventually
    /// consistent "local-first" and causally ordered events.
    ///
    /// ## Event types
    ///
    /// Items emitted from the stream include application messages (delivered on top of p2panda's
    /// "operation" append-only log data-type), error and system events, for example about the sync
    /// session taking place on the networking layer.
    ///
    /// ## Event processing
    ///
    /// Every operation running through the subscription stream gets processed by an internal "event
    /// processing pipeline". This concerns the system layer, meaning the internal p2panda
    /// append-only log `Operation` data-type and internal processors to derive state from these
    /// operations. Here we check the log integrity, prune the log on demand, order operations
    /// causally and more.
    ///
    /// After this, the message is forwarded to the application layer with metadata and debugging
    /// info attached.
    ///
    /// Applications usually want to further process the received events from the stream, for
    /// example validating the application-specific message format to then finally change the state.
    ///
    /// These application messages can be deltas of CRDTs (Conflict-Free Replicated Data-Types) or
    /// concrete events, such as "move pawn to E4" in a chess game. Usually applying these state
    /// transitions will lead to a new "materialization" of the application's state which is
    /// persisted in the app's database.
    ///
    /// This streaming API has an *at least once* guarantee, meaning that events can be delivered
    /// more than once. Any processing system therefore needs to be idempotent or keep track of
    /// already processed events.
    ///
    /// Events are automatically acknowledged by default; events which were not acknowledged are
    /// re-played on app start. Read further below for more details on the stateful design of
    /// stream subscribers, cursors and acknowledgments.
    ///
    /// ```text
    /// ┌────────────────────────────────────────────┐
    /// │ │ APPLICATION
    /// │ User interface │ (example)
    /// │ │
    /// └───▲────────────────────────────────────┬───┘
    /// │ ▼
    /// ┌───────┼───────┐ User Action
    /// │ │ │
    /// │ Database │ │
    /// │ │ │
    /// └───────────▲───┘ │
    /// │ │
    /// Acknowledge │ │
    /// ┌─────────┐ │ │
    /// │ │ │ │
    /// │ ┌───┼───────┼───┐ │
    /// │ │ │ │
    /// │ │ Application │ │
    /// │ │ Stream │ │ Command
    /// │ │ Processing │ ┌─────▼──────┐
    /// │ │ │ │Create Event│
    /// │ │ │ └─────┬──────┘
    /// │ └───────▲───────┘ │
    /// │ │ │
    /// │ │ │
    /// │ │ rx │ tx
    /// │ │ │
    /// ────┼─────────────┼────────────────────────────────────┼──────────────────
    /// │ │ │ SYSTEM
    /// │ ┌───────┼───────┐ │
    /// │ │ │ │ Publish
    /// │ │ │ ┌────────────────▼─────────────────┐
    /// │ │ System │ │Create & sign p2panda operation w.│
    /// │ │ Stream │ │"message" payload from application│
    /// │ │ Processing │ └────────────────┬─────────────────┘
    /// │ │ │ │
    /// ┌───▼───┐ │ │ │
    /// │ Acked │ │ │ │
    /// │ State │ │ │ │
    /// └───┬───┘ │ │ │
    /// └─────┤ │ │
    /// └───────▲───────┘ │
    /// │ │
    /// │ │
    /// │◄───────────────────────────────────┤
    /// │ │
    /// │ │
    /// │ Receive from other nodes │ Publish
    /// │ │
    /// ┌───┼────────────────────────────────────▼──┐
    /// │ │
    /// │ p2p network │
    /// │ │
    /// └───────────────────────────────────────────┘
    /// ```
    ///
    /// Locally created operations (via the stream publisher) are processed by the same pipeline. It
    /// is possible to await the processing result, which can be useful for applications that want
    /// to block UI components etc.
    ///
    /// ```rust
    /// # use p2panda_core::Topic;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let topic = Topic::random();
    /// # let node = p2panda::builder().spawn().await?;
    /// #
    /// let (tx, _) = node.stream::<String>(topic).await?;
    ///
    /// // Publish a message, internally this creates an "operation" which needs to be processed.
    /// let processing = tx.publish("I'm being processed soon!".into()).await?;
    ///
    /// // The hash of the created operation is directly available.
    /// let hash = processing.hash();
    ///
    /// // We can optionally await the result of the processor.
    /// let result = processing.await?;
    /// assert!(result.is_completed());
    /// assert!(!result.is_failed());
    /// #
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Stateful subscriptions and acknowledgments
    ///
    /// The returned [`StreamSubscription`] is stateful and keeps track of already acknowledged
    /// operations by persisting them in the local SQLite database. Operations which have not been
    /// acknowledged yet will be automatically re-played when this stream is created again.
    ///
    /// By default all events are automatically acknowledged. Use [`AckPolicy`] to change this
    /// behaviour when configuring the node. It is recommended to switch to a manual policy and
    /// explicitly acknowledge events _after_ processing them on the application layer was
    /// successful (see diagram above). This way applications can ensure every event is processed
    /// at least once, guaranteeing resilience in the context of application crashes.
    ///
    /// The topic is used to identify each stream's state. It is not recommended to create more than
    /// one subscription over the same topic using this high-level method as the acked state will be
    /// shared across them, leading to potentially surprising behaviour ("work stealing" processing
    /// behaviour across streams and potentially more duplicate events).
    ///
    /// Applications _never_ acknowledge events which only concern system-level state (for example
    /// pruning events without a payload, key agreement "control messages" etc.), these are _always_
    /// acknowledged automatically after they've been processed successfully, independent of the
    /// chosen ack policy.
    ///
    /// ## Crash Resilience & Re-plays
    ///
    /// Un-acknowledged ("nacked") events are automatically re-played when a stream is created by
    /// default. This gives us the "at least once" guarantee, making sure no events get lost, even
    /// when facing system crashes or other unexpected exits (for example a user moving a mobile
    /// application into the background, interrupting all current processing).
    ///
    /// With the [`Node::stream_from`] method we can further determine the behaviour of re-plays.
    /// For example, we can begin streaming from a custom "cursor" position or request to stream
    /// _all_ currently known events for this topic from the start. All of these tools allow for
    /// different patterns of application state materialization, rolling out breaking changes,
    /// updates, etc.
    ///
    /// Please note that this can be a destructive action as it will _replace_ and persist the
    /// current acked stream state with the new arguments.
    ///
    /// ## System-level failures
    ///
    /// In most cases application developers will not need to deal with the system-level event
    /// processing part. However, in rare cases (bugs, critical failures, etc.) processing an event,
    /// re-playing or acknowledging it might have failed.
    ///
    /// Usually these situations are connected to system failure (running out of resources like
    /// hard-disk space) or bugs in p2panda. Since failed system-level events are not acknowledged,
    /// they will be automatically re-played when the application starts again. If restarting does
    /// not resolve the underlying cause of the error, check whether a patch has been released for
    /// p2panda.
    ///
    /// # Errors
    ///
    /// Returns a [`CreateStreamError`] if the stream could not be created by the internal actors.
    pub async fn stream<M>(
        &self,
        topic: impl Into<Topic>,
    ) -> Result<(StreamPublisher<M>, StreamSubscription<M>), CreateStreamError>
    where
        M: Serialize + for<'a> Deserialize<'a> + Send + 'static,
    {
        self.stream_from(topic, StreamFrom::Frontier).await
    }

    /// Eventually consistent publish and subscribe stream of messages from a given position.
    ///
    /// Use [`StreamFrom`] to determine the starting position of the subscription stream.
    ///
    /// See [`Node::stream`] for further information.
    ///
    /// # Errors
    ///
    /// Returns a [`CreateStreamError`] if either the log-sync stream or the processed stream
    /// could not be established.
    pub async fn stream_from<M>(
        &self,
        topic: impl Into<Topic>,
        from: StreamFrom,
    ) -> Result<(StreamPublisher<M>, StreamSubscription<M>), CreateStreamError>
    where
        M: Serialize + for<'a> Deserialize<'a> + Send + 'static,
    {
        let live_mode = true;
        let topic = topic.into();

        let sync_handle = self
            .network
            .log_sync
            .stream(topic, live_mode)
            .await
            .map_err(|err| CreateStreamError(err.to_string()))?;

        processed_stream(
            topic,
            self.config.ack_policy,
            sync_handle,
            self.store.clone(),
            self.forge.clone(),
            self.pipeline.clone(),
            from,
        )
        .await
        .map_err(|err| CreateStreamError(err.to_string()))
    }

    /// Returns a publisher and subscriber pair for an ephemeral stream of messages over the given
    /// topic.
    ///
    /// Messages sent or received on this stream will not be persisted in local storage. Only
    /// currently online and reachable nodes will receive published messages.
    ///
    /// Message payloads are signed, providing integrity and provenance guarantees, and each message
    /// is made unique with the help of a timestamp.
    ///
    /// # Errors
    ///
    /// Returns a [`CreateStreamError`] if the gossip stream could not be established.
    pub async fn ephemeral_stream<M>(
        &self,
        topic: impl Into<Topic>,
    ) -> Result<(EphemeralStreamPublisher<M>, EphemeralStreamSubscription<M>), CreateStreamError>
    where
        M: Serialize + for<'a> Deserialize<'a>,
    {
        let topic = topic.into();
        let handle = self
            .network
            .gossip
            .stream(topic)
            .await
            .map_err(|err| CreateStreamError(err.to_string()))?;
        Ok(ephemeral_stream(topic, self.forge.clone(), handle))
    }

    /// Returns a stream of system events.
    ///
    /// System events include all network-related events, such as discovery events, which are not
    /// associated with a specific topic.
    ///
    /// Any events generated before this method is called will _not_ be emitted. Therefore, it's
    /// recommended to call `event_stream()` right after the `Node` is spawned if you wish to
    /// observe network behaviour throughout the lifetime of the `Node`.
    ///
    /// # Errors
    ///
    /// Returns a [`CreateStreamError`] if the discovery event stream could not be obtained.
    pub async fn event_stream(
        &self,
    ) -> Result<impl Stream<Item = SystemEvent> + Send + Unpin + 'static, CreateStreamError> {
        let discovery_events = self
            .network
            .discovery
            .events()
            .await
            .map_err(|err| CreateStreamError(err.to_string()))?;
        Ok(event_stream(discovery_events))
    }

    /// Returns the node identifier (public key).
    pub fn id(&self) -> NodeId {
        self.forge.verifying_key()
    }

    /// Returns the network identifier being used by the node.
    pub fn network_id(&self) -> NetworkId {
        self.network.network_id()
    }

    /// Inserts a bootstrap node into the local address book.
    ///
    /// Bootstrap nodes are used as a starting point for the random-walk discovery algorithm to
    /// find other nodes in the network, without the need for any centralised registry. Any node
    /// can serve as a bootstrap into the network. The URL of the relay used by the bootstrap node
    /// is required to assist with connectivity (via relaying of traffic and negotiation of
    /// hole-punching for direct connections).
    ///
    /// Multiple bootstrap nodes can be registered. Each iteration of the discovery algorithm
    /// begins by picking a random node from the set of known bootstrap nodes. It's recommended to
    /// register several bootstrap nodes, especially if they are not highly available; this
    /// offers redundancy in case any of the bootstrap nodes go offline or are otherwise
    /// unavailable.
    ///
    /// Consult the documentation of the `p2panda-discovery` crate for further details concerning
    /// the discovery protocol.
    ///
    /// # Errors
    ///
    /// Returns a [`NetworkError`] if the networking layer fails to register the bootstrap node.
    pub async fn insert_bootstrap(
        &self,
        node_id: NodeId,
        relay_url: RelayUrl,
    ) -> Result<(), NetworkError> {
        self.network.insert_bootstrap(node_id, relay_url).await
    }
}
#[cfg(any(test, feature = "test_utils"))]
impl Node {
    /// Returns a clone of the SQLite store used by this `Node`.
    ///
    /// Only available in tests or when the `test_utils` feature is enabled.
    // NOTE(adz): This feels like something we would like to have on the regular Node API as well,
    // I'll leave it here for now until we've made a decision.
    pub fn store(&self) -> SqliteStore {
        SqliteStore::clone(&self.store)
    }
}
/// Message acknowledgment policy for eventually consistent topic streams.
///
/// Every [`StreamSubscription`] instance is stateful and keeps track of already acknowledged
/// operations by persisting them in the local SQLite database. Specifying a policy defines how and
/// when events are acknowledged.
///
/// Operations which have not been acknowledged yet will be automatically re-played when this stream
/// is created again.
///
/// The policy is a plain value type: it is `Copy`, comparable and hashable, so it can be stored in
/// configuration structs or used as a key in maps and sets.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Hash)]
pub enum AckPolicy {
    /// Each individual message must be acknowledged.
    Explicit,
    /// No manual acknowledgment needed, node assumes acknowledgment on delivery.
    #[default]
    Automatic,
}
/// Internal configuration of a `Node`.
///
/// The default value does not include a bootstrap node or relay.
#[derive(Debug, Default, Clone)]
pub(crate) struct Config {
    /// Acknowledgment policy applied to eventually consistent stream subscriptions.
    pub ack_policy: AckPolicy,

    /// Settings handed to the networking layer when the node is spawned.
    pub network: NetworkConfig,
}
/// Error occurred when spawning network or store processes.
#[derive(Debug, Error)]
pub enum SpawnError {
#[error(transparent)]
Network(#[from] NetworkError),
#[error(transparent)]
Store(#[from] SqliteError),
}
/// Broken / closed communication channel with the internal actor in `p2panda-net` prevented
/// creation of stream. This can be due to the actor crashing.
///
/// Users may re-attempt creating a new stream in case the actor restarted later.
#[derive(Error, Debug)]
#[error("error occurred in internal actor: {0}")]
pub struct CreateStreamError(pub String);