use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use ssb_legacy_msg_data::{
json::from_slice,
value::{ContentValue, Value},
LegacyF64,
};
use ssb_multiformats::multihash::Multihash;
use crate::error::{
AuthorsDidNotMatch, FirstMessageDidNotHavePreviousOfNull, FirstMessageDidNotHaveSequenceOfOne,
ForkedFeed, InvalidBase64, InvalidHashFunction, InvalidMessage, InvalidMessageValueLength,
InvalidMessageValueOrder, InvalidPreviousMessage, InvalidSequenceNumber, PreviousWasNull,
Result,
};
use crate::utils;
/// Typed form of a message value, obtained in this module by running
/// `ssb_legacy_msg_data::json::from_slice` over the raw message bytes.
///
/// Deserialization rejects any field that is not listed here
/// (`deny_unknown_fields`).
// NOTE(review): the declaration order of the fields presumably matters for
// serialization — confirm before reordering anything.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct SsbMessageValue {
/// Hash of the preceding message. The hash-chain checks require `None` for
/// the first message of a feed and the previous message's key otherwise.
pub previous: Option<Multihash>,
/// Author of the message. The hash-chain checks require it to equal the
/// author of the previous message.
pub author: String,
/// Position of the message in its feed. The hash-chain checks require `1`
/// for the first message and "previous sequence + 1" afterwards.
pub sequence: u64,
/// Timestamp carried by the message.
pub timestamp: LegacyF64,
/// Name of the hash function. Validation only accepts `"sha256"`.
pub hash: String,
/// Message content. When it is a string value, validation requires it to be
/// canonical base64.
pub content: ContentValue,
/// Signature string.
// NOTE(review): nothing in this module verifies the signature — presumably
// that happens elsewhere; confirm.
pub signature: String,
}
/// Validates, in parallel, that `messages` form one contiguous hash chain.
///
/// Each element is checked with [`validate_message_value_hash_chain`] against
/// its predecessor: `messages[i]` against `messages[i - 1]`, and `messages[0]`
/// against `previous`. Pass `None` as `previous` when `messages[0]` is expected
/// to be the very first message of the feed.
///
/// An empty `messages` slice is trivially valid.
///
/// # Errors
///
/// Returns the error of a link that failed validation. Because the links are
/// checked in parallel, which error is returned is not guaranteed when more
/// than one link is invalid.
pub fn par_validate_message_value_hash_chain_of_feed<T: AsRef<[u8]>, U: AsRef<[u8]>>(
    messages: &[T],
    previous: Option<U>,
) -> Result<()>
where
    [T]: ParallelSlice<T>,
    T: Sync,
    U: Sync + Send + Copy,
{
    messages
        .par_iter()
        .enumerate()
        .try_for_each(|(idx, msg)| match idx.checked_sub(1) {
            // First element: its predecessor is the caller-supplied `previous`.
            // `U` is `Copy` and already `AsRef<[u8]>`, so it is handed over
            // directly instead of being copied into a temporary `Vec<u8>`.
            None => validate_message_value_hash_chain(msg.as_ref(), previous),
            // Every other element is chained to its left neighbour.
            Some(prev_idx) => {
                validate_message_value_hash_chain(msg.as_ref(), Some(messages[prev_idx].as_ref()))
            }
        })
}
/// Validates a single message value against the message that precedes it in
/// the feed.
///
/// * `message_bytes` — raw bytes of the message value to validate.
/// * `previous_msg_bytes` — raw bytes of the preceding message value, or `None`
///   when `message_bytes` is expected to be the first message of its feed.
///
/// Both inputs are decoded into [`SsbMessageValue`]s, the key of the previous
/// message is computed with `utils::multihash_from_bytes`, and everything is
/// handed to [`message_value_common_checks`] with the hash-chain checks
/// enabled.
///
/// # Errors
///
/// * `InvalidPreviousMessage` if `previous_msg_bytes` cannot be decoded.
/// * `InvalidMessage` if `message_bytes` cannot be decoded.
/// * Any error produced by [`message_value_common_checks`].
pub fn validate_message_value_hash_chain<T: AsRef<[u8]>, U: AsRef<[u8]>>(
    message_bytes: T,
    previous_msg_bytes: Option<U>,
) -> Result<()> {
    let message_bytes = message_bytes.as_ref();

    let (previous_value, previous_key) = match previous_msg_bytes {
        Some(previous) => {
            let previous_bytes = previous.as_ref();
            // The selector receives the borrowed slice: it is only converted
            // into an owned buffer if decoding fails, so the success path does
            // not copy the message.
            let value = from_slice::<SsbMessageValue>(previous_bytes).context(
                InvalidPreviousMessage {
                    message: previous_bytes,
                },
            )?;
            let key = utils::multihash_from_bytes(previous_bytes);
            (Some(value), Some(key))
        }
        None => (None, None),
    };

    let message_value = from_slice::<SsbMessageValue>(message_bytes).context(InvalidMessage {
        message: message_bytes,
    })?;

    message_value_common_checks(
        &message_value,
        previous_value.as_ref(),
        message_bytes,
        previous_key.as_ref(),
        true,
    )
}
/// Validates a single message value on its own, without reference to any
/// previous message.
///
/// The bytes are decoded into an [`SsbMessageValue`] and passed to
/// [`message_value_common_checks`] with the hash-chain checks disabled, so the
/// sequence number, `previous` field and author are not compared to anything.
///
/// # Errors
///
/// * `InvalidMessage` if `message_bytes` cannot be decoded.
/// * Any error produced by [`message_value_common_checks`].
pub fn validate_message_value<T: AsRef<[u8]>>(message_bytes: T) -> Result<()> {
    let message_bytes = message_bytes.as_ref();

    // The borrowed slice is only turned into an owned error payload when
    // decoding actually fails; valid messages are not copied.
    let message_value = from_slice::<SsbMessageValue>(message_bytes).context(InvalidMessage {
        message: message_bytes,
    })?;

    message_value_common_checks(&message_value, None, message_bytes, None, false)
}
/// Runs [`validate_message_value`] over every element of `messages` in
/// parallel.
///
/// The messages are validated independently of one another; no ordering or
/// shared authorship is required. An empty slice is valid.
///
/// # Errors
///
/// Returns the error of a message that failed validation. With several invalid
/// messages, which error is returned is not guaranteed.
pub fn par_validate_message_value<T: AsRef<[u8]>>(messages: &[T]) -> Result<()>
where
    [T]: ParallelSlice<T>,
    T: Sync,
{
    messages
        .par_iter()
        .try_for_each(|raw| validate_message_value(raw.as_ref()))
}
pub fn validate_ooo_message_value_hash_chain<T: AsRef<[u8]>, U: AsRef<[u8]>>(
message_bytes: T,
previous_msg_bytes: Option<U>,
) -> Result<()> {
let message_bytes = message_bytes.as_ref();
let previous_value = match previous_msg_bytes {
Some(message) => {
let previous = from_slice::<SsbMessageValue>(message.as_ref()).context(
InvalidPreviousMessage {
message: message.as_ref().to_owned(),
},
)?;
Some(previous)
}
None => (None),
};
let message_value = from_slice::<SsbMessageValue>(message_bytes).context(InvalidMessage {
message: message_bytes.to_owned(),
})?;
message_value_common_checks(&message_value, None, message_bytes, None, false)?;
if let Some(previous_value) = previous_value.as_ref() {
ensure!(
message_value.author == previous_value.author,
AuthorsDidNotMatch {
previous_author: previous_value.author.clone(),
author: message_value.author
}
);
}
Ok(())
}
/// Validates, in parallel, a slice of message values that may be out of order.
///
/// Each element is checked with [`validate_ooo_message_value_hash_chain`]
/// against its left neighbour in the slice: `messages[i]` against
/// `messages[i - 1]`, and `messages[0]` against `previous` (if any).
///
/// An empty `messages` slice is trivially valid.
///
/// # Errors
///
/// Returns the error of an element that failed validation. Because elements
/// are checked in parallel, which error is returned is not guaranteed when
/// more than one element is invalid.
pub fn par_validate_ooo_message_value_hash_chain_of_feed<T: AsRef<[u8]>, U: AsRef<[u8]>>(
    messages: &[T],
    previous: Option<U>,
) -> Result<()>
where
    [T]: ParallelSlice<T>,
    T: Sync,
    U: Sync + Send + Copy,
{
    messages
        .par_iter()
        .enumerate()
        .try_for_each(|(idx, msg)| match idx.checked_sub(1) {
            // First element: compare against the caller-supplied `previous`.
            // `U` is `Copy` and already `AsRef<[u8]>`, so it is handed over
            // directly instead of being copied into a temporary `Vec<u8>`.
            None => validate_ooo_message_value_hash_chain(msg.as_ref(), previous),
            // Every other element is compared with its left neighbour.
            Some(prev_idx) => validate_ooo_message_value_hash_chain(
                msg.as_ref(),
                Some(messages[prev_idx].as_ref()),
            ),
        })
}
/// Runs the checks shared by all message-value validators.
///
/// * `message_value` — the decoded message to validate.
/// * `previous_value` — the decoded preceding message, if there is one.
/// * `message_bytes` — the raw bytes `message_value` was decoded from.
/// * `previous_key` — the key (hash) of the preceding message, if there is one.
/// * `check_previous` — when `true`, the message is additionally validated as
///   a link of a hash chain; when `false`, `previous_value` and `previous_key`
///   are ignored.
///
/// The checks run in this order:
///
/// 1. the raw bytes have the correct field order (`utils::is_correct_order`);
/// 2. the `hash` field is `"sha256"`;
/// 3. string content is canonical base64;
/// 4. if `check_previous` is set: with a previous message, the authors match,
///    the sequence is exactly one more than the previous one, and `previous`
///    equals `previous_key`; without one, the sequence is `1` and `previous`
///    is `None`;
/// 5. the message has the correct length (`utils::is_correct_length`).
///
/// # Errors
///
/// Returns the error corresponding to the first check that fails, or any
/// error returned by `utils::is_correct_length`.
///
/// # Panics
///
/// Panics if `check_previous` is `true`, `previous_value` is `Some`, the
/// message has a `previous` field, and `previous_key` is `None`. Callers must
/// supply the previous value and its key together.
pub fn message_value_common_checks(
    message_value: &SsbMessageValue,
    previous_value: Option<&SsbMessageValue>,
    message_bytes: &[u8],
    previous_key: Option<&Multihash>,
    check_previous: bool,
) -> Result<()> {
    ensure!(
        utils::is_correct_order(message_bytes),
        InvalidMessageValueOrder {
            message: message_bytes.to_owned()
        }
    );

    ensure!(
        message_value.hash == "sha256",
        InvalidHashFunction {
            message: message_bytes.to_owned()
        }
    );

    // String content must be canonical base64.
    if let Value::String(private_msg) = &message_value.content.0 {
        ensure!(
            utils::is_canonical_base64(private_msg),
            InvalidBase64 {
                message: message_bytes,
            }
        );
    }

    if check_previous {
        match previous_value {
            Some(previous_value) => {
                ensure!(
                    message_value.author == previous_value.author,
                    AuthorsDidNotMatch {
                        previous_author: previous_value.author.clone(),
                        author: message_value.author.clone()
                    }
                );

                // The sequence comes from untrusted input. `checked_add` keeps
                // a previous sequence of `u64::MAX` from panicking (debug) or
                // wrapping to 0 (release); in that case no successor is valid.
                let expected_sequence = previous_value.sequence.checked_add(1);
                ensure!(
                    expected_sequence == Some(message_value.sequence),
                    InvalidSequenceNumber {
                        message: message_bytes.to_owned(),
                        actual: message_value.sequence,
                        expected: previous_value.sequence.saturating_add(1)
                    }
                );

                ensure!(
                    message_value.previous.as_ref().context(PreviousWasNull)?
                        == previous_key
                            .expect("expected the previous key to be Some(key), was None"),
                    ForkedFeed {
                        previous_seq: previous_value.sequence
                    }
                );
            }
            None => {
                // No predecessor: this has to be the first message of a feed.
                ensure!(
                    message_value.sequence == 1,
                    FirstMessageDidNotHaveSequenceOfOne {
                        message: message_bytes.to_owned()
                    }
                );
                ensure!(
                    message_value.previous.is_none(),
                    FirstMessageDidNotHavePreviousOfNull {
                        message: message_bytes.to_owned()
                    }
                );
            }
        }
    }

    ensure!(
        utils::is_correct_length(message_value)?,
        InvalidMessageValueLength {
            message: message_bytes.to_owned()
        }
    );

    Ok(())
}
#[cfg(test)]
mod tests {
    use crate::message_value::{
        par_validate_message_value, par_validate_message_value_hash_chain_of_feed,
        par_validate_ooo_message_value_hash_chain_of_feed, validate_message_value,
        validate_message_value_hash_chain, validate_ooo_message_value_hash_chain,
    };
    use crate::test_data::{
        MESSAGE_VALUE_1, MESSAGE_VALUE_2, MESSAGE_VALUE_3, MESSAGE_VALUE_3_INCORRECT_AUTHOR,
    };

