use async_trait::async_trait;
use crate::document::DocumentId;
use crate::entry::SeqNum;
use crate::hash::Hash;
use crate::operation::{AsOperation, AsVerifiedOperation, Operation};
use crate::storage_provider::errors::PublishEntryError;
use crate::storage_provider::traits::{
AsEntryArgsRequest, AsEntryArgsResponse, AsPublishEntryRequest, AsPublishEntryResponse,
AsStorageEntry, AsStorageLog, EntryStore, LogStore, OperationStore,
};
use crate::Validate;
/// High-level storage API layered on top of the entry, log and operation stores.
///
/// Implementors pick the concrete request/response types and provide
/// `get_document_by_entry`. The request handlers `get_entry_args` and
/// `publish_entry` have default implementations expressed through the
/// super-trait store methods.
#[async_trait]
pub trait StorageProvider<
    StorageEntry: AsStorageEntry,
    StorageLog: AsStorageLog,
    StorageOperation: AsVerifiedOperation,
>: EntryStore<StorageEntry> + LogStore<StorageLog> + OperationStore<StorageOperation>
{
    /// Request type accepted by `get_entry_args`.
    type EntryArgsRequest: AsEntryArgsRequest + Sync;

    /// Response type returned by `get_entry_args`.
    type EntryArgsResponse: AsEntryArgsResponse;

    /// Request type accepted by `publish_entry`.
    type PublishEntryRequest: AsPublishEntryRequest + Sync;

    /// Response type returned by `publish_entry`.
    type PublishEntryResponse: AsPublishEntryResponse;

    /// Looks up the document associated with the entry identified by `entry_hash`.
    ///
    /// Returns `Ok(None)` when no document is known for that entry; `publish_entry`
    /// turns that case into `PublishEntryError::DocumentMissing`.
    async fn get_document_by_entry(
        &self,
        entry_hash: &Hash,
    ) -> Result<Option<DocumentId>, Box<dyn std::error::Error + Send + Sync>>;

    /// Computes the arguments (backlink, skiplink, sequence number, log id) needed to
    /// create the next entry for the requested author and document.
    ///
    /// # Errors
    ///
    /// Fails when the request does not validate or when any store lookup fails.
    async fn get_entry_args(
        &self,
        params: &Self::EntryArgsRequest,
    ) -> Result<Self::EntryArgsResponse, Box<dyn std::error::Error + Send + Sync>> {
        // Reject malformed requests before touching the stores.
        params.validate()?;

        let log = self
            .find_document_log_id(params.author(), params.document_id().as_ref())
            .await?;

        let entry_latest = self.get_latest_entry(params.author(), &log).await?;

        match entry_latest {
            // The latest entry of the log becomes the backlink of the upcoming entry.
            Some(entry_latest) => {
                let entry_hash_skiplink = self.determine_next_skiplink(&entry_latest).await?;

                Ok(Self::EntryArgsResponse::new(
                    Some(entry_latest.hash()),
                    entry_hash_skiplink,
                    // NOTE(review): panics if `next()` yields `None` (presumably only at
                    // the maximum sequence number; confirm against `SeqNum`).
                    entry_latest.seq_num().clone().next().unwrap(),
                    entry_latest.log_id(),
                ))
            }
            // Nothing stored in this log yet: no links, default sequence number.
            None => Ok(Self::EntryArgsResponse::new(
                None,
                None,
                SeqNum::default(),
                log,
            )),
        }
    }

    /// Validates, verifies and stores the entry carried by `params`, then returns the
    /// arguments for the entry that would follow it.
    ///
    /// # Errors
    ///
    /// Fails when the entry or operation does not validate, when a non-create
    /// operation names no previous operation or its document cannot be found, when
    /// the entry's log id differs from the expected one, when link lookup or
    /// verification fails, or when a store operation fails.
    ///
    /// # Panics
    ///
    /// Panics if no latest entry can be read back right after a successful insert, or
    /// if the latest entry's sequence number has no successor.
    async fn publish_entry(
        &self,
        params: &Self::PublishEntryRequest,
    ) -> Result<Self::PublishEntryResponse, Box<dyn std::error::Error + Send + Sync>> {
        let entry = StorageEntry::new(params.entry_signed(), params.operation_encoded())?;
        entry.validate()?;

        let document_id = if entry.operation().is_create() {
            // A create operation starts a document; its id is derived from this entry's hash.
            DocumentId::new(entry.hash().into())
        } else {
            let operation = Operation::from(params.operation_encoded());
            operation.validate()?;

            // The operation comes from the request, so a missing previous operation is
            // reported as an error rather than a panic.
            let previous_operation_id = operation
                .previous_operations()
                .and_then(|previous| previous.into_iter().next())
                .ok_or("operation does not reference any previous operation")?;

            self.get_document_by_entry(previous_operation_id.as_hash())
                .await?
                .ok_or_else(|| PublishEntryError::DocumentMissing(entry.hash()))?
        };

        // The log id claimed by the entry must match the one expected for this
        // author and document.
        let document_log_id = self
            .find_document_log_id(&entry.author(), Some(&document_id))
            .await?;

        if document_log_id != entry.log_id() {
            return Err(PublishEntryError::InvalidLogId(
                entry.log_id().as_u64(),
                document_log_id.as_u64(),
            )
            .into());
        }

        let entry_backlink_bytes = self
            .try_get_backlink(&entry)
            .await?
            .map(|link| link.entry_bytes());

        let entry_skiplink_bytes = self
            .try_get_skiplink(&entry)
            .await?
            .map(|link| link.entry_bytes());

        // Verify the encoded entry against its payload and the stored link entries.
        bamboo_rs_core_ed25519_yasmf::verify(
            &entry.entry_bytes(),
            Some(&params.operation_encoded().to_bytes()),
            entry_skiplink_bytes.as_deref(),
            entry_backlink_bytes.as_deref(),
        )?;

        // Only a create operation registers a new log for the document.
        if entry.operation().is_create() {
            let log = StorageLog::new(
                &entry.author(),
                &entry.operation().schema(),
                &document_id,
                &entry.log_id(),
            );
            self.insert_log(log).await?;
        }

        self.insert_entry(entry.clone()).await?;

        let entry_latest: StorageEntry = self
            .get_latest_entry(&entry.author(), &entry.log_id())
            .await?
            .expect("log should contain a latest entry right after an insert");

        let entry_hash_skiplink = self.determine_next_skiplink(&entry_latest).await?;
        // NOTE(review): panics if `next()` yields `None` (presumably only at the
        // maximum sequence number; confirm against `SeqNum`).
        let next_seq_num = entry_latest.seq_num().clone().next().unwrap();

        Ok(Self::PublishEntryResponse::new(
            Some(entry.hash()),
            entry_hash_skiplink,
            next_seq_num,
            entry.log_id(),
        ))
    }
}
#[cfg(test)]
pub mod tests {
use std::convert::TryFrom;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use rstest::rstest;
use crate::document::DocumentId;
use crate::entry::{sign_and_encode, Entry, LogId};
use crate::hash::Hash;
use crate::identity::KeyPair;
use crate::operation::{
AsOperation, OperationEncoded, OperationFields, OperationId, OperationValue,
VerifiedOperation,
};
use crate::storage_provider::traits::test_utils::{
test_db, EntryArgsRequest, EntryArgsResponse, PublishEntryRequest, PublishEntryResponse,
SimplestStorageProvider, StorageEntry, StorageLog, TestStore,
};
use crate::storage_provider::traits::{
AsEntryArgsResponse, AsPublishEntryResponse, AsStorageEntry, AsStorageLog,
};
use crate::test_utils::fixtures::{entry, key_pair, operation, operation_fields, operation_id};
use super::StorageProvider;
#[async_trait]
impl StorageProvider<StorageEntry, StorageLog, VerifiedOperation> for SimplestStorageProvider {
    type EntryArgsRequest = EntryArgsRequest;
    type EntryArgsResponse = EntryArgsResponse;
    type PublishEntryRequest = PublishEntryRequest;
    type PublishEntryResponse = PublishEntryResponse;

