use crate::api::helpers::get_skiplink_for_entry;
use crate::api::validation::{
ensure_document_not_deleted, get_checked_document_id_for_view_id, get_expected_skiplink,
increment_seq_num, is_next_seq_num, validate_claimed_schema_id, verify_log_id,
};
use crate::api::DomainError;
use crate::document::DocumentId;
use crate::entry::decode::decode_entry;
use crate::entry::traits::{AsEncodedEntry, AsEntry};
use crate::entry::{EncodedEntry, LogId, SeqNum};
use crate::hash::Hash;
use crate::operation::plain::PlainOperation;
use crate::operation::traits::AsOperation;
use crate::operation::validate::validate_operation_with_entry;
use crate::operation::{EncodedOperation, Operation, OperationAction, OperationId};
use crate::schema::Schema;
use crate::storage_provider::traits::{EntryStore, LogStore, OperationStore};
type Backlink = Hash;
type Skiplink = Hash;
pub async fn publish<S: EntryStore + OperationStore + LogStore>(
store: &S,
schema: &Schema,
encoded_entry: &EncodedEntry,
plain_operation: &PlainOperation,
encoded_operation: &EncodedOperation,
) -> Result<(Option<Backlink>, Option<Skiplink>, SeqNum, LogId), DomainError> {
let entry = decode_entry(encoded_entry)?;
let (operation, operation_id) = validate_entry_and_operation(
store,
schema,
&entry,
encoded_entry,
plain_operation,
encoded_operation,
)
.await?;
let document_id = determine_document_id(store, &operation, &operation_id).await?;
verify_log_id(store, entry.public_key(), entry.log_id(), &document_id).await?;
let next_seq_num = increment_seq_num(&mut entry.seq_num().clone()).map_err(|_| {
DomainError::MaxSeqNumReached(entry.public_key().to_string(), entry.log_id().as_u64())
})?;
let skiplink =
get_skiplink_for_entry(store, &next_seq_num, entry.log_id(), entry.public_key()).await?;
if entry.seq_num().is_first() {
store
.insert_log(
entry.log_id(),
entry.public_key(),
&operation.schema_id(),
&document_id,
)
.await?;
}
store
.insert_entry(&entry, encoded_entry, Some(encoded_operation))
.await?;
store
.insert_operation(&operation_id, entry.public_key(), &operation, &document_id)
.await?;
Ok((
Some(encoded_entry.hash()),
skiplink,
next_seq_num,
entry.log_id().to_owned(),
))
}
async fn validate_entry_and_operation<S: EntryStore + OperationStore + LogStore>(
store: &S,
schema: &Schema,
entry: &impl AsEntry,
encoded_entry: &impl AsEncodedEntry,
plain_operation: &PlainOperation,
encoded_operation: &EncodedOperation,
) -> Result<(Operation, OperationId), DomainError> {
let latest_entry = store
.get_latest_entry(entry.public_key(), entry.log_id())
.await?;
let latest_seq_num = latest_entry.as_ref().map(|entry| entry.seq_num());
is_next_seq_num(latest_seq_num, entry.seq_num())?;
let skiplink = match entry.skiplink() {
Some(_) => Some(
get_expected_skiplink(store, entry.public_key(), entry.log_id(), entry.seq_num())
.await?,
),
None => None,
};
let skiplink_params = skiplink.as_ref().map(|entry| {
let hash = entry.hash();
(entry.clone(), hash)
});
let backlink_params = latest_entry.as_ref().map(|entry| {
let hash = entry.hash();
(entry.clone(), hash)
});
let (operation, operation_id) = validate_operation_with_entry(
entry,
encoded_entry,
skiplink_params.as_ref().map(|(entry, hash)| (entry, hash)),
backlink_params.as_ref().map(|(entry, hash)| (entry, hash)),
plain_operation,
encoded_operation,
schema,
)?;
Ok((operation, operation_id))
}
async fn determine_document_id<S: OperationStore>(
store: &S,
operation: &impl AsOperation,
operation_id: &OperationId,
) -> Result<DocumentId, DomainError> {
let document_id = match operation.action() {
OperationAction::Create => {
Ok::<DocumentId, DomainError>(DocumentId::new(operation_id))
}
_ => {
let previous = operation.previous().unwrap();
validate_claimed_schema_id(store, &operation.schema_id(), &previous).await?;
let document_id = get_checked_document_id_for_view_id(store, &previous).await?;
Ok(document_id)
}
}?;
ensure_document_not_deleted(store, &document_id)
.await
.map_err(|_| DomainError::DeletedDocument)?;
Ok(document_id)
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use crate::api::{next_args, publish};
use crate::document::{DocumentId, DocumentViewId};
use crate::entry::encode::sign_and_encode_entry;
use crate::entry::traits::{AsEncodedEntry, AsEntry};
use crate::entry::{LogId, SeqNum};
use crate::hash::Hash;
use crate::identity::KeyPair;
use crate::operation::decode::decode_operation;
use crate::operation::encode::encode_operation;
use crate::operation::{
Operation, OperationAction, OperationBuilder, OperationId, OperationValue,
};
use crate::schema::{FieldType, Schema, SchemaId, SchemaName};
use crate::storage_provider::traits::{EntryStore, LogStore, OperationStore};
use crate::test_utils::constants::{test_fields, PRIVATE_KEY};
use crate::test_utils::fixtures::{
create_operation, delete_operation, key_pair, operation, populate_store_config,
random_document_view_id, random_hash, random_operation_id, schema, update_operation,
};
use crate::test_utils::memory_store::helpers::{
populate_store, remove_entries, remove_operations, send_to_store, PopulateStoreConfig,
};
use crate::test_utils::memory_store::{MemoryStore, StorageEntry};
use crate::WithId;
use super::{determine_document_id, validate_entry_and_operation};
type LogIdAndSeqNum = (u64, u64);
#[rstest]
#[tokio::test]
async fn determines_document_id(
random_operation_id: OperationId,
#[with(test_fields(), random_document_view_id())] update_operation: Operation,
#[from(populate_store_config)]
#[with(4, 1, 1)]
config: PopulateStoreConfig,
) {
let store = MemoryStore::default();
let (key_pairs, document_ids) = populate_store(&store, &config).await;
let document_id = document_ids.get(0).unwrap();
let public_key = key_pairs[0].public_key();
let entry_one = store
.get_entry_at_seq_num(&public_key, &LogId::new(0), &SeqNum::new(1).unwrap())
.await
.unwrap()
.unwrap();
let operation_one = store
.get_operation(&entry_one.hash().into())
.await
.unwrap()
.unwrap();
let entry_four = store
.get_entry_at_seq_num(&public_key, &LogId::new(0), &SeqNum::new(4).unwrap())
.await
.unwrap()
.unwrap();
let operation_four = store
.get_operation(&entry_four.hash().into())
.await
.unwrap()
.unwrap();
let id = determine_document_id(&store, &operation_one, operation_one.id())
.await
.unwrap();
assert_eq!(document_id, &id);
let id = determine_document_id(&store, &operation_four, operation_four.id())
.await
.unwrap();
assert_eq!(document_id, &id);
let result = determine_document_id(&store, &update_operation, &random_operation_id).await;
assert!(result.is_err())
}
#[rstest]
#[tokio::test]
async fn determines_document_id_deleted_document(
#[from(populate_store_config)]
#[with(4, 1, 1, true)]
config: PopulateStoreConfig,
) {
let store = MemoryStore::default();
let (key_pairs, _) = populate_store(&store, &config).await;
let public_key = key_pairs[0].public_key();
let entry_one = store
.get_entry_at_seq_num(&public_key, &LogId::new(0), &SeqNum::new(1).unwrap())
.await
.unwrap()
.unwrap();
let operation_one = store
.get_operation(&entry_one.hash().into())
.await
.unwrap()
.unwrap();
let entry_four = store
.get_entry_at_seq_num(&public_key, &LogId::new(0), &SeqNum::new(4).unwrap())
.await
.unwrap()
.unwrap();
let operation_four = store
.get_operation(&entry_four.hash().into())
.await
.unwrap()
.unwrap();
let result = determine_document_id(&store, &operation_one, operation_one.id()).await;
assert!(result.is_err());
let result = determine_document_id(&store, &operation_four, operation_four.id()).await;
assert!(result.is_err());
}
#[rstest]
#[case::ok(&[(0, 8)], (0, 8))]
#[should_panic(
expected = "Expected skiplink entry not found in store: public key 2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96, log id 0, seq num 4"
)]
#[case::skiplink_missing(&[(0, 4), (0, 8)], (0, 8))]
#[should_panic(
expected = "Entry's claimed seq num of 8 does not match expected seq num of 7 for given public key and log"
)]
#[case::backlink_missing(&[(0, 7), (0, 8)], (0, 8))]
#[should_panic(
expected = "Entry's claimed seq num of 8 does not match expected seq num of 7 for given public key and log"
)]
#[case::backlink_and_skiplink_missing(&[(0, 4), (0, 7), (0, 8)], (0, 8))]
#[should_panic(
expected = "Entry's claimed seq num of 8 does not match expected seq num of 9 for given public key and log"
)]
#[case::seq_num_occupied_again(&[], (0, 8))]
#[should_panic(
expected = "Entry's claimed seq num of 7 does not match expected seq num of 9 for given public key and log"
)]
#[case::seq_num_occupied_(&[], (0, 7))]
#[should_panic(
expected = "Entry's claimed seq num of 8 does not match expected seq num of 1 for given public key and log"
)]
#[case::no_entries_yet(&[(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8)], (0, 8))]
#[tokio::test]
async fn validate_against_entries_in_store(
schema: Schema,
#[case] entries_to_remove: &[LogIdAndSeqNum],
#[case] entry_to_publish: LogIdAndSeqNum,
#[from(populate_store_config)]
#[with(8, 1, 1)]
config: PopulateStoreConfig,
) {
let store = MemoryStore::default();
let (key_pairs, _) = populate_store(&store, &config).await;
let public_key = key_pairs[0].public_key();
let entry = store
.get_entry_at_seq_num(
&public_key,
&LogId::new(entry_to_publish.0),
&SeqNum::new(entry_to_publish.1).unwrap(),
)
.await
.unwrap()
.unwrap();
remove_operations(&store, &public_key, entries_to_remove);
remove_entries(&store, &public_key, entries_to_remove);
let operation = entry.payload().unwrap();
let plain_operation = decode_operation(&operation).unwrap();
validate_entry_and_operation(&store, &schema, &entry, &entry, &plain_operation, operation)
.await
.map_err(|err| err.to_string())
.unwrap();
}
#[rstest]
#[should_panic(
expected = "Expected skiplink entry not found in store: public key 2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96, log id 0, seq num 4"
)]
#[case::next_args_skiplink_missing(&[(0, 4), (0, 7), (0, 8)], (0, 7))]
#[tokio::test]
async fn next_args_skiplink_missing(
schema: Schema,
#[case] entries_to_remove: &[LogIdAndSeqNum],
#[case] entry_to_publish: LogIdAndSeqNum,
#[from(populate_store_config)]
#[with(8, 1, 1)]
config: PopulateStoreConfig,
) {
let store = MemoryStore::default();
let (key_pairs, _) = populate_store(&store, &config).await;
let public_key = key_pairs[0].public_key();
let entry = store
.get_entry_at_seq_num(
&public_key,
&LogId::new(entry_to_publish.0),
&SeqNum::new(entry_to_publish.1).unwrap(),
)
.await
.unwrap()
.unwrap();
remove_operations(&store, &public_key, entries_to_remove);
remove_entries(&store, &public_key, entries_to_remove);
let operation = entry.payload.unwrap();
publish(
&store,
&schema,
&entry.encoded_entry,
&decode_operation(&operation).unwrap(),
&operation,
)
.await
.map_err(|err| err.to_string())
.unwrap();
}
#[rstest]
#[case::ok_single_writer(
&[],
&[(0, 8)],
KeyPair::from_private_key_str(PRIVATE_KEY).unwrap()
)]
#[case::ok_many_previous(
&[],
&[(0, 8), (0, 7), (0, 6)],
KeyPair::from_private_key_str(PRIVATE_KEY).unwrap()
)]
#[case::ok_multi_writer(
&[],
&[(0, 8)],
KeyPair::new()
)]
#[should_panic(
expected = "Previous operation 00209038901221ce1002f023461f1530adf632081d9fcd2da1082c7c91fdcb534d03 not found in store"
)]
#[case::previous_operation_missing(
&[(0, 8)],
&[(0, 8)],
KeyPair::from_private_key_str(PRIVATE_KEY).unwrap()
)]
#[should_panic(
expected = "Previous operation 00201971f1257645a2f6d3465f8713991d269709f81a5c6c458168b9461d68af5ecf not found in store"
)]
#[case::one_of_some_previous_missing(
&[(0, 7)],
&[(0, 7), (0, 8)],
KeyPair::from_private_key_str(PRIVATE_KEY).unwrap()
)]
#[should_panic(
expected = "Previous operation 00209038901221ce1002f023461f1530adf632081d9fcd2da1082c7c91fdcb534d03 not found in store"
)]
#[case::one_of_some_previous_missing(
&[(0, 8)],
&[(0, 7), (0, 8)],
KeyPair::from_private_key_str(PRIVATE_KEY).unwrap()
)]
#[should_panic(
expected = "Previous operation 00209038901221ce1002f023461f1530adf632081d9fcd2da1082c7c91fdcb534d03 not found in store"
)]
#[case::missing_previous_operation_multi_writer(
&[(0, 8)],
&[(0, 8)],
KeyPair::new()
)]
#[should_panic(
expected = "Operations in passed document view id originate from different documents"
)]
#[case::previous_invalid_multiple_document_id(
&[],
&[(0, 8), (1, 8)],
KeyPair::from_private_key_str(PRIVATE_KEY).unwrap()
)]
#[tokio::test]
async fn validates_against_operations_in_store(
schema: Schema,
#[case] operations_to_remove: &[LogIdAndSeqNum],
#[case] previous: &[LogIdAndSeqNum],
#[case] key_pair: KeyPair,
#[from(populate_store_config)]
#[with(8, 2, 1)]
config: PopulateStoreConfig,
) {
let store = MemoryStore::default();
let (key_pairs, documents) = populate_store(&store, &config).await;
let existing_author = key_pairs[0].public_key();
let document = documents.first().map(|id| id.as_str().parse().unwrap());
let previous: Vec<OperationId> = previous
.iter()
.filter_map(|(log_id, seq_num)| {
store
.entries
.lock()
.unwrap()
.values()
.find(|entry| {
entry.seq_num().as_u64() == *seq_num
&& entry.log_id().as_u64() == *log_id
&& *entry.public_key() == existing_author
})
.map(|entry| entry.hash().into())
})
.collect();
let document_view_id = DocumentViewId::new(&previous);
let operation = OperationBuilder::new(schema.id())
.action(OperationAction::Update)
.previous(&document_view_id)
.fields(&test_fields())
.build()
.unwrap();
let (backlink, skiplink, seq_num, log_id) =
next_args(&store, &key_pair.public_key(), document.as_ref())
.await
.unwrap();
let encoded_operation = encode_operation(&operation).unwrap();
let encoded_entry = sign_and_encode_entry(
&log_id,
&seq_num,
skiplink.map(Hash::from).as_ref(),
backlink.map(Hash::from).as_ref(),
&encoded_operation,
&key_pair,
)
.unwrap();
remove_operations(&store, &existing_author, operations_to_remove);
let result = publish(
&store,
&schema,
&encoded_entry,
&decode_operation(&encoded_operation).unwrap(),
&encoded_operation,
)
.await;
result.map_err(|err| err.to_string()).unwrap();
}
#[rstest]
#[case::owner_publishes_update_to_correct_log(
LogId::new(0),
KeyPair::from_private_key_str(PRIVATE_KEY).unwrap())
]
#[case::new_author_updates_to_new_log(LogId::new(0), KeyPair::new())]
#[should_panic(
expected = "Entry's claimed log id of 1 does not match existing log id of 0 for given public key and document"
)]
#[case::owner_updates_to_wrong_and_taken_log(LogId::new(1), KeyPair::from_private_key_str(PRIVATE_KEY).unwrap())]
#[should_panic(
expected = "Entry's claimed log id of 2 does not match existing log id of 0 for given public key and document"
)]
#[case::owner_updates_to_wrong_but_free_log(LogId::new(2), KeyPair::from_private_key_str(PRIVATE_KEY).unwrap())]
#[should_panic(
expected = "Entry's claimed log id of 1 does not match expected next log id of 0 for given public key"
)]
#[case::new_author_updates_to_wrong_new_log(LogId::new(1), KeyPair::new())]
#[tokio::test]
async fn new_author_updates_existing_document(
schema: Schema,
#[case] log_id: LogId,
#[case] key_pair: KeyPair,
#[from(populate_store_config)]
#[with(2, 1, 1)]
config: PopulateStoreConfig,
) {
let store = MemoryStore::default();
let (_, documents) = populate_store(&store, &config).await;
let document_id = documents.first().unwrap();
let document_view_id: DocumentViewId = document_id.as_str().parse().unwrap();
let author_performing_update = key_pair.public_key();
let update_operation = OperationBuilder::new(schema.id())
.action(OperationAction::Update)
.previous(&document_view_id)
.fields(&test_fields())
.build()
.unwrap();
let latest_entry = store
.get_latest_entry(&author_performing_update, &log_id)
.await
.unwrap();
let encoded_operation = encode_operation(&update_operation).unwrap();
let encoded_entry = sign_and_encode_entry(
&log_id,
&latest_entry
.as_ref()
.map(|entry| entry.seq_num().clone().next().unwrap())
.unwrap_or_default(),
None,
latest_entry.map(|entry| entry.hash()).as_ref(),
&encoded_operation,
&key_pair,
)
.unwrap();
let result = publish(
&store,
&schema,
&encoded_entry.clone(),
&decode_operation(&encoded_operation).unwrap(),
&encoded_operation,
)
.await;
result.map_err(|err| err.to_string()).unwrap();
let log = store
.get_log_id(&author_performing_update, document_id)
.await
.unwrap();
assert!(log.is_some());
assert_eq!(log.unwrap(), LogId::new(0));
}
#[rstest]
#[case::owner_publishes_to_correct_log(
LogId::new(2),
KeyPair::from_private_key_str(PRIVATE_KEY).unwrap())
]
#[case::new_author_publishes_to_new_log(LogId::new(0), KeyPair::new())]
#[should_panic(
expected = "Entry's claimed seq num of 1 does not match expected seq num of 2 for given public key and log"
)]
#[case::owner_publishes_to_wrong_and_taken_log(
LogId::new(1),
KeyPair::from_private_key_str(PRIVATE_KEY).unwrap())
]
#[should_panic(
expected = "Entry's claimed log id of 3 does not match expected next log id of 2 for given public key"
)]
#[case::owner_publishes_to_wrong_but_free_log(
LogId::new(3),
KeyPair::from_private_key_str(PRIVATE_KEY).unwrap())
]
#[should_panic(
expected = "Entry's claimed log id of 1 does not match expected next log id of 0 for given public key"
)]
#[case::new_author_publishes_to_wrong_new_log(LogId::new(1), KeyPair::new())]
#[tokio::test]
async fn creating_new_document_inserts_log_correctly(
schema: Schema,
#[case] log_id: LogId,
#[case] key_pair: KeyPair,
operation: Operation,
#[from(populate_store_config)]
#[with(1, 2, 1)]
config: PopulateStoreConfig,
) {
let store = MemoryStore::default();
let _ = populate_store(&store, &config).await;
let encoded_operation = encode_operation(&operation).unwrap();
let encoded_entry = sign_and_encode_entry(
&log_id,
&SeqNum::default(),
None,
None,
&encoded_operation,
&key_pair,
)
.unwrap();
let _result = publish(
&store,
&schema,
&encoded_entry,
&decode_operation(&encoded_operation).unwrap(),
&encoded_operation,
)
.await
.map_err(|err| err.to_string())
.unwrap();
let public_key = key_pair.public_key();
let document_id = encoded_entry.hash().into();
let retrieved_log_id = store
.get_log_id(&public_key, &document_id)
.await
.expect("Retrieve log id for document");
assert_eq!(log_id, retrieved_log_id.unwrap())
}
#[rstest]
#[should_panic(
expected = "You are trying to update or delete a document which has been deleted"
)]
#[case(KeyPair::from_private_key_str(PRIVATE_KEY).unwrap())]
#[should_panic(
expected = "You are trying to update or delete a document which has been deleted"
)]
#[case(KeyPair::new())]
#[tokio::test]
async fn validates_that_document_is_deleted(
schema: Schema,
#[case] key_pair: KeyPair,
#[from(populate_store_config)]
#[with(2, 1, 1, true)]
config: PopulateStoreConfig,
) {
let store = MemoryStore::default();
let (_, documents) = populate_store(&store, &config).await;
let document_id = documents.first().unwrap();
let document_view_id: DocumentViewId = document_id.as_str().parse().unwrap();
let author_performing_update = key_pair.public_key();
let delete_operation = OperationBuilder::new(schema.id())
.action(OperationAction::Delete)
.previous(&document_view_id)
.build()
.unwrap();
let latest_entry = store
.get_latest_entry(&author_performing_update, &LogId::default())
.await
.unwrap();
let encoded_operation = encode_operation(&delete_operation).unwrap();
let encoded_entry = sign_and_encode_entry(
&LogId::default(),
&latest_entry
.as_ref()
.map(|entry| entry.seq_num().clone().next().unwrap())
.unwrap_or_default(),
None,
latest_entry.map(|entry| entry.hash()).as_ref(),
&encoded_operation,
&key_pair,
)
.unwrap();
let result = publish(
&store,
&schema,
&encoded_entry.clone(),
&decode_operation(&encoded_operation).unwrap(),
&encoded_operation,
)
.await;
result.map_err(|err| err.to_string()).unwrap();
}
#[rstest]
#[tokio::test]
async fn publish_many_entries(
#[with(vec![("name".to_string(), FieldType::String)])] schema: Schema,
key_pair: KeyPair,
) {
let store = MemoryStore::default();
let num_of_entries = 13;
let mut document_id: Option<DocumentId> = None;
let public_key = key_pair.public_key();
for index in 0..num_of_entries {
let document_view_id: Option<DocumentViewId> =
document_id.clone().map(|id| id.as_str().parse().unwrap());
let (backlink, skiplink, seq_num, log_id) =
next_args(&store, &public_key, document_view_id.as_ref())
.await
.unwrap();
let schema_id = schema.id().to_owned();
let operation = if index == 0 {
create_operation(
vec![("name", OperationValue::String("Panda".to_string()))],
schema_id,
)
} else if index == (num_of_entries - 1) {
delete_operation(backlink.clone().unwrap().into(), schema_id)
} else {
update_operation(
vec![("name", OperationValue::String("🐼".to_string()))],
backlink.clone().unwrap().into(),
schema_id,
)
};
let encoded_operation = encode_operation(&operation).unwrap();
let encoded_entry = sign_and_encode_entry(
&log_id,
&seq_num,
skiplink.map(Hash::from).as_ref(),
backlink.map(Hash::from).as_ref(),
&encoded_operation,
&key_pair,
)
.unwrap();
if index == 0 {
document_id = Some(encoded_entry.hash().into());
}
let result = publish(
&store,
&schema,
&encoded_entry.clone(),
&decode_operation(&encoded_operation).unwrap(),
&encoded_operation,
)
.await;
assert!(result.is_ok());
let (_, _, next_seq_num, _) = result.unwrap();
let mut previous_seq_num = seq_num;
assert_eq!(next_seq_num, previous_seq_num.next().unwrap());
assert_eq!(log_id, LogId::default());
}
}
#[rstest]
#[should_panic(
expected = "Max sequence number reached for public key 2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96 log 0"
)]
#[tokio::test]
async fn validates_max_seq_num_reached(
schema: Schema,
key_pair: KeyPair,
#[from(populate_store_config)]
#[with(2, 1, 1, false)]
config: PopulateStoreConfig,
) {
let store = MemoryStore::default();
let _ = populate_store(&store, &config).await;
let public_key = key_pair.public_key();
let entry_two = store
.get_entry_at_seq_num(&public_key, &LogId::default(), &SeqNum::new(2).unwrap())
.await
.unwrap()
.unwrap();
let encoded_entry = sign_and_encode_entry(
&LogId::default(),
&SeqNum::new(18446744073709551611).unwrap(),
Some(&random_hash()),
Some(&random_hash()),
entry_two.payload.as_ref().unwrap(),
&key_pair,
)
.unwrap();
let skiplink = StorageEntry::new(&encoded_entry, entry_two.payload.as_ref());
store
.entries
.lock()
.unwrap()
.insert(skiplink.hash(), skiplink.clone());
let encoded_entry = sign_and_encode_entry(
&LogId::default(),
&SeqNum::new(u64::MAX - 1).unwrap(),
None,
Some(&random_hash()),
entry_two.payload.as_ref().unwrap(),
&key_pair,
)
.unwrap();
let backlink = StorageEntry::new(&encoded_entry, entry_two.payload.as_ref());
store
.entries
.lock()
.unwrap()
.insert(backlink.hash(), backlink.clone());
let encoded_entry = sign_and_encode_entry(
&LogId::default(),
&SeqNum::new(u64::MAX).unwrap(),
Some(&skiplink.hash()),
Some(&backlink.hash()),
entry_two.payload.as_ref().unwrap(),
&key_pair,
)
.unwrap();
let operation = &entry_two.payload.unwrap();
let result = publish(
&store,
&schema,
&encoded_entry.clone(),
&decode_operation(operation).unwrap(),
operation,
)
.await;
let entry_at_max_seq_num = store.get_entry(&encoded_entry.hash()).await.unwrap();
assert!(entry_at_max_seq_num.is_none());
result.map_err(|err| err.to_string()).unwrap();
}
#[rstest]
#[should_panic(
expected = "Operation 00206a28f82fc8d27671b31948117af7501a5a0de709b0cf9bc3586b67abe67ac29a claims incorrect schema my_wrong_schema_name_"
)]
#[tokio::test]
async fn validates_incorrect_schema_id_in_previous_operation(
#[from(populate_store_config)]
#[with(1, 1, 1, false)]
config: PopulateStoreConfig,
) {
let store = MemoryStore::default();
let (key_pairs, document_ids) = populate_store(&store, &config).await;
let document_id = document_ids.get(0).unwrap().to_owned();
let key_pair = key_pairs.get(0).unwrap().to_owned();
let create_view_id: DocumentViewId = document_id.as_str().parse().unwrap();
let schema_name = SchemaName::new("my_wrong_schema_name").expect("Valid schema name");
let schema_id = SchemaId::new_application(&schema_name, &random_document_view_id());
let schema = schema(
vec![("age".into(), FieldType::Integer)],
schema_id.clone(),
"Schema with different id",
);
let update_with_different_schema_id = update_operation(
vec![("age", OperationValue::Integer(21))],
create_view_id,
schema_id,
);
let result =
send_to_store(&store, &update_with_different_schema_id, &schema, &key_pair).await;
result.map_err(|err| err.to_string()).unwrap();
}
}