use crate::protocol::peer::Peer;
use crate::protocol::posting::encoded_post_feedback::{EncodedPostFeedbackV1, EncodedPostFeedbackViewV1, ENTRY_SIZE};
use crate::tools::time::TimeMillis;
use crate::tools::types::{Hash, Id, Pow, Salt, Signature, SignatureKey, VerificationKey};
use std::collections::HashMap;
use crate::tools::{hashing, json, signing};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};
use crate::anyhow_assert_eq;
/// Signed header that travels alongside the raw feedback entries of a bundle.
///
/// The signature is made over the digest returned by `get_hash_for_signing`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct EncodedPostBundleFeedbackHeaderV1 {
    /// Timestamp of the header; it is part of the signed digest.
    pub time_millis: TimeMillis,
    /// Location identifier; it is part of the signed digest.
    pub location_id: Id,
    /// Hash of the bundle's `feedbacks_bytes`; it is part of the signed digest.
    pub feedbacks_bytes_hash: Hash,
    /// Peer record whose `verification_key_bytes` are used to check `signature`.
    pub peer: Peer,
    /// Signature over the digest of the fields above.
    pub signature: Signature,
}
impl EncodedPostBundleFeedbackHeaderV1 {
    /// Computes the digest that the header signature is made over.
    ///
    /// The digest covers, in this order: the big-endian encoding of `time_millis`,
    /// `location_id`, `feedbacks_bytes_hash` and the `signature` field of `peer`.
    pub fn get_hash_for_signing(&self) -> Hash {
        let encoded_time = self.time_millis.encode_be();
        let parts: Vec<&[u8]> = vec![
            encoded_time.as_ref(),
            self.location_id.as_ref(),
            self.feedbacks_bytes_hash.as_ref(),
            // NOTE(review): the peer's signature (not its key bytes) is what gets hashed here.
            self.peer.signature.as_ref(),
        ];
        hashing::hash_multiple(&parts)
    }

    /// Signs the current header contents with `signature_key` and stores the result in
    /// `self.signature`. Must be called again after any signed field changes.
    pub fn signature_generate(&mut self, signature_key: &SignatureKey) {
        let digest = self.get_hash_for_signing();
        self.signature = signing::sign(signature_key, digest.as_ref());
    }

    /// Checks `self.signature` against the verification key stored in `self.peer`.
    ///
    /// # Errors
    /// Fails when the peer's key bytes cannot be parsed or the signature does not match.
    pub fn signature_verify(&self) -> anyhow::Result<()> {
        let digest = self.get_hash_for_signing();
        let key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
        signing::verify(&key, &self.signature, digest.as_ref())
    }

    /// Validates the header; currently this is exactly the signature check.
    pub fn verify(&self) -> anyhow::Result<()> {
        self.signature_verify()
    }
}
/// Compact one-line rendering showing only the location id and the timestamp.
impl Display for EncodedPostBundleFeedbackHeaderV1 {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { location_id, time_millis, .. } = self;
        write!(f, "EncodedPostBundleFeedbackHeaderV1 [ location_id: {}, time_millis: {} ]", location_id, time_millis)
    }
}
/// A signed header together with the raw, concatenated feedback entries it describes.
#[derive(Debug, PartialEq, Clone)]
pub struct EncodedPostBundleFeedbackV1 {
    /// Signed metadata; its `feedbacks_bytes_hash` is compared with the hash of
    /// `feedbacks_bytes` during `verify`.
    pub header: EncodedPostBundleFeedbackHeaderV1,
    /// Back-to-back encoded feedback entries; `verify` requires the length to be a multiple
    /// of `ENTRY_SIZE`.
    pub feedbacks_bytes: Bytes,
}
/// One-line rendering: the header's own `Display` output plus the body length in bytes.
impl Display for EncodedPostBundleFeedbackV1 {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The label names this type, matching the header's `Display`, so log lines are not
        // mistaken for a different bundle type.
        write!(f, "EncodedPostBundleFeedbackV1 [ header: {}, length: {} ]", self.header, self.feedbacks_bytes.len())
    }
}
impl EncodedPostBundleFeedbackV1 {
/// Serializes the bundle into a single buffer.
///
/// Layout: version byte `1`, header length (`u64`), body length (`u64`), the serialized
/// header, then the raw feedback bytes.
///
/// # Errors
/// Fails when the header cannot be serialized.
pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
    let header_bytes = json::struct_to_bytes(&self.header)?;
    let body: &[u8] = self.feedbacks_bytes.as_ref();