    /// The first message of a feed validates without a predecessor.
    #[test]
    fn it_works_first_message_value() {
        let first = MESSAGE_VALUE_1.as_bytes();
        let outcome = validate_message_value_hash_chain::<_, &[u8]>(first, None);
        assert!(outcome.is_ok());
    }

    /// The second message validates against the first one.
    #[test]
    fn it_works_second_message_value() {
        let first = MESSAGE_VALUE_1.as_bytes();
        let second = MESSAGE_VALUE_2.as_bytes();
        let outcome = validate_message_value_hash_chain(second, Some(first));
        assert!(outcome.is_ok());
    }

    /// An in-order run of messages validates as a hash chain in parallel.
    #[test]
    fn it_validates_an_ordered_sequence_of_message_values_in_parallel() {
        let feed = [
            MESSAGE_VALUE_1.as_bytes(),
            MESSAGE_VALUE_2.as_bytes(),
            MESSAGE_VALUE_3.as_bytes(),
        ];
        let outcome = par_validate_message_value_hash_chain_of_feed::<_, &[u8]>(&feed[..], None);
        assert!(outcome.is_ok());
    }

    /// A lone message validates without any chain context.
    #[test]
    fn it_validates_a_single_message_value() {
        let outcome = validate_message_value(MESSAGE_VALUE_2.as_bytes());
        assert!(outcome.is_ok());
    }

