p2panda 0.6.1

Out-of-the-box p2panda Node API for application developers
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Simple, collaborative todo list command line application.
//!
//! ## Usage
//!
//! ```text
//! # Start a new todo list, a random id will be generated
//! cargo run --example todo
//!
//! # Join an existing todo list by entering the id
//! cargo run --example todo -- <id>
//!
//! # Type /create <description> to create a new todo list item
//! /create Do laundry
//!
//! # Type /update <id> <description> to update an existing item
//! /update 34af Make a salad
//!
//! # Type /delete <id> to remove an existing item
//! /delete 34af
//!
//! # Print current todo list
//! /show
//! ```
//!
//! ## How does this work?
//!
//! This is an example of how to express a [CRDT], such as an LWW (Last-Write-Wins) and 2P-Set
//! (Two-Phase Set) for deletions on top of the `p2panda` Node API.
//!
//! This is a basic example of an "event sourcing" approach: We are creating "events" triggered by
//! "commands" (see `create`, `update` and `delete` methods) which are then processed (see `process`
//! method). Every processed event changes our internal state, this is also called
//! "materialisation".
//!
//! ```plain
//! [Command: "Update item"] ..
//!     |                                      |       [Event Processor]
//!     v                                      |
//!  [Event: "Update"] -- [Event: "Create"] => | Database w. materialised state:
//!                                            |
//!                                            | {
//!                                            |    <id>: description", ...
//!                                            | }
//! ```
//!
//! We are handling both our own, locally created events and events from remote nodes through the
//! same event processor.
//!
//! ## Can I use this over the Internet?
//!
//! This example only works over LAN, you can consult the `NodeBuilder` API documentation to extend
//! this code with a bootstrap and relay argument, which will allow you to then connect to nodes and
//! sync with them over the Internet.
//!
//! [CRDT]: https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type
use std::collections::HashSet;
use std::str::FromStr;

use futures_util::StreamExt;
use p2panda::streams::StreamEvent;
use p2panda_core::{Hash, Timestamp, Topic};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

type TodoItemId = Hash;

type TodoListId = Topic;

/// Todo list message type which is replicated across nodes in the p2p network.
///
/// Every event describes a change `kind` to a specific todo list item, addressed by the `id` field.
#[derive(Debug, Serialize, Deserialize)]
struct TodoEvent {
    id: TodoItemId,
    kind: TodoEventKind,
}

/// Changes we are applying to a todo list item.
#[derive(Debug, Serialize, Deserialize)]
enum TodoEventKind {
    /// Sets the description of an todo list item.
    ///
    /// If no item exists yet, it will be created, otherwise updated.
    Set { description: String },

    /// Delete todo list item.
    ///
    /// This "tombstones" the item internally.
    Delete,
}

/// Todo list item.
///
/// Representing the materialised state of the application in memory.
#[derive(Clone, Debug)]
struct TodoItem {
    id: TodoItemId,
    description: String,
    timestamp: Timestamp,
}

impl PartialEq for TodoItem {
    fn eq(&self, other: &Self) -> bool {
        // Compare only over id, so when inserted into `HashSet` we can replace existing items by
        // id. We need to implement the same for `Eq` and `Hash`.
        self.id == other.id
    }
}

impl Eq for TodoItem {}

impl std::hash::Hash for TodoItem {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.id.hash(state);
    }
}

/// Todo list with items inside.
///
/// Representing the materialised state of the application in memory.
///
/// The 2P-Set (Two-Phase Set) is used to mark "deleted" items in a second set named `tombstoned`.
/// The difference between the two `items` and `tombstoned` sets is the 2P-set CRDT state with
/// "remove-wins" semantics.
struct TodoList {
    id: TodoListId,
    items: HashSet<TodoItem>,
    tombstoned: HashSet<TodoItemId>,
}

impl TodoList {
    pub fn new() -> Self {
        Self::from_id(TodoListId::random())
    }

    pub fn from_id(id: TodoListId) -> Self {
        Self {
            id,
            items: HashSet::new(),
            tombstoned: HashSet::new(),
        }
    }