    let mut out = BytesMut::new();
    // Fixed-size prefix: version, then both lengths.
    out.put_u8(1u8);
    out.put_u64(header_bytes.len() as u64);
    out.put_u64(body.len() as u64);
    // Variable-size payload: header first, body second.
    out.put_slice(header_bytes.as_ref());
    out.put_slice(body);
    Ok(out.freeze())
}
/// Parses a bundle from the layout produced by `to_bytes`.
///
/// Expected layout: one version byte (must be `1`), the header length and the body length
/// as two consecutive `u64` values, the serialized header, then the raw feedback bytes.
/// Any bytes remaining after the body are ignored. No signature or hash check happens here;
/// call `verify` on the result for that.
///
/// # Errors
/// Fails when the buffer is too short, the version is not `1`, a declared length does not
/// fit in `usize`, the two lengths overflow when added, or the header cannot be deserialized.
pub fn from_bytes(mut bytes: Bytes) -> anyhow::Result<Self> {
    if bytes.remaining() < 1 {
        anyhow::bail!("Invalid buffer: missing post_bundle version");
    }
    let version = bytes.get_u8();
    if 1 != version {
        anyhow::bail!("Invalid buffer: unknown post_bundle version");
    }
    if bytes.remaining() < 16 {
        anyhow::bail!("Invalid buffer: missing post_bundle lengths");
    }
    // Checked conversions: a plain `as usize` would silently truncate an oversized length on
    // targets where `usize` is narrower than 64 bits, letting a bogus length pass the
    // truncation check below.
    let header_len = usize::try_from(bytes.get_u64())
        .map_err(|_| anyhow::anyhow!("Invalid buffer: post_bundle header length does not fit in usize"))?;
    let body_len = usize::try_from(bytes.get_u64())
        .map_err(|_| anyhow::anyhow!("Invalid buffer: post_bundle body length does not fit in usize"))?;
    let total_length = header_len.checked_add(body_len).ok_or_else(|| anyhow::anyhow!("total_length overflow"))?;
    if bytes.remaining() < total_length {
        anyhow::bail!("Invalid buffer: post_bundle data truncated");
    }
    let header_bytes = bytes.copy_to_bytes(header_len);
    let body_bytes = bytes.copy_to_bytes(body_len);
    let header = json::bytes_to_struct(&header_bytes)?;
    Ok(EncodedPostBundleFeedbackV1 { header, feedbacks_bytes: body_bytes })
}
/// Fully validates the bundle.
///
/// Checks run in this order: header signature, body length is a multiple of `ENTRY_SIZE`,
/// body hash equals `header.feedbacks_bytes_hash`, and finally every entry decodes and
/// passes its own proof-of-work verification.
///
/// # Errors
/// Returns the first failing check; per-entry failures are prefixed with the entry index.
pub fn verify(&self) -> anyhow::Result<()> {
    self.header.verify()?;

    let body: &[u8] = self.feedbacks_bytes.as_ref();
    if !body.len().is_multiple_of(ENTRY_SIZE) {
        anyhow::bail!(
            "feedbacks_bytes length ({}) is not a multiple of ENTRY_SIZE ({})",
            body.len(),
            ENTRY_SIZE
        );
    }

    let feedbacks_bytes_hash = hashing::hash(body);
    anyhow_assert_eq!(feedbacks_bytes_hash, self.header.feedbacks_bytes_hash, "feedbacks_bytes_hash mismatch");

    // The length check above guarantees `chunks_exact` leaves no remainder.
    for (i, mut entry_bytes) in body.chunks_exact(ENTRY_SIZE).enumerate() {
        let feedback = EncodedPostFeedbackV1::decode_from_bytes(&mut entry_bytes)
            .map_err(|e| anyhow::anyhow!("feedback {}: invalid entry: {}", i, e))?;
        feedback
            .pow_verify()
            .map_err(|e| anyhow::anyhow!("feedback {}: pow verification failed: {}", i, e))?;
    }
    Ok(())
}
/// Returns the pow of the first entry matching both `post_id` and `feedback_type`.
///
/// Entries that fail to parse are skipped. `Pow(0)` is returned when nothing matches.
pub fn get_post_pow_for_feedback_type(&self, post_id: &Id, feedback_type: u8) -> Pow {
    EncodedPostFeedbackViewV1::iter(&self.feedbacks_bytes)
        .flatten()
        .find(|view| view.feedback_type() == feedback_type && view.post_id_bytes() == post_id.as_ref())
        .map(|view| view.pow())
        .unwrap_or(Pow(0))
}
/// Merges several bundles, keeping the highest-pow entry per `(post_id, feedback_type)`.
///
/// Returns `None` when `bundles` is empty. Entries that cannot be parsed are skipped. When
/// two entries for the same key have equal pow, the one seen first is kept.
///
/// The merged entries are emitted in the order their key was first encountered (bundle
/// order, then entry order), so the same input always yields the same bytes.
///
/// NOTE: the returned header is a clone of the header with the largest `time_millis`. Its
/// `feedbacks_bytes_hash` and `signature` are not recomputed here, so they still describe
/// that input bundle rather than the merged bytes.
pub fn merge(bundles: &[Self]) -> Option<Self> {
    let header = bundles.iter().max_by_key(|b| b.header.time_millis)?.header.clone();

    // `winners` keeps the best entry per key in first-seen order; `slot_by_key` maps a key to
    // its index in `winners`. Iterating a `HashMap` directly would yield an arbitrary order.
    let mut winners: Vec<EncodedPostFeedbackV1> = Vec::new();
    let mut slot_by_key: HashMap<(Id, u8), usize> = HashMap::new();

    for bundle in bundles {
        for view in EncodedPostFeedbackViewV1::iter(&bundle.feedbacks_bytes) {
            let Ok(view) = view else { continue };
            let Ok(post_id) = Id::from_slice(view.post_id_bytes()) else { continue };
            let feedback_type = view.feedback_type();
            let pow = view.pow();
            let key = (post_id, feedback_type);

            if let Some(&slot) = slot_by_key.get(&key) {
                let best = &mut winners[slot];
                if pow > best.pow {
                    best.salt = Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero());
                    best.pow = pow;
                }
            } else {
                slot_by_key.insert(key, winners.len());
                winners.push(EncodedPostFeedbackV1 {
                    post_id,
                    feedback_type,
                    salt: Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero()),
                    pow,
                });
            }
        }
    }

    let mut feedbacks_bytes_mut: Vec<u8> = Vec::new();
    for feedback in &winners {
        // Best effort: an entry that fails to encode is dropped, and anything it may have
        // written is rolled back so the buffer never contains a partial entry.
        let checkpoint = feedbacks_bytes_mut.len();
        if feedback.append_encode_to_bytes(&mut feedbacks_bytes_mut).is_err() {
            feedbacks_bytes_mut.truncate(checkpoint);
        }
    }
    Some(Self { header, feedbacks_bytes: Bytes::from(feedbacks_bytes_mut) })
}
/// Returns the pow recorded for `post_id`, indexed by feedback type.
///
/// Slots for feedback types without an entry stay at `Pow(0)`. Entries that fail to parse
/// are skipped. If several entries share a feedback type, the last one in the buffer wins.
pub fn get_post_pows(&self, post_id: &Id) -> [Pow; 256] {
    let mut pows_by_type = [Pow(0); 256];
    EncodedPostFeedbackViewV1::iter(&self.feedbacks_bytes)
        .flatten()
        .filter(|view| view.post_id_bytes() == post_id.as_ref())
        .for_each(|view| pows_by_type[usize::from(view.feedback_type())] = view.pow());
    pows_by_type
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tools::server_id::ServerId;
use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
use crate::tools::pow;
use crate::tools::pow_generator::pow_generator::PowGenerator;
use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
/// Builds a bundle holding one feedback entry with a genuinely generated proof of work and
/// a header signed by a freshly created `ServerId`.
async fn make_valid_bundle() -> anyhow::Result<EncodedPostBundleFeedbackV1> {
    let clock = RealTimeProvider;
    let generator = SingleThreadedPowGenerator::new();
    let identity = ServerId::new("own_pow", &clock, Pow(0), true, &generator).await?;
    let peer = identity.to_peer(&clock)?;

    // Single feedback entry; the pow is computed over the post id and the feedback type.
    let post_id = Id::random();
    let feedback_type = 1u8;
    let data_hash = pow::pow_compute_data_hash(&[post_id.as_bytes(), &[feedback_type]]);
    let (salt, achieved_pow, _) = generator.generate_best_effort("make_valid_bundle_feedback", 1, Pow(0), data_hash).await?;
    let mut encoded = Vec::new();
    EncodedPostFeedbackV1::new(post_id, feedback_type, salt, achieved_pow).append_encode_to_bytes(&mut encoded)?;
    let feedbacks_bytes = Bytes::from(encoded);

    // The signature starts zeroed and is filled in by `signature_generate`.
    let mut header = EncodedPostBundleFeedbackHeaderV1 {
        time_millis: clock.current_time_millis(),
        location_id: Id::random(),
        feedbacks_bytes_hash: hashing::hash(feedbacks_bytes.as_ref()),
        peer,
        signature: Signature::zero(),
    };
    header.signature_generate(&identity.keys.signature_key);
    Ok(EncodedPostBundleFeedbackV1 { header, feedbacks_bytes })
}
/// A bundle built by `make_valid_bundle` must pass full verification.
#[tokio::test]
async fn test_verify_valid_bundle() -> anyhow::Result<()> {
    make_valid_bundle().await?.verify()
}
/// Replacing the header signature with zeros must make verification fail.
#[tokio::test]
async fn test_verify_bad_header_signature() -> anyhow::Result<()> {
    let tampered = {
        let mut bundle = make_valid_bundle().await?;
        bundle.header.signature = Signature::zero();
        bundle
    };
    assert!(tampered.verify().is_err());
    Ok(())
}
/// A header whose `feedbacks_bytes_hash` does not match the body must be rejected, even
/// though the header itself is correctly signed.
#[tokio::test]
async fn test_verify_wrong_feedbacks_hash() -> anyhow::Result<()> {
    let mut bundle = make_valid_bundle().await?;
    let pow_generator = SingleThreadedPowGenerator::new();
    let server_id = ServerId::new("own_pow", &RealTimeProvider, Pow(0), true, &pow_generator).await?;
    // Install the peer that belongs to the key used for re-signing. Without this the header
    // may carry a peer for a different identity, and `verify` would fail on the signature
    // before ever reaching the hash comparison this test is about.
    bundle.header.peer = server_id.to_peer(&RealTimeProvider)?;
    bundle.header.feedbacks_bytes_hash = hashing::hash(b"wrong");
    bundle.header.signature_generate(&server_id.keys.signature_key);
    // The header alone must be valid, so the failure below comes from the body checks.
    bundle.header.verify()?;
    assert!(bundle.verify().is_err());
    Ok(())
}
/// A body whose length is not a multiple of `ENTRY_SIZE` must be rejected, even though the
/// header is correctly signed and its hash matches the body.
#[tokio::test]
async fn test_verify_partial_entry() -> anyhow::Result<()> {
    let mut bundle = make_valid_bundle().await?;
    let pow_generator = SingleThreadedPowGenerator::new();
    let server_id = ServerId::new("own_pow", &RealTimeProvider, Pow(0), true, &pow_generator).await?;
    let mut bytes = bundle.feedbacks_bytes.to_vec();
    bytes.push(0u8);
    bundle.feedbacks_bytes = Bytes::from(bytes);
    // Install the peer that belongs to the key used for re-signing. Without this the header
    // may carry a peer for a different identity, and `verify` would fail on the signature
    // before ever reaching the length check this test is about.
    bundle.header.peer = server_id.to_peer(&RealTimeProvider)?;
    bundle.header.feedbacks_bytes_hash = hashing::hash(bundle.feedbacks_bytes.as_ref());
    bundle.header.signature_generate(&server_id.keys.signature_key);
    // The header alone must be valid, so the failure below comes from the body checks.
    bundle.header.verify()?;
    let err = bundle.verify().expect_err("a trailing partial entry must be rejected");
    assert!(err.to_string().contains("is not a multiple of ENTRY_SIZE"), "unexpected error: {}", err);
    Ok(())
}
/// Corrupting the last byte of the only entry must make the per-entry checks fail, even
/// though the header is correctly signed and its hash matches the corrupted body.
#[tokio::test]
async fn test_verify_wrong_pow() -> anyhow::Result<()> {
    let mut bundle = make_valid_bundle().await?;
    let pow_generator = SingleThreadedPowGenerator::new();
    let server_id = ServerId::new("own_pow", &RealTimeProvider, Pow(0), true, &pow_generator).await?;
    let mut bytes = bundle.feedbacks_bytes.to_vec();
    let last = bytes.last_mut().unwrap();
    *last = last.wrapping_add(1);
    bundle.feedbacks_bytes = Bytes::from(bytes);
    // Install the peer that belongs to the key used for re-signing. Without this the header
    // may carry a peer for a different identity, and `verify` would fail on the signature
    // before ever reaching the per-entry checks this test is about.
    bundle.header.peer = server_id.to_peer(&RealTimeProvider)?;
    bundle.header.feedbacks_bytes_hash = hashing::hash(bundle.feedbacks_bytes.as_ref());
    bundle.header.signature_generate(&server_id.keys.signature_key);
    // The header alone must be valid, so the failure below comes from the body checks.
    bundle.header.verify()?;
    let err = bundle.verify().expect_err("a corrupted entry must be rejected");
    assert!(err.to_string().starts_with("feedback 0:"), "unexpected error: {}", err);
    Ok(())
}
/// Builds a signed bundle with the given timestamp and one entry per `(post, type, pow)`.
///
/// Salts are random and the pow values are taken as given, so these bundles are only
/// suitable for tests that do not run proof-of-work verification.
async fn make_bundle_with_feedbacks(time_millis: TimeMillis, feedbacks: &[(Id, u8, Pow)]) -> anyhow::Result<EncodedPostBundleFeedbackV1> {
    let clock = RealTimeProvider;
    let generator = SingleThreadedPowGenerator::new();
    let identity = ServerId::new("own_pow", &clock, Pow(0), true, &generator).await?;
    let peer = identity.to_peer(&clock)?;

    let mut encoded = Vec::new();
    for &(post_id, feedback_type, pow) in feedbacks {
        let entry = EncodedPostFeedbackV1::new(post_id, feedback_type, Salt::random(), pow);
        entry.append_encode_to_bytes(&mut encoded)?;
    }
    let feedbacks_bytes = Bytes::from(encoded);

    let mut header = EncodedPostBundleFeedbackHeaderV1 {
        time_millis,
        location_id: Id::random(),
        feedbacks_bytes_hash: hashing::hash(feedbacks_bytes.as_ref()),
        peer,
        signature: Signature::zero(),
    };
    header.signature_generate(&identity.keys.signature_key);
    Ok(EncodedPostBundleFeedbackV1 { header, feedbacks_bytes })
}
/// Both lookup helpers must report the stored pow per (post, feedback type) and `Pow(0)`
/// for combinations that have no entry.
#[tokio::test]
async fn get_post_pows_reports_pow_per_feedback_type() -> anyhow::Result<()> {
    let first_post = Id::random();
    let second_post = Id::random();
    let entries = [
        (first_post, 1, Pow(10)),
        (first_post, 2, Pow(20)),
        (second_post, 1, Pow(30)),
    ];
    let bundle = make_bundle_with_feedbacks(TimeMillis(1000), &entries).await?;

    // Single-value lookup.
    assert_eq!(Pow(10), bundle.get_post_pow_for_feedback_type(&first_post, 1));
    assert_eq!(Pow(20), bundle.get_post_pow_for_feedback_type(&first_post, 2));
    assert_eq!(Pow(0), bundle.get_post_pow_for_feedback_type(&first_post, 3), "absent type => Pow(0)");
    assert_eq!(Pow(0), bundle.get_post_pow_for_feedback_type(&second_post, 2), "post_b has no type-2 feedback");

    // Whole-table lookup for the first post.
    let first_table = bundle.get_post_pows(&first_post);
    assert_eq!(Pow(10), first_table[1]);
    assert_eq!(Pow(20), first_table[2]);
    assert_eq!(Pow(0), first_table[0]);
    assert_eq!(Pow(0), first_table[3]);

    // Whole-table lookup for the second post.
    let second_table = bundle.get_post_pows(&second_post);
    assert_eq!(Pow(30), second_table[1]);
    assert_eq!(Pow(0), second_table[2]);
    Ok(())
}
/// Merging must keep the highest pow per (post, type) and preserve keys present in only
/// one of the inputs.
#[tokio::test]
async fn merge_keeps_highest_pow_per_post_and_type() -> anyhow::Result<()> {
    let post = Id::random();
    let older = make_bundle_with_feedbacks(TimeMillis(1000), &[(post, 1, Pow(10)), (post, 2, Pow(5))]).await?;
    let newer = make_bundle_with_feedbacks(TimeMillis(2000), &[(post, 1, Pow(25))]).await?;
    let inputs = [older, newer];

    let merged = EncodedPostBundleFeedbackV1::merge(&inputs).expect("non-empty input");

    assert_eq!(Pow(25), merged.get_post_pow_for_feedback_type(&post, 1), "the higher pow for (post_a, type 1) must win");
    assert_eq!(Pow(5), merged.get_post_pow_for_feedback_type(&post, 2), "the only pow for (post_a, type 2) must be preserved");
    Ok(())
}
/// Encoding a bundle, decoding it and encoding it again must give an equal bundle and
/// identical bytes.
#[tokio::test]
async fn encoded_post_bundle_header_v1_to_from_bytes_roundtrip() -> anyhow::Result<()> {
    let clock = RealTimeProvider;
    let generator = SingleThreadedPowGenerator::new();
    let identity = ServerId::new("own_pow", &clock, Pow(0), true, &generator).await?;
    let peer = identity.to_peer(&clock)?;

    // An empty body is enough to exercise the framing and the header serialization.
    let feedbacks_bytes = Bytes::new();
    let mut header = EncodedPostBundleFeedbackHeaderV1 {
        time_millis: TimeMillis::random(),
        location_id: Id::random(),
        feedbacks_bytes_hash: hashing::hash(feedbacks_bytes.as_ref()),
        peer,
        signature: Signature::zero(),
    };
    header.signature_generate(&identity.keys.signature_key);
    header.verify()?;

    let original = EncodedPostBundleFeedbackV1 { header, feedbacks_bytes };
    let first_encoding = original.to_bytes()?;
    let decoded = EncodedPostBundleFeedbackV1::from_bytes(first_encoding.clone())?;
    assert_eq!(original, decoded);

    let second_encoding = decoded.to_bytes()?;
    assert_eq!(first_encoding, second_encoding);
    Ok(())
}
}