    /// Several messages validate independently in parallel.
    #[test]
    fn it_validates_message_values_in_parallel() {
        let batch = [MESSAGE_VALUE_1.as_bytes(), MESSAGE_VALUE_2.as_bytes()];
        let outcome = par_validate_message_value(&batch[..]);
        assert!(outcome.is_ok());
    }

    /// Out-of-order validation accepts a later message as the "previous" one.
    #[test]
    fn it_validates_a_pair_of_ooo_message_values() {
        let second = MESSAGE_VALUE_2.as_bytes();
        let third = MESSAGE_VALUE_3.as_bytes();
        let outcome = validate_ooo_message_value_hash_chain(second, Some(third));
        assert!(outcome.is_ok());
    }

    /// Out-of-order validation accepts a shuffled feed in parallel.
    #[test]
    fn it_validates_ooo_message_values_in_parallel() {
        let shuffled = [
            MESSAGE_VALUE_3.as_bytes(),
            MESSAGE_VALUE_1.as_bytes(),
            MESSAGE_VALUE_2.as_bytes(),
        ];
        let outcome =
            par_validate_ooo_message_value_hash_chain_of_feed::<_, &[u8]>(&shuffled[..], None);
        assert!(outcome.is_ok());
    }

    /// Independent validation does not care whether the authors differ.
    #[test]
    fn it_validates_message_values_from_different_authors_in_parallel() {
        let batch = [
            MESSAGE_VALUE_2.as_bytes(),
            MESSAGE_VALUE_3_INCORRECT_AUTHOR.as_bytes(),
        ];
        let outcome = par_validate_message_value(&batch[..]);
        assert!(outcome.is_ok());
    }
}