hashiverse_lib/protocol/posting/
encoded_post_bundle_feedback.rs1use crate::protocol::peer::Peer;
25use crate::protocol::posting::encoded_post_feedback::{EncodedPostFeedbackV1, EncodedPostFeedbackViewV1, ENTRY_SIZE};
26use crate::tools::time::TimeMillis;
27use crate::tools::types::{Hash, Id, Pow, Salt, Signature, SignatureKey, VerificationKey};
28use std::collections::HashMap;
29use crate::tools::{hashing, json, signing};
30use bytes::{Buf, BufMut, Bytes, BytesMut};
31use serde::{Deserialize, Serialize};
32use std::fmt::{Debug, Display};
33use crate::anyhow_assert_eq;
34
35#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
37pub struct EncodedPostBundleFeedbackHeaderV1 {
38 pub time_millis: TimeMillis, pub location_id: Id,
40 pub feedbacks_bytes_hash: Hash, pub peer: Peer,
42
43 pub signature: Signature, }
45
46impl EncodedPostBundleFeedbackHeaderV1 {
47 pub fn get_hash_for_signing(&self) -> Hash {
48 let time_millis_be = self.time_millis.encode_be();
49
50 let hash_input: Vec<&[u8]> = vec![
51 time_millis_be.as_ref(),
52 self.location_id.as_ref(),
53 self.feedbacks_bytes_hash.as_ref(),
54 self.peer.signature.as_ref(),
55 ];
56
57 hashing::hash_multiple(&hash_input)
58 }
59
60 pub fn signature_generate(&mut self, signature_key: &SignatureKey) {
61 let hash = self.get_hash_for_signing();
62 self.signature = signing::sign(signature_key, hash.as_ref());
63 }
64
65 pub fn signature_verify(&self) -> anyhow::Result<()> {
66 let hash = self.get_hash_for_signing();
67 let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
68 signing::verify(&verification_key, &self.signature, hash.as_ref())
69 }
70
71 pub fn verify(&self) -> anyhow::Result<()> {
72 self.signature_verify()?;
73 Ok(())
74 }
75}
76
77impl Display for EncodedPostBundleFeedbackHeaderV1 {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(f, "EncodedPostBundleFeedbackHeaderV1 [ location_id: {}, time_millis: {} ]", self.location_id, self.time_millis)
80 }
81}
82
83#[derive(Debug, PartialEq, Clone)]
84pub struct EncodedPostBundleFeedbackV1 {
85 pub header: EncodedPostBundleFeedbackHeaderV1,
86 pub feedbacks_bytes: Bytes, }
88impl Display for EncodedPostBundleFeedbackV1 {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 write!(f, "EncodedPostBundleV1 [ header: {}, length: {} ]", self.header, self.feedbacks_bytes.len())
91 }
92}
93
94impl EncodedPostBundleFeedbackV1 {
95 pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
96 let mut bytes = BytesMut::new();
97
98 let header = json::struct_to_bytes(&self.header)?;
99 bytes.put_u8(1u8); bytes.put_u64(header.len() as u64);
101 bytes.put_u64(self.feedbacks_bytes.len() as u64);
102 bytes.put_slice(header.as_ref());
103 bytes.put_slice(self.feedbacks_bytes.as_ref());
104
105 Ok(bytes.freeze())
106 }
107
108 pub fn from_bytes(mut bytes: Bytes) -> anyhow::Result<Self> {
109 if bytes.remaining() < 1 {
110 anyhow::bail!("Invalid buffer: missing post_bundle version");
111 }
112 let version = bytes.get_u8();
113 if 1 != version {
114 anyhow::bail!("Invalid buffer: unknown post_bundle version");
115 }
116
117 if bytes.remaining() < 16 {
118 anyhow::bail!("Invalid buffer: missing post_bundle lengths");
119 }
120
121 let header_len = bytes.get_u64() as usize;
122 let body_len = bytes.get_u64() as usize;
123 let total_length = header_len.checked_add(body_len).ok_or_else(|| anyhow::anyhow!("total_length overflow"))?;
124 if bytes.remaining() < total_length {
125 anyhow::bail!("Invalid buffer: post_bundle data truncated");
126 }
127 let header_bytes = bytes.copy_to_bytes(header_len);
128 let body_bytes = bytes.copy_to_bytes(body_len);
129
130 let header = json::bytes_to_struct(&header_bytes)?;
131
132 Ok(EncodedPostBundleFeedbackV1 { header, feedbacks_bytes: body_bytes })
133 }
134
135 pub fn verify(&self) -> anyhow::Result<()> {
143 self.header.verify()?;
145
146 if !self.feedbacks_bytes.len().is_multiple_of(ENTRY_SIZE) {
148 anyhow::bail!(
149 "feedbacks_bytes length ({}) is not a multiple of ENTRY_SIZE ({})",
150 self.feedbacks_bytes.len(),
151 ENTRY_SIZE
152 );
153 }
154
155 let feedbacks_bytes_hash = hashing::hash(self.feedbacks_bytes.as_ref());
157 anyhow_assert_eq!(feedbacks_bytes_hash, self.header.feedbacks_bytes_hash, "feedbacks_bytes_hash mismatch");
158
159 let num_entries = self.feedbacks_bytes.len() / ENTRY_SIZE;
161 for i in 0..num_entries {
162 let entry_bytes = &self.feedbacks_bytes[i * ENTRY_SIZE..(i + 1) * ENTRY_SIZE];
163 let feedback = EncodedPostFeedbackV1::decode_from_bytes(&mut &entry_bytes[..])
164 .map_err(|e| anyhow::anyhow!("feedback {}: invalid entry: {}", i, e))?;
165 feedback.pow_verify()
166 .map_err(|e| anyhow::anyhow!("feedback {}: pow verification failed: {}", i, e))?;
167 }
168
169 Ok(())
170 }
171
172 pub fn get_post_pow_for_feedback_type(&self, post_id: &Id, feedback_type: u8) -> Pow {
173 for view in EncodedPostFeedbackViewV1::iter(&self.feedbacks_bytes).flatten() {
174 if view.feedback_type() == feedback_type && view.post_id_bytes() == post_id.as_ref() {
176 return view.pow();
177 }
178 }
179 Pow(0)
180 }
181
182 pub fn merge(bundles: &[Self]) -> Option<Self> {
187 let header = bundles.iter().max_by_key(|b| b.header.time_millis)?.header.clone();
188
189 let mut global_max: HashMap<(Id, u8), EncodedPostFeedbackV1> = HashMap::new();
190 for bundle in bundles {
191 for view in EncodedPostFeedbackViewV1::iter(&bundle.feedbacks_bytes) {
192 let Ok(view) = view else { continue };
193 let Ok(post_id) = Id::from_slice(view.post_id_bytes()) else { continue };
194 let key = (post_id, view.feedback_type());
195 let entry = global_max.entry(key).or_insert_with(|| EncodedPostFeedbackV1 {
196 post_id,
197 feedback_type: view.feedback_type(),
198 salt: Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero()),
199 pow: view.pow(),
200 });
201 if view.pow() > entry.pow {
202 entry.salt = Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero());
203 entry.pow = view.pow();
204 }
205 }
206 }
207
208 let mut feedbacks_bytes_mut = Vec::new();
209 for feedback in global_max.values() {
210 let _ = feedback.append_encode_to_bytes(&mut feedbacks_bytes_mut);
211 }
212
213 Some(Self { header, feedbacks_bytes: Bytes::from(feedbacks_bytes_mut) })
214 }
215
216 pub fn get_post_pows(&self, post_id: &Id) -> [Pow; 256] {
217 let mut result = [Pow(0); 256];
218 for view in EncodedPostFeedbackViewV1::iter(&self.feedbacks_bytes).flatten() {
219 if view.post_id_bytes() == post_id.as_ref() {
220 result[view.feedback_type() as usize] = view.pow();
221 }
222 }
223
224 result
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::server_id::ServerId;
    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
    use crate::tools::pow;
    use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;

    /// Builds a bundle with a single feedback entry, lets `tamper` modify it, and then
    /// signs the header with the key of the same server whose peer is embedded in the
    /// header.
    ///
    /// Re-signing with that key (rather than with a newly created `ServerId`) keeps the
    /// header signature valid after tampering, so a subsequent `verify()` failure is
    /// caused by the tampered content and not by a key mismatch.
    async fn make_signed_bundle(
        tamper: impl FnOnce(&mut EncodedPostBundleFeedbackV1),
    ) -> anyhow::Result<EncodedPostBundleFeedbackV1> {
        let time_provider = RealTimeProvider;
        let pow_generator = SingleThreadedPowGenerator::new();
        let server_id = ServerId::new("own_pow", &time_provider, Pow(0), true, &pow_generator).await?;
        let peer = server_id.to_peer(&time_provider)?;

        let post_id = Id::random();
        let feedback_type = 1u8;
        let data_hash = pow::pow_compute_data_hash(&[post_id.as_bytes(), &[feedback_type]]);
        let (salt, achieved_pow, _) = pow::pow_generate_with_iteration_limit(1, Pow(0), &data_hash).await?;
        let feedback = EncodedPostFeedbackV1::new(post_id, feedback_type, salt, achieved_pow);

        let mut feedbacks_bytes_mut = Vec::new();
        feedback.append_encode_to_bytes(&mut feedbacks_bytes_mut)?;
        let feedbacks_bytes = Bytes::from(feedbacks_bytes_mut);

        let header = EncodedPostBundleFeedbackHeaderV1 {
            time_millis: time_provider.current_time_millis(),
            location_id: Id::random(),
            feedbacks_bytes_hash: hashing::hash(feedbacks_bytes.as_ref()),
            peer,
            signature: Signature::zero(),
        };

        let mut bundle = EncodedPostBundleFeedbackV1 { header, feedbacks_bytes };
        tamper(&mut bundle);
        bundle.header.signature_generate(&server_id.keys.signature_key);

        Ok(bundle)
    }

    /// Builds an untampered, correctly signed bundle.
    async fn make_valid_bundle() -> anyhow::Result<EncodedPostBundleFeedbackV1> {
        make_signed_bundle(|_| {}).await
    }

    #[tokio::test]
    async fn test_verify_valid_bundle() -> anyhow::Result<()> {
        let bundle = make_valid_bundle().await?;
        bundle.verify()
    }

    #[tokio::test]
    async fn test_verify_bad_header_signature() -> anyhow::Result<()> {
        let mut bundle = make_valid_bundle().await?;
        bundle.header.signature = Signature::zero();
        assert!(bundle.verify().is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_wrong_feedbacks_hash() -> anyhow::Result<()> {
        let bundle = make_signed_bundle(|bundle| {
            bundle.header.feedbacks_bytes_hash = hashing::hash(b"wrong");
        })
        .await?;

        // The header itself is validly signed; only the payload hash is wrong.
        bundle.header.verify()?;
        assert!(bundle.verify().is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_partial_entry() -> anyhow::Result<()> {
        let bundle = make_signed_bundle(|bundle| {
            // One trailing byte makes the payload length a non-multiple of ENTRY_SIZE.
            let mut bytes = bundle.feedbacks_bytes.to_vec();
            bytes.push(0u8);
            bundle.feedbacks_bytes = Bytes::from(bytes);
            bundle.header.feedbacks_bytes_hash = hashing::hash(bundle.feedbacks_bytes.as_ref());
        })
        .await?;

        bundle.header.verify()?;
        let message = bundle.verify().unwrap_err().to_string();
        assert!(message.contains("is not a multiple of ENTRY_SIZE"), "unexpected error: {}", message);
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_wrong_pow() -> anyhow::Result<()> {
        let bundle = make_signed_bundle(|bundle| {
            // Corrupt the final byte of the single entry, then fix up the hash so the
            // failure can only come from the per-entry checks.
            let mut bytes = bundle.feedbacks_bytes.to_vec();
            let last = bytes.last_mut().expect("bundle contains one encoded entry");
            *last = last.wrapping_add(1);
            bundle.feedbacks_bytes = Bytes::from(bytes);
            bundle.header.feedbacks_bytes_hash = hashing::hash(bundle.feedbacks_bytes.as_ref());
        })
        .await?;

        bundle.header.verify()?;
        let message = bundle.verify().unwrap_err().to_string();
        assert!(message.starts_with("feedback 0:"), "unexpected error: {}", message);
        Ok(())
    }

    #[tokio::test]
    async fn encoded_post_bundle_header_v1_to_from_bytes_roundtrip() -> anyhow::Result<()> {
        let time_provider = RealTimeProvider;
        let pow_generator = SingleThreadedPowGenerator::new();
        let server_id = ServerId::new("own_pow", &time_provider, Pow(0), true, &pow_generator).await?;
        let peer = server_id.to_peer(&time_provider)?;
        let feedbacks_bytes = Bytes::new();

        let mut header = EncodedPostBundleFeedbackHeaderV1 {
            time_millis: TimeMillis::random(),
            location_id: Id::random(),
            feedbacks_bytes_hash: hashing::hash(feedbacks_bytes.as_ref()),
            peer,
            signature: Signature::zero(),
        };

        header.signature_generate(&server_id.keys.signature_key);
        header.verify()?;

        let bundle = EncodedPostBundleFeedbackV1 { header, feedbacks_bytes };

        let bytes1 = bundle.to_bytes()?;
        let decoded = EncodedPostBundleFeedbackV1::from_bytes(bytes1.clone())?;

        assert_eq!(bundle, decoded);

        // Re-encoding the decoded bundle must reproduce the exact same bytes.
        let bytes2 = decoded.to_bytes()?;
        assert_eq!(bytes1, bytes2);

        Ok(())
    }
}