    pub fn id(&self) -> TodoListId {
        self.id
    }

    pub fn is_empty(&self) -> bool {
        self.items
            .iter()
            .filter(|item| !self.tombstoned.contains(&item.id))
            .count()
            == 0
    }

    pub fn items(&self) -> Vec<&TodoItem> {
        self.items
            .iter()
            .filter(|item| !self.tombstoned.contains(&item.id))
            .collect()
    }

    pub fn find_item_id(&self, prefix: &str) -> Option<TodoItemId> {
        self.items
            .iter()
            .find(|item| {
                !self.tombstoned.contains(&item.id) && item.id.to_hex().starts_with(prefix)
            })
            .map(|item| item.id)
    }

    pub fn create(&mut self, description: &str) -> TodoEvent {
        TodoEvent {
            id: Topic::random().into(),
            kind: TodoEventKind::Set {
                description: description.into(),
            },
        }
    }

    pub fn update(&mut self, id: TodoItemId, description: &str) -> Result<TodoEvent> {
        let Some(item) = self.items.iter().find(|item| item.id == id) else {
            return Err(format!("unknown item with id {id}").into());
        };

        Ok(TodoEvent {
            id: item.id,
            kind: TodoEventKind::Set {
                description: description.into(),
            },
        })
    }

    pub fn delete(&mut self, id: TodoItemId) -> Result<TodoEvent> {
        let Some(item) = self.items.iter().find(|item| item.id == id) else {
            return Err(format!("unknown item with id {id}").into());
        };

        Ok(TodoEvent {
            id: item.id,
            kind: TodoEventKind::Delete,
        })
    }

    pub fn process(&mut self, event: &TodoEvent, timestamp: Timestamp) {
        let item = self.items.iter().find(|item| item.id == event.id).cloned();

        // Ignore event if it is older than our latest write ("last-write wins") or if the addressed
        // item was already tombstoned ("remove-wins").
        if let Some(ref item) = item
            && (item.timestamp > timestamp || self.tombstoned.contains(&item.id))
        {
            return;
        }

        match &event.kind {
            TodoEventKind::Set { description } => {
                println!(
                    "{} todo item with id {}",
                    if item.is_none() { "created" } else { "updated" },
                    event.id,
                );

                // We've checked via the timestamp before that any event here is "later" than the
                // current one, so we can simply replace what was in the state before with the
                // "later" version.
                self.items.replace(TodoItem {
                    id: event.id,
                    description: description.clone(),
                    timestamp,
                });
            }
            TodoEventKind::Delete => {
                println!("➭ deleted todo item with id {}", event.id);

                // Remove item if it exists in our state.
                if let Some(item) = item {
                    self.items.remove(&item);
                }

                // Add item to "removed" set for our 2P-Set (Two-Phase Set) CRDT.
                self.tombstoned.insert(event.id);
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Pass in todo list id as an argument to find other nodes interested in the same list. If not
    // set, we are generating a new, random identifier and print it.
    //
    // Usage:
    //
    // ```bash
    // cargo run --example todo -- <todo_list_id>
    // ```
    let args: Vec<String> = std::env::args().collect();

    let mut todo_list = if args.len() > 1 {
        let id = TodoListId::from_str(&args[1])
            .map_err(|err| format!("passed invalid todo list id as argument: {err}"))?;
        TodoList::from_id(id)
    } else {
        TodoList::new()
    };

    // Spawn a p2panda node where all state is persisted in memory. Since we're not adding any
    // bootstrap node and relay server we can't connect over the internet. This example only works
    // on the LAN.
    //
    // Check out our `NodeBuilder` documentation if you want to learn how to add a bootstrap node
    // and relay.
    let node = p2panda::spawn().await?;

    println!("TODO");
    println!("⎯⎯⎯⎯⎯");
    println!("★ todo list id: {}", todo_list.id());
    println!("★ my node id: {}", node.id());
    println!("⎯⎯⎯⎯⎯\n");

    // Establish a publish/subscribe topic stream which will help us to find nodes who are also
    // interested in the same todo list. We will automatically sync all `TodoEvent` messages with
    // these nodes so we can process them.
    let (tx, mut rx) = node.stream::<TodoEvent>(todo_list.id()).await?;

    let (line_tx, mut line_rx) = mpsc::channel(1);
    std::thread::spawn(move || input_loop(line_tx));

    loop {
        tokio::select! {
            biased;

            // Parse user input via stdin. These inputs trigger our "commands" which again will
            // create and publish single events into the topic stream via `tx`.
            Some(input) = line_rx.recv() => {
                // Create a new todo list item.
                //
                // ```text
                // /create Do laundry
                // ```
                if let Some(description) = input.strip_prefix("/create") {
                    let event = todo_list.create(description.trim());
                    tx.publish(event).await?;
                }

                // Update an existing todo list item.
                //
                // ```text
                // /update be2a Make a salad
                // ```
                if let Some(value) = input.strip_prefix("/update") {
                    let mut parts = value.split_whitespace();

                    let Some(hash_str) = parts.next() else {
                        println!("✖ err: missing todo item id");
                        continue;
                    };

                    let Some(item_id) = todo_list.find_item_id(hash_str.trim()) else {
                        println!("✖ err: unknown todo item id");
                        continue;
                    };

                    let Some(description) = parts.next() else {
                        println!("✖ err: missing todo item description");
                        continue;
                    };

                    let mut description = description.to_string();
                    while let Some(remainder) = parts.next() {
                        description.push_str(" ");
                        description.push_str(remainder);
                    }

                    match todo_list.update(item_id, description.trim()) {
                        Ok(event) => {
                            tx.publish(event).await?;
                        }
                        Err(err) => {
                            println!("err: {}", err);
                        }
                    }
                }

                // Delete an existing todo list item.
                //
                // ```text
                // /delete be2a
                // ```
                if let Some(value) = input.strip_prefix("/delete") {
                    let mut parts = value.split_whitespace();

                    let Some(hash_str) = parts.next() else {
                        println!("✖ err: missing todo item id");
                        continue;
                    };

                    let Some(item_id) = todo_list.find_item_id(hash_str.trim()) else {
                        println!("✖ err: unknown todo item id");
                        continue;
                    };

                    match todo_list.delete(item_id) {
                        Ok(event) => {
                            tx.publish(event).await?;
                        }
                        Err(err) => {
                            println!("✖ err: {err}");
                        }
                    }
                }

                // Print current todo list state.
                //
                // ```text
                // /show
                // ```
                if input.strip_prefix("/show").is_some() {
                    println!("⎯⎯⎯⎯⎯");
                    println!("TODO LIST: {}", todo_list.id());

                    if todo_list.is_empty() {
                        println!(".. no items yet ..");
                    } else {
                        println!("⎯⎯⎯⎯⎯");
                        for item in todo_list.items() {
                            let short_hex = item.id.to_hex()[0..4].to_string();
                            println!("◆ [{}]: {}", short_hex, item.description);
                        }
                    }

                    println!("⎯⎯⎯⎯⎯");
                }
            }

            // We handle all todo list events through the same processor. This includes a) events
            // received from remote nodes and b) our own, locally created events.
            Some(ref event) = rx.next() => {
                if let StreamEvent::SyncStarted { remote_node_id, incoming_bytes, .. } = event {
                    println!("∇ start sync with node {remote_node_id}, downloading {incoming_bytes} bytes");
                }

                if let StreamEvent::Processed { operation, .. } = event {
                    todo_list.process(operation.message(), operation.timestamp().into());
                }
            }
        }
    }
}

fn input_loop(line_tx: mpsc::Sender<String>) -> Result<()> {
    let mut buffer = String::new();
    let stdin = std::io::stdin();
    loop {
        stdin.read_line(&mut buffer)?;
        line_tx.blocking_send(buffer.clone())?;
        buffer.clear();
    }
}