    // Finds the stored entry with the given hash, then the log sharing that entry's
    // log id and author, and returns that log's document id. Yields `Ok(None)` when no
    // entry matches; panics when an entry matches but no log does.
    async fn get_document_by_entry(
        &self,
        entry_hash: &Hash,
    ) -> Result<Option<DocumentId>, Box<dyn std::error::Error + Sync + Send>> {
        let stored_entries = self.entries.lock().unwrap();
        let matching_entry = stored_entries
            .iter()
            .find(|candidate| candidate.hash() == *entry_hash);

        if let Some(found) = matching_entry {
            // The logs are only locked once an entry has actually been found.
            let stored_logs = self.logs.lock().unwrap();
            let owning_log = stored_logs
                .iter()
                .find(|candidate| {
                    candidate.id() == found.log_id() && candidate.author() == found.author()
                })
                .unwrap();
            let document_id = owning_log.document_id();
            Ok(Some(document_id))
        } else {
            Ok(None)
        }
    }
}
// Replays every fixture entry into an empty provider and checks that each publish
// response matches the links and sequence number of the entry that follows.
#[rstest]
#[async_std::test]
async fn can_publish_entries(
    #[from(test_db)]
    #[with(20, 1)]
    #[future]
    db: TestStore,
) {
    let db = db.await;
    let new_db = SimplestStorageProvider::default();
    let entries = db.store.entries.lock().unwrap().clone();
    let total_entries = entries.len() as u64;

    for entry in entries.iter() {
        let request =
            PublishEntryRequest(entry.entry_signed(), entry.operation_encoded().unwrap());
        let response = new_db.publish_entry(&request).await;
        assert!(response.is_ok());

        // The last entry has no successor to compare against.
        let mut seq_num = entry.seq_num();
        if seq_num.as_u64() == total_entries {
            break;
        }

        // Sequence numbers are treated as one-based here, hence the `- 1` for the index.
        let next_seq_num = seq_num.next().unwrap();
        let next_entry = entries.get(next_seq_num.as_u64() as usize - 1).unwrap();
        let expected_response = PublishEntryResponse::new(
            next_entry.backlink_hash(),
            next_entry.skiplink_hash(),
            next_seq_num,
            LogId::default(),
        );
        assert_eq!(response.unwrap(), expected_response);
    }
}
// Publishing an entry whose backlink does not match the expected one must fail with
// the backlink-mismatch error.
#[rstest]
#[async_std::test]
async fn rejects_invalid_backlink(
    key_pair: KeyPair,
    #[from(test_db)]
    #[with(4, 1)]
    #[future]
    db: TestStore,
) {
    let db = db.await;
    let new_db = SimplestStorageProvider::default();
    let entries = db.store.entries.lock().unwrap().clone();

    // Seed the fresh provider with the first three valid entries.
    for entry in &entries[..3] {
        let publish_entry_request =
            PublishEntryRequest(entry.entry_signed(), entry.operation_encoded().unwrap());
        new_db.publish_entry(&publish_entry_request).await.unwrap();
    }

    // Rebuild the fourth entry, using the hash of the entry at index 1 as its
    // backlink instead of the original backlink.
    let entry_four = entries.get(3).unwrap();
    let entry_with_invalid_backlink = Entry::new(
        &entry_four.log_id(),
        Some(&entry_four.operation()),
        entry_four.skiplink_hash().as_ref(),
        Some(&entries.get(1).unwrap().hash()),
        &entry_four.seq_num(),
    )
    .unwrap();

    let entry_signed = sign_and_encode(&entry_with_invalid_backlink, &key_pair).unwrap();
    let publish_entry_request = PublishEntryRequest(
        entry_signed.clone(),
        entry_four.operation_encoded().unwrap(),
    );

    let error_response = new_db.publish_entry(&publish_entry_request).await;

    assert_eq!(
        error_response.unwrap_err().to_string(),
        format!(
            "The backlink hash encoded in the entry: {} did not match the expected backlink hash",
            entry_signed.hash()
        )
    );
}
// Publishing an entry whose skiplink does not match the known skiplink target must
// fail with the skiplink-mismatch error.
#[rstest]
#[async_std::test]
async fn rejects_invalid_skiplink(
    key_pair: KeyPair,
    #[from(test_db)]
    #[with(4, 1)]
    #[future]
    db: TestStore,
) {
    let db = db.await;
    let new_db = SimplestStorageProvider::default();
    let entries = db.store.entries.lock().unwrap().clone();

    // Seed the fresh provider with the first three valid entries.
    for entry in &entries[..3] {
        let publish_entry_request =
            PublishEntryRequest(entry.entry_signed(), entry.operation_encoded().unwrap());
        new_db.publish_entry(&publish_entry_request).await.unwrap();
    }

    // Rebuild the fourth entry with its original backlink, but with the hash of the
    // entry at index 1 in the skiplink position.
    let entry_four = entries.get(3).unwrap();
    let entry_with_invalid_skiplink = Entry::new(
        &entry_four.log_id(),
        Some(&entry_four.operation()),
        Some(&entries.get(1).unwrap().hash()),
        entry_four.backlink_hash().as_ref(),
        &entry_four.seq_num(),
    )
    .unwrap();

    let entry_signed = sign_and_encode(&entry_with_invalid_skiplink, &key_pair).unwrap();
    let publish_entry_request = PublishEntryRequest(
        entry_signed.clone(),
        entry_four.operation_encoded().unwrap(),
    );

    let error_response = new_db.publish_entry(&publish_entry_request).await;

    assert_eq!(
        error_response.unwrap_err().to_string(),
        format!(
            "The skiplink hash encoded in the entry: {} did not match the known hash of the skiplink target",
            entry_signed.hash()
        )
    );
}
// Before each fixture entry is published, the entry args returned by the provider
// must equal the links and sequence number that entry was actually created with.
#[rstest]
#[async_std::test]
async fn gets_entry_args(
    #[from(test_db)]
    #[with(20, 1)]
    #[future]
    db: TestStore,
) {
    let db = db.await;
    let new_db = SimplestStorageProvider::default();
    let entries = db.store.entries.lock().unwrap().clone();

    for entry in entries.iter() {
        // A create operation has no document yet; every other entry refers to the
        // document identified by the hash of the first entry.
        let document: Option<DocumentId> = if entry.operation().is_create() {
            None
        } else {
            Some(entries.first().unwrap().hash().into())
        };

        let request = EntryArgsRequest {
            author: entry.author(),
            document,
        };
        let response = new_db.get_entry_args(&request).await;
        assert!(response.is_ok());

        let expected_response = EntryArgsResponse::new(
            entry.backlink_hash(),
            entry.skiplink_hash(),
            entry.seq_num(),
            LogId::default(),
        );
        assert_eq!(response.unwrap(), expected_response);

        // Publish the entry so the next iteration sees it as the latest one.
        let publish_request =
            PublishEntryRequest(entry.entry_signed(), entry.operation_encoded().unwrap());
        new_db.publish_entry(&publish_request).await.unwrap();
    }
}
// An entry that claims a log id other than the one expected for its document must be
// rejected with the log-id mismatch error.
#[rstest]
#[async_std::test]
async fn wrong_log_id(
    key_pair: KeyPair,
    #[from(test_db)]
    #[with(2, 1)]
    #[future]
    db: TestStore,
) {
    let db = db.await;
    let new_db = SimplestStorageProvider::default();
    let entries = db.store.entries.lock().unwrap().clone();
    let first_entry = entries.get(0).unwrap();
    let second_entry = entries.get(1).unwrap();

    // Publish the first entry unchanged so the document and its log exist.
    let first_request = PublishEntryRequest(
        first_entry.entry_signed(),
        first_entry.operation_encoded().unwrap(),
    );
    new_db.publish_entry(&first_request).await.unwrap();

    // Re-create the second entry with everything intact except the log id.
    let tampered_entry = Entry::new(
        &LogId::new(2),
        Some(&second_entry.operation()),
        second_entry.skiplink_hash().as_ref(),
        second_entry.backlink_hash().as_ref(),
        &second_entry.seq_num(),
    )
    .unwrap();
    let tampered_entry_signed = sign_and_encode(&tampered_entry, &key_pair).unwrap();
    let encoded_operation = OperationEncoded::try_from(&second_entry.operation()).unwrap();

    let tampered_request = PublishEntryRequest(tampered_entry_signed, encoded_operation);
    let outcome = new_db.publish_entry(&tampered_request).await;

    assert_eq!(
        outcome.unwrap_err().to_string(),
        "Requested log id 2 does not match expected log id 1"
    )
}
// Publishing the eighth entry into a store that lacks the entry at index 3 must fail
// because the expected skiplink cannot be found.
#[rstest]
#[async_std::test]
async fn skiplink_does_not_exist(
    #[from(test_db)]
    #[with(8, 1)]
    #[future]
    db: TestStore,
) {
    let db = db.await;
    let entries = db.store.entries.lock().unwrap().clone();
    let logs = db.store.logs.lock().unwrap().clone();

    // Indices 0..=6 with index 3 deliberately left out.
    let kept_indices: [usize; 6] = [0, 1, 2, 4, 5, 6];
    let log_entries_with_skiplink_missing: Vec<_> = kept_indices
        .iter()
        .map(|&index| entries.get(index).unwrap().clone())
        .collect();

    let new_db = SimplestStorageProvider {
        logs: Arc::new(Mutex::new(logs)),
        entries: Arc::new(Mutex::new(log_entries_with_skiplink_missing)),
        operations: Arc::new(Mutex::new(Vec::new())),
    };

    let eighth_entry = entries.get(7).unwrap();
    let request = PublishEntryRequest(
        eighth_entry.entry_signed(),
        eighth_entry.operation_encoded().unwrap(),
    );
    let outcome = new_db.publish_entry(&request).await;

    assert_eq!(
        outcome.unwrap_err().to_string(),
        format!(
            "Could not find expected skiplink in database for entry with id: {}",
            eighth_entry.hash()
        )
    )
}
// An update whose previous operation is unknown to the store must be rejected with
// the missing-document error.
#[rstest]
#[async_std::test]
async fn prev_op_does_not_exist(
    #[from(test_db)]
    #[with(4, 1)]
    #[future]
    db: TestStore,
    operation_fields: OperationFields,
    #[from(operation_id)] invalid_prev_op: OperationId,
    key_pair: KeyPair,
) {
    let db = db.await;
    let entries = db.store.entries.lock().unwrap().clone();
    let logs = db.store.logs.lock().unwrap().clone();

    // The store starts out with the first three fixture entries only.
    let three_valid_entries = entries[..3].to_vec();
    let new_db = SimplestStorageProvider {
        logs: Arc::new(Mutex::new(logs)),
        entries: Arc::new(Mutex::new(three_valid_entries)),
        operations: Arc::new(Mutex::new(Vec::new())),
    };

    // Take the position of the fourth entry, but carry an operation whose previous
    // operation comes from the `operation_id` fixture rather than from this log.
    let next_entry = entries.get(3).unwrap();
    let dangling_update = operation(
        Some(operation_fields),
        Some(invalid_prev_op.into()),
        None,
    );
    let update_entry = entry(
        next_entry.seq_num().as_u64(),
        next_entry.log_id().as_u64(),
        next_entry.backlink_hash(),
        next_entry.skiplink_hash(),
        Some(dangling_update.clone()),
    );

    let encoded_entry = sign_and_encode(&update_entry, &key_pair).unwrap();
    let encoded_operation = OperationEncoded::try_from(&dangling_update).unwrap();
    let request = PublishEntryRequest(encoded_entry.clone(), encoded_operation);

    let outcome = new_db.publish_entry(&request).await;

    assert_eq!(
        outcome.unwrap_err().to_string(),
        format!(
            "Could not find document for entry in database with id: {}",
            encoded_entry.hash()
        )
    )
}
// Pairing a signed entry with an operation other than the one it was created for
// must be rejected with the payload-hash error.
#[rstest]
#[async_std::test]
async fn invalid_entry_op_pair(
    #[from(test_db)]
    #[with(4, 1)]
    #[future]
    db: TestStore,
) {
    let db = db.await;
    let entries = db.store.entries.lock().unwrap().clone();
    let logs = db.store.logs.lock().unwrap().clone();

    // The store starts out with the first three fixture entries only.
    let three_valid_entries = entries[..3].to_vec();
    let new_db = SimplestStorageProvider {
        logs: Arc::new(Mutex::new(logs)),
        entries: Arc::new(Mutex::new(three_valid_entries)),
        operations: Arc::new(Mutex::new(Vec::new())),
    };

    let next_entry = entries.get(3).unwrap();

    // Build an operation that differs from the one the fourth entry was created for.
    let wrong_fields = operation_fields(vec![(
        "poopy",
        OperationValue::Text("This is the WRONG operation :-(".to_string()),
    )]);
    let wrong_previous = next_entry.operation_encoded().unwrap().hash().into();
    let mismatched_operation = operation(Some(wrong_fields), Some(wrong_previous), None);
    let encoded_operation = OperationEncoded::try_from(&mismatched_operation).unwrap();

    let request = PublishEntryRequest(next_entry.entry_signed(), encoded_operation);
    let outcome = new_db.publish_entry(&request).await;

    assert_eq!(
        outcome.unwrap_err().to_string(),
        "operation needs to match payload hash of encoded entry"
    )
}
}