use std::collections::HashSet;
use std::str::FromStr;
use futures_util::StreamExt;
use p2panda::streams::StreamEvent;
use p2panda_core::{Hash, Timestamp, Topic};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type TodoItemId = Hash;
type TodoListId = Topic;
#[derive(Debug, Serialize, Deserialize)]
struct TodoEvent {
id: TodoItemId,
kind: TodoEventKind,
}
#[derive(Debug, Serialize, Deserialize)]
enum TodoEventKind {
Set { description: String },
Delete,
}
#[derive(Clone, Debug)]
struct TodoItem {
id: TodoItemId,
description: String,
timestamp: Timestamp,
}
impl PartialEq for TodoItem {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for TodoItem {}
impl std::hash::Hash for TodoItem {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
struct TodoList {
id: TodoListId,
items: HashSet<TodoItem>,
tombstoned: HashSet<TodoItemId>,
}
impl TodoList {
pub fn new() -> Self {
Self::from_id(TodoListId::random())
}
pub fn from_id(id: TodoListId) -> Self {
Self {
id,
items: HashSet::new(),
tombstoned: HashSet::new(),
}
}
pub fn id(&self) -> TodoListId {
self.id
}
pub fn is_empty(&self) -> bool {
self.items
.iter()
.filter(|item| !self.tombstoned.contains(&item.id))
.count()
== 0
}
pub fn items(&self) -> Vec<&TodoItem> {
self.items
.iter()
.filter(|item| !self.tombstoned.contains(&item.id))
.collect()
}
pub fn find_item_id(&self, prefix: &str) -> Option<TodoItemId> {
self.items
.iter()
.find(|item| {
!self.tombstoned.contains(&item.id) && item.id.to_hex().starts_with(prefix)
})
.map(|item| item.id)
}
pub fn create(&mut self, description: &str) -> TodoEvent {
TodoEvent {
id: Topic::random().into(),
kind: TodoEventKind::Set {
description: description.into(),
},
}
}
pub fn update(&mut self, id: TodoItemId, description: &str) -> Result<TodoEvent> {
let Some(item) = self.items.iter().find(|item| item.id == id) else {
return Err(format!("unknown item with id {id}").into());
};
Ok(TodoEvent {
id: item.id,
kind: TodoEventKind::Set {
description: description.into(),
},
})
}
pub fn delete(&mut self, id: TodoItemId) -> Result<TodoEvent> {
let Some(item) = self.items.iter().find(|item| item.id == id) else {
return Err(format!("unknown item with id {id}").into());
};
Ok(TodoEvent {
id: item.id,
kind: TodoEventKind::Delete,
})
}
pub fn process(&mut self, event: &TodoEvent, timestamp: Timestamp) {
let item = self.items.iter().find(|item| item.id == event.id).cloned();
if let Some(ref item) = item
&& (item.timestamp > timestamp || self.tombstoned.contains(&item.id))
{
return;
}
match &event.kind {
TodoEventKind::Set { description } => {
println!(
"➭ {} todo item with id {}",
if item.is_none() { "created" } else { "updated" },
event.id,
);
self.items.replace(TodoItem {
id: event.id,
description: description.clone(),
timestamp,
});
}
TodoEventKind::Delete => {
println!("➭ deleted todo item with id {}", event.id);
if let Some(item) = item {
self.items.remove(&item);
}
self.tombstoned.insert(event.id);
}
}
}
}
#[tokio::main]
async fn main() -> Result<()> {
let args: Vec<String> = std::env::args().collect();
let mut todo_list = if args.len() > 1 {
let id = TodoListId::from_str(&args[1])
.map_err(|err| format!("passed invalid todo list id as argument: {err}"))?;
TodoList::from_id(id)
} else {
TodoList::new()
};
let node = p2panda::spawn().await?;
println!("TODO");
println!("⎯⎯⎯⎯⎯");
println!("★ todo list id: {}", todo_list.id());
println!("★ my node id: {}", node.id());
println!("⎯⎯⎯⎯⎯\n");
let (tx, mut rx) = node.stream::<TodoEvent>(todo_list.id()).await?;
let (line_tx, mut line_rx) = mpsc::channel(1);
std::thread::spawn(move || input_loop(line_tx));
loop {
tokio::select! {
biased;
Some(input) = line_rx.recv() => {
if let Some(description) = input.strip_prefix("/create") {
let event = todo_list.create(description.trim());
tx.publish(event).await?;
}
if let Some(value) = input.strip_prefix("/update") {
let mut parts = value.split_whitespace();
let Some(hash_str) = parts.next() else {
println!("✖ err: missing todo item id");
continue;
};
let Some(item_id) = todo_list.find_item_id(hash_str.trim()) else {
println!("✖ err: unknown todo item id");
continue;
};
let Some(description) = parts.next() else {
println!("✖ err: missing todo item description");
continue;
};
let mut description = description.to_string();
while let Some(remainder) = parts.next() {
description.push_str(" ");
description.push_str(remainder);
}
match todo_list.update(item_id, description.trim()) {
Ok(event) => {
tx.publish(event).await?;
}
Err(err) => {
println!("err: {}", err);
}
}
}
if let Some(value) = input.strip_prefix("/delete") {
let mut parts = value.split_whitespace();
let Some(hash_str) = parts.next() else {
println!("✖ err: missing todo item id");
continue;
};
let Some(item_id) = todo_list.find_item_id(hash_str.trim()) else {
println!("✖ err: unknown todo item id");
continue;
};
match todo_list.delete(item_id) {
Ok(event) => {
tx.publish(event).await?;
}
Err(err) => {
println!("✖ err: {err}");
}
}
}
if input.strip_prefix("/show").is_some() {
println!("⎯⎯⎯⎯⎯");
println!("TODO LIST: {}", todo_list.id());
if todo_list.is_empty() {
println!(".. no items yet ..");
} else {
println!("⎯⎯⎯⎯⎯");
for item in todo_list.items() {
let short_hex = item.id.to_hex()[0..4].to_string();
println!("◆ [{}]: {}", short_hex, item.description);
}
}
println!("⎯⎯⎯⎯⎯");
}
}
Some(ref event) = rx.next() => {
if let StreamEvent::SyncStarted { remote_node_id, incoming_bytes, .. } = event {
println!("∇ start sync with node {remote_node_id}, downloading {incoming_bytes} bytes");
}
if let StreamEvent::Processed { operation, .. } = event {
todo_list.process(operation.message(), operation.timestamp().into());
}
}
}
}
}
fn input_loop(line_tx: mpsc::Sender<String>) -> Result<()> {
let mut buffer = String::new();
let stdin = std::io::stdin();
loop {
stdin.read_line(&mut buffer)?;
line_tx.blocking_send(buffer.clone())?;
buffer.clear();
}
}