use bamboo_rs_core_ed25519_yasmf::entry::is_lipmaa_required;
use log::{debug, info};
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use crate::document::{Document, DocumentBuilder};
use crate::entry::{decode_entry, EntrySigned, SeqNum};
use crate::hash::Hash;
use crate::identity::Author;
use crate::operation::{
AsOperation, AsVerifiedOperation, Operation, OperationEncoded, VerifiedOperation,
};
use crate::test_utils::mocks::logs::{AuthorLogs, LogEntry};
use crate::test_utils::mocks::utils::Result;
use crate::test_utils::mocks::Client;
use crate::test_utils::utils::NextEntryArgs;
/// Publishes `operation` to `node` on behalf of `client`.
///
/// Returns the hash of the newly published entry together with the entry
/// arguments the node hands back for the client's next entry.
///
/// For anything other than a CREATE operation the target document is looked
/// up on the node through the first hash listed in `previous_operations`.
///
/// # Errors
///
/// Propagates errors from [`Node::get_next_entry_args`] and
/// [`Node::publish_entry`].
///
/// # Panics
///
/// Panics when a non-CREATE operation has no `previous_operations`, when the
/// node does not know the referenced document, or when the operation cannot
/// be encoded.
pub fn send_to_node(
    node: &mut Node,
    client: &Client,
    operation: &Operation,
) -> Result<(Hash, NextEntryArgs)> {
    // A CREATE operation starts a new document, so there is nothing to resolve.
    let document_id = (!operation.is_create()).then(|| {
        let view_id = operation
            .previous_operations()
            .expect("UPDATE / DELETE operations must contain previous_operations");
        let first_operation = view_id.into_iter().next().unwrap();
        node.get_document_id_by_entry(first_operation.as_hash())
            .expect("This node does not contain the required document")
    });

    let author = client.author();
    let entry_args = node.get_next_entry_args(&author, document_id.as_ref(), None)?;

    let entry_encoded = client.signed_encoded_entry(operation.to_owned(), entry_args);
    let operation_encoded = OperationEncoded::try_from(operation).unwrap();

    node.publish_entry(&entry_encoded, &operation_encoded)
        .map(|next_entry_args| (entry_encoded.hash(), next_entry_args))
}
/// Determines the `(backlink, skiplink)` hashes for an entry at `seq_num`.
///
/// `log` holds the entries preceding the new one; the entry with sequence
/// number `n` is read from index `n - 1`. The skiplink is only returned when
/// `is_lipmaa_required` reports that this sequence number needs one.
///
/// # Panics
///
/// Panics when an entry a link should point at is not present in `log`.
fn calculate_links(seq_num: &SeqNum, log: &[LogEntry]) -> (Option<Hash>, Option<Hash>) {
    // Shared lookup: translate a sequence number into a log index and hash the entry.
    let hash_at = |seq: u64, missing: &str| log.get(seq as usize - 1).expect(missing).hash();

    let skiplink = seq_num
        .skiplink_seq_num()
        .filter(|_| is_lipmaa_required(seq_num.as_u64()))
        .map(|seq| hash_at(seq.as_u64(), "Skiplink missing!"));

    let backlink = seq_num
        .backlink_seq_num()
        .map(|seq| hash_at(seq.as_u64(), "Backlink missing!"));

    (backlink, skiplink)
}
/// Storage of a [`Node`]: maps an author's string representation (as returned
/// by `Author::as_str`) to that author's logs.
pub type Database = HashMap<String, AuthorLogs>;
/// Node keeping every author's logs in an in-memory [`Database`].
///
/// Entries are added through `publish_entry`; the remaining methods query the
/// stored logs, entries and documents.
#[derive(Debug, Default)]
pub struct Node {
// All published logs, keyed by author.
db: Database,
}
impl Node {
pub fn new() -> Self {
Self {
db: Database::new(),
}
}
pub fn db(&self) -> Database {
self.db.clone()
}
/// Lists the database keys, one per author that has logs on this node.
pub fn get_authors(&self) -> Vec<&String> {
    let authors = self.db.keys();
    authors.collect()
}
/// Mutable access to the logs stored for `author`, or `None` when the author
/// has no database entry.
fn get_author_logs_mut(&mut self, author: &Author) -> Option<&mut AuthorLogs> {
    let key = author.as_str();
    self.db.get_mut(key)
}
/// Shared access to the logs stored for `author`, or `None` when the author
/// has no database entry.
fn get_author_logs(&self, author: &Author) -> Option<&AuthorLogs> {
    let key = author.as_str();
    self.db.get(key)
}
/// Returns the entry whose hash equals `id`, searching every log on the node.
///
/// # Panics
///
/// Panics when no stored entry has that hash.
pub fn get_entry(&self, id: &Hash) -> LogEntry {
    // `all_entries` already yields owned entries, so the match can be moved out directly.
    let mut candidates = self.all_entries().into_iter();
    candidates.find(|entry| &entry.hash() == id).unwrap()
}
/// Finds the document that the entry with hash `entry` belongs to.
///
/// Each author's logs are asked for a log containing the entry; the document
/// of the first log found is returned, or `None` when no author has one.
fn get_document_id_by_entry(&self, entry: &Hash) -> Option<Hash> {
    self.db.values().find_map(|author_logs| {
        author_logs
            .find_document_log_by_entry(entry)
            .map(|log| log.document())
    })
}
/// Collects the entries of every log of every author into one vector.
///
/// Entries of one log stay together; authors are visited in the iteration
/// order of the underlying map.
pub fn all_entries(&self) -> Vec<LogEntry> {
    self.db
        .values()
        .flat_map(|author_logs| author_logs.iter())
        .flat_map(|log| log.entries())
        .collect()
}
/// Logging wrapper around `next_entry_args`.
///
/// Emits the request and the response at `info` level (plus a `debug` dump of
/// the raw values) and otherwise returns exactly what `next_entry_args`
/// returns for the same arguments.
///
/// # Errors
///
/// Propagates any error from `next_entry_args`.
pub fn get_next_entry_args(
    &self,
    author: &Author,
    document_id: Option<&Hash>,
    seq_num: Option<&SeqNum>,
) -> Result<NextEntryArgs> {
    info!(
        "[next_entry_args] REQUEST: next entry args for author {} document {} {}",
        author.as_str(),
        document_id.map_or("not provided", |id| id.as_str()),
        seq_num
            .map(|requested| format!("at sequence number {}", requested.as_u64()))
            .unwrap_or_default()
    );
    debug!("\n{:?}\n{:?}\n{:?}", author, document_id, seq_num);

    let response = self.next_entry_args(author, document_id, seq_num)?;

    info!(
        "[next_entry_args] RESPONSE: log id: {} seq num: {} backlink: {} skiplink: {}",
        response.log_id.as_u64(),
        response.seq_num.as_u64(),
        response
            .backlink
            .as_ref()
            .map_or("none", |hash| hash.as_str()),
        response
            .skiplink
            .as_ref()
            .map_or("none", |hash| hash.as_str()),
    );
    debug!("\n{:?}", response);

    Ok(response)
}
fn next_entry_args(
&self,
author: &Author,
document_id: Option<&Hash>,
seq_num: Option<&SeqNum>,
) -> Result<NextEntryArgs> {
let author_logs = match self.get_author_logs(author) {
Some(logs) => logs.clone(),
None => AuthorLogs::new(),
};
let document_log = match document_id {
Some(document_id) => author_logs.get_log_by_document_id(document_id),
None => None,
};
let entry_args = match document_log {
Some(log) => {
let mut entries = log.entries();
let seq_num_inner = match seq_num {
Some(s) => {
entries = entries[..s.as_u64() as usize - 1].to_owned();
*s
}
None => {
log.next_seq_num()
}
};
let (backlink, skiplink) = calculate_links(&seq_num_inner, &entries);
NextEntryArgs {
log_id: log.id(),
seq_num: seq_num_inner,
skiplink,
backlink,
}
}
None => NextEntryArgs {
log_id: author_logs.next_log_id(),
seq_num: SeqNum::default(),
skiplink: None,
backlink: None,
},
};
Ok(entry_args)
}
/// Stores a signed entry and its operation in the author's log.
///
/// A CREATE operation opens a new log whose document id is the entry hash.
/// Any other operation is attached to the document found through the first
/// hash in its `previous_operations`; if the author has no log with the
/// entry's log id yet, a new log for that document is created.
///
/// Returns the arguments for the author's next entry in the same document.
///
/// # Errors
///
/// * The entry cannot be decoded against the given operation.
/// * A CREATE operation targets a log id that already exists.
/// * A new log is required but the entry's log id is not the author's next
///   free log id.
///
/// Note that an entry for the author is added to the database even when one
/// of the log id checks fails afterwards.
///
/// # Panics
///
/// Panics when the decoded entry carries no operation, when a non-CREATE
/// operation has no `previous_operations`, or when the referenced document is
/// unknown to this node.
pub fn publish_entry(
    &mut self,
    entry_encoded: &EntrySigned,
    operation_encoded: &OperationEncoded,
) -> Result<NextEntryArgs> {
    let entry = decode_entry(entry_encoded, Some(operation_encoded))?;
    let log_id = entry.log_id();
    let author = entry_encoded.author();
    let operation = entry.operation().unwrap();

    info!(
        "[publish_entry] REQUEST: publish entry: {} from author: {}",
        entry_encoded.hash().as_str(),
        author.as_str()
    );
    debug!("\n{:?}\n{:?}", entry_encoded, operation_encoded);

    let document_id = if !operation.is_create() {
        // Same invariant and message as in `send_to_node`: the failure here is
        // a malformed operation, not a missing document log.
        let previous_operations = operation
            .previous_operations()
            .expect("UPDATE / DELETE operations must contain previous_operations");

        let document_id = self
            .get_document_id_by_entry(previous_operations.into_iter().next().unwrap().as_hash())
            .unwrap_or_else(|| {
                panic!(
                    "Document log for entry {} not found on node",
                    entry_encoded.hash().as_str()
                )
            });
        info!("Document found with id {}", document_id.as_str());
        document_id
    } else {
        info!(
            "Creating new document with id {}",
            entry_encoded.hash().as_str()
        );
        entry_encoded.hash()
    };

    // Fetch the author's logs, inserting an empty set on first contact.
    let author_logs = self
        .db
        .entry(author.as_str().into())
        .or_insert_with(AuthorLogs::new);

    match author_logs.get_log_mut(log_id) {
        // A CREATE operation must always open a fresh log.
        Some(_) if operation.is_create() => {
            return Err(format!("Log with id: {} already exists.", log_id.as_u64()).into());
        }
        Some(log) => {
            log.add_entry(LogEntry::new(entry_encoded, operation_encoded));
        }
        None => {
            // New logs may only be created with the next free log id.
            let expected_log_id = author_logs.next_log_id();
            if *log_id != expected_log_id {
                return Err(format!(
                    "Passed log id {} does not match expected log id {}",
                    log_id.as_u64(),
                    expected_log_id.as_u64()
                )
                .into());
            };
            author_logs.create_new_log(document_id.clone(), entry_encoded, operation_encoded);
        }
    };

    let next_entry_args = self.next_entry_args(&author, Some(&document_id), None)?;

    info!(
        "[publish_entry] RESPONSE: succesfully published entry: {} to log: {} and returning next entry args",
        entry_encoded.hash().as_str(),
        log_id.as_u64()
    );
    debug!("\n{:?}", next_entry_args);

    Ok(next_entry_args)
}
/// Returns the entries of every log, across all authors, that belongs to the
/// document `id`.
///
/// Entries of one log stay together; authors are visited in the iteration
/// order of the underlying map. An unknown document yields an empty vector.
pub fn get_document_entries(&self, id: &Hash) -> Vec<LogEntry> {
    // Iterate the stored logs in place; cloning the whole database is not needed.
    self.db
        .values()
        .flat_map(|author_logs| author_logs.iter().filter(|log| log.document() == *id))
        .flat_map(|log| log.entries())
        .collect()
}
/// Builds the document `id` from all entries stored for it on this node.
///
/// Every entry is turned into a `VerifiedOperation` and the resulting
/// operations are handed to `DocumentBuilder`.
///
/// # Panics
///
/// Panics when an entry cannot be converted into a verified operation or
/// when the document cannot be built from the operations.
pub fn get_document(&self, id: &Hash) -> Document {
    let operations = self
        .get_document_entries(id)
        .iter()
        .map(|log_entry| {
            VerifiedOperation::new_from_entry(
                &log_entry.entry_encoded(),
                &log_entry.operation_encoded(),
            )
            .unwrap()
        })
        .collect();

    let builder = DocumentBuilder::new(operations);
    builder.build().unwrap()
}
/// Builds every document that has at least one log on this node.
///
/// Document ids are de-duplicated across authors first, so a document
/// written to by several authors is built once. The order of the returned
/// documents is unspecified.
///
/// # Panics
///
/// Panics when a stored document id is not a valid hash string or when
/// `get_document` panics for one of the documents.
pub fn get_documents(&self) -> Vec<Document> {
    // Gather the distinct document ids by borrowing the stored logs; the
    // database itself does not have to be cloned for this.
    let documents: HashSet<String> = self
        .db
        .values()
        .flat_map(|author_logs| author_logs.iter())
        .map(|log| log.document().as_str().to_string())
        .collect();

    documents
        .iter()
        .map(|x| self.get_document(&Hash::new(x).unwrap()))
        .collect()
}
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use crate::document::DocumentViewId;
use crate::entry::{LogId, SeqNum};
use crate::identity::KeyPair;
use crate::operation::OperationValue;
use crate::test_utils::fixtures::{create_operation, key_pair, private_key, update_operation};
use crate::test_utils::mocks::client::Client;
use crate::test_utils::utils::NextEntryArgs;
use super::{send_to_node, Node};
// Walks two authors through publishing entries to one node and checks, after
// every step, the next entry arguments the node returns as well as the number
// of authors, logs and documents it holds.
#[rstest]
fn publishing_entries(private_key: String) {
let panda = Client::new("panda".to_string(), key_pair(&private_key));
let mut node = Node::new();
// Nothing has been published yet: panda is pointed at the first entry of
// log 1, without any links.
let next_entry_args = node
.get_next_entry_args(&panda.author(), None, None)
.unwrap();
let mut expected_next_entry_args = NextEntryArgs {
log_id: LogId::new(1),
seq_num: SeqNum::new(1).unwrap(),
backlink: None,
skiplink: None,
};
assert_eq!(next_entry_args.log_id, expected_next_entry_args.log_id);
assert_eq!(next_entry_args.seq_num, expected_next_entry_args.seq_num);
assert_eq!(next_entry_args.backlink, expected_next_entry_args.backlink);
assert_eq!(next_entry_args.skiplink, expected_next_entry_args.skiplink);
// Panda publishes a CREATE operation, which starts a document.
let (panda_entry_1_hash, next_entry_args) = send_to_node(
&mut node,
&panda,
&create_operation(&[(
"message",
OperationValue::Text("Ohh, my first message! [Panda]".to_string()),
)]),
)
.unwrap();
// The next entry goes into the same log and back-links to the entry just
// published.
expected_next_entry_args = NextEntryArgs {
log_id: LogId::new(1),
seq_num: SeqNum::new(2).unwrap(),
backlink: Some(panda_entry_1_hash.clone()),
skiplink: None,
};
assert_eq!(next_entry_args.log_id, expected_next_entry_args.log_id);
assert_eq!(next_entry_args.seq_num, expected_next_entry_args.seq_num);
assert_eq!(next_entry_args.backlink, expected_next_entry_args.backlink);
assert_eq!(next_entry_args.skiplink, expected_next_entry_args.skiplink);
assert_eq!(node.get_authors().len(), 1);
assert_eq!(node.get_author_logs(&panda.author()).unwrap().len(), 1);
// Panda updates the document, referencing the CREATE entry.
let (panda_entry_2_hash, next_entry_args) = send_to_node(
&mut node,
&panda,
&update_operation(
&[(
"message",
OperationValue::Text("Which I now update. [Panda]".to_string()),
)],
&panda_entry_1_hash.clone().into(),
),
)
.unwrap();
expected_next_entry_args = NextEntryArgs {
log_id: LogId::new(1),
seq_num: SeqNum::new(3).unwrap(),
backlink: Some(panda_entry_2_hash.clone()),
skiplink: None,
};
assert_eq!(next_entry_args.log_id, expected_next_entry_args.log_id);
assert_eq!(next_entry_args.seq_num, expected_next_entry_args.seq_num);
assert_eq!(next_entry_args.backlink, expected_next_entry_args.backlink);
assert_eq!(next_entry_args.skiplink, expected_next_entry_args.skiplink);
assert_eq!(node.get_authors().len(), 1);
assert_eq!(node.get_author_logs(&panda.author()).unwrap().len(), 1);
// Penguin has not written to panda's document yet, so the node offers
// penguin the first entry of a log of its own.
let penguin = Client::new("penguin".to_string(), KeyPair::new());
let next_entry_args = node
.next_entry_args(&penguin.author(), Some(&panda_entry_1_hash), None)
.unwrap();
expected_next_entry_args = NextEntryArgs {
log_id: LogId::new(1),
seq_num: SeqNum::new(1).unwrap(),
backlink: None,
skiplink: None,
};
assert_eq!(next_entry_args.log_id, expected_next_entry_args.log_id);
assert_eq!(next_entry_args.seq_num, expected_next_entry_args.seq_num);
assert_eq!(next_entry_args.backlink, expected_next_entry_args.backlink);
assert_eq!(next_entry_args.skiplink, expected_next_entry_args.skiplink);
// Penguin updates the same document, referencing panda's second entry.
let (penguin_entry_1_hash, next_entry_args) = send_to_node(
&mut node,
&penguin,
&update_operation(
&[(
"message",
OperationValue::Text("My turn to update. [Penguin]".to_string()),
)],
&panda_entry_2_hash.into(),
),
)
.unwrap();
expected_next_entry_args = NextEntryArgs {
log_id: LogId::new(1),
seq_num: SeqNum::new(2).unwrap(),
backlink: Some(penguin_entry_1_hash.clone()),
skiplink: None,
};
assert_eq!(next_entry_args.log_id, expected_next_entry_args.log_id);
assert_eq!(next_entry_args.seq_num, expected_next_entry_args.seq_num);
assert_eq!(next_entry_args.backlink, expected_next_entry_args.backlink);
assert_eq!(next_entry_args.skiplink, expected_next_entry_args.skiplink);
// Penguin now shows up as a second author with a single log.
assert_eq!(node.get_authors().len(), 2);
assert_eq!(node.get_author_logs(&penguin.author()).unwrap().len(), 1);
// Penguin updates once more, this time referencing its own first entry.
let (penguin_entry_2_hash, next_entry_args) = send_to_node(
&mut node,
&penguin,
&update_operation(
&[(
"message",
OperationValue::Text("And again. [Penguin]".to_string()),
)],
&penguin_entry_1_hash.into(),
),
)
.unwrap();
expected_next_entry_args = NextEntryArgs {
log_id: LogId::new(1),
seq_num: SeqNum::new(3).unwrap(),
backlink: Some(penguin_entry_2_hash),
skiplink: None,
};
assert_eq!(next_entry_args.log_id, expected_next_entry_args.log_id);
assert_eq!(next_entry_args.seq_num, expected_next_entry_args.seq_num);
assert_eq!(next_entry_args.backlink, expected_next_entry_args.backlink);
assert_eq!(next_entry_args.skiplink, expected_next_entry_args.skiplink);
assert_eq!(node.get_authors().len(), 2);
assert_eq!(node.get_author_logs(&penguin.author()).unwrap().len(), 1);
// The document view reports the value written by penguin's second update,
// and both authors' logs still make up a single document.
let document = node.get_document(&panda_entry_1_hash);
let document_view_value = document.view().unwrap().get("message").unwrap();
assert_eq!(
document_view_value.value(),
&OperationValue::Text("And again. [Penguin]".to_string())
);
assert_eq!(node.get_documents().len(), 1);
// A second CREATE by panda starts another document, which is expected in
// log 2. Note that this shadows the earlier `panda_entry_1_hash`.
let (panda_entry_1_hash, next_entry_args) = send_to_node(
&mut node,
&panda,
&create_operation(&[(
"message",
OperationValue::Text("Ohh, my first message in a new document!".to_string()),
)]),
)
.unwrap();
expected_next_entry_args = NextEntryArgs {
log_id: LogId::new(2),
seq_num: SeqNum::new(2).unwrap(),
backlink: Some(panda_entry_1_hash),
skiplink: None,
};
assert_eq!(next_entry_args.log_id, expected_next_entry_args.log_id);
assert_eq!(next_entry_args.seq_num, expected_next_entry_args.seq_num);
assert_eq!(next_entry_args.backlink, expected_next_entry_args.backlink);
assert_eq!(next_entry_args.skiplink, expected_next_entry_args.skiplink);
assert_eq!(node.get_authors().len(), 2);
assert_eq!(node.get_author_logs(&panda.author()).unwrap().len(), 2);
assert_eq!(node.get_documents().len(), 2);
}
// Requesting entry arguments for an explicit sequence number must yield the
// links as seen from that position in the log, not from its current end.
#[rstest]
fn next_entry_args_at_specific_seq_num(private_key: String) {
    let panda = Client::new("panda".to_string(), key_pair(&private_key));
    let mut node = Node::new();

    // Fill panda's log with two entries: a CREATE followed by an UPDATE.
    let create = create_operation(&[(
        "message",
        OperationValue::Text("Ohh, my first message!".to_string()),
    )]);
    let (entry1_hash, _) = send_to_node(&mut node, &panda, &create).unwrap();

    let update = update_operation(
        &[(
            "message",
            OperationValue::Text("Which I now update.".to_string()),
        )],
        &entry1_hash.clone().into(),
    );
    send_to_node(&mut node, &panda, &update).unwrap();

    // Ask for the arguments of the (already taken) second position.
    let requested_seq_num = SeqNum::new(2).unwrap();
    let actual = node
        .next_entry_args(
            &panda.author(),
            Some(&entry1_hash),
            Some(&requested_seq_num),
        )
        .unwrap();

    // Seen from sequence number 2, the only preceding entry is the CREATE.
    let expected = NextEntryArgs {
        log_id: LogId::new(1),
        seq_num: SeqNum::new(2).unwrap(),
        backlink: Some(entry1_hash),
        skiplink: None,
    };

    assert_eq!(actual.log_id, expected.log_id);
    assert_eq!(actual.seq_num, expected.seq_num);
    assert_eq!(actual.backlink, expected.backlink);
    assert_eq!(actual.skiplink, expected.skiplink);
}
// Two authors edit the same document, partly starting from the same entry,
// and the document view is checked after every publish.
#[rstest]
fn concurrent_updates(private_key: String) {
let panda = Client::new("panda".to_string(), key_pair(&private_key));
// Penguin is created from a fixed private key instead of a generated one.
let penguin = Client::new(
"penguin".to_string(),
key_pair("eb852fefa703901e42f17cdc2aa507947f392a72101b2c1a6d30023af14f75e3"),
);
let mut node = Node::new();
// Panda creates the document with two fields.
let (panda_entry_1_hash, _) = send_to_node(
&mut node,
&panda,
&create_operation(&[
(
"cafe_name",
OperationValue::Text("Polar Pear Cafe".to_string()),
),
(
"address",
OperationValue::Text("1, Polar Bear Rise, Panda Town".to_string()),
),
]),
)
.unwrap();
let document = node.get_document(&panda_entry_1_hash);
let document_view_value = document.view().unwrap().get("cafe_name").unwrap();
assert_eq!(
document_view_value.value(),
&OperationValue::Text("Polar Pear Cafe".to_string())
);
// Panda changes the cafe name, referencing the CREATE entry.
let (panda_entry_2_hash, _) = send_to_node(
&mut node,
&panda,
&update_operation(
&[(
"cafe_name",
OperationValue::Text("Polar Bear Cafe".to_string()),
)],
&panda_entry_1_hash.clone().into(),
),
)
.unwrap();
let document = node.get_document(&panda_entry_1_hash);
let document_view_value = document.view().unwrap().get("cafe_name").unwrap();
assert_eq!(
document_view_value.value(),
&OperationValue::Text("Polar Bear Cafe".to_string())
);
// Penguin changes the address, also referencing the CREATE entry rather
// than panda's update, so both updates start from the same entry.
let (penguin_entry_1_hash, _) = send_to_node(
&mut node,
&penguin,
&update_operation(
&[(
"address",
OperationValue::Text("1, Polar Bear rd, Panda Town".to_string()),
)],
&panda_entry_1_hash.clone().into(),
),
)
.unwrap();
// The cafe name set by panda is still in place after penguin's address change.
let document = node.get_document(&panda_entry_1_hash);
let document_view_value = document.view().unwrap().get("cafe_name").unwrap();
assert_eq!(
document_view_value.value(),
&OperationValue::Text("Polar Bear Cafe".to_string())
);
// Penguin's second update references both its own previous entry and
// panda's update.
let (_penguin_entry_2_hash, _) = send_to_node(
&mut node,
&penguin,
&update_operation(
&[(
"cafe_name",
OperationValue::Text("Polar Bear Café".to_string()),
)],
&DocumentViewId::new(&[penguin_entry_1_hash.into(), panda_entry_2_hash.into()])
.unwrap(),
),
)
.unwrap();
// The view now carries the cafe name from that last update.
let document = node.get_document(&panda_entry_1_hash);
let document_view_value = document.view().unwrap().get("cafe_name").unwrap();
assert_eq!(
document_view_value.value(),
&OperationValue::Text("Polar Bear Café".to_string())
);
}
}