Skip to main content

hashiverse_lib/protocol/posting/
encoded_post_bundle_feedback.rs

1//! # `EncodedPostBundleFeedbackV1` — server-signed aggregate feedback for a bundle
2//!
3//! Feedback (reactions, upvotes/downvotes, flags) on the posts in a single
4//! `(location, time bucket)` is aggregated into its own bundle, parallel to — but
5//! independent of — the [`crate::protocol::posting::encoded_post_bundle`]. The two are
6//! split because feedback can change long after the post bundle itself has sealed, and
7//! many consumers don't need the feedback layer at all.
8//!
9//! Wire format:
10//!
//! - **Header** (server-signed) — carries the bundle's `time_millis` and `location_id`,
//!   the publishing [`crate::protocol::peer::Peer`], and a hash of the body. Signed by
//!   that peer.
14//! - **Body** — a packed array of fixed-size
15//!   [`crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1`] entries,
16//!   exactly `N * ENTRY_SIZE` bytes. Fixed sizing means iteration and verification are
17//!   O(N) with no per-entry header overhead.
18//!
19//! `verify()` checks the signature, confirms body length is a multiple of `ENTRY_SIZE`,
20//! re-hashes the body and compares against the header's hash, and checks the
21//! proof-of-work on every entry against the numeraire — Sybil-resistant voting in one
22//! pass.
23
24use crate::protocol::peer::Peer;
25use crate::protocol::posting::encoded_post_feedback::{EncodedPostFeedbackV1, EncodedPostFeedbackViewV1, ENTRY_SIZE};
26use crate::tools::time::TimeMillis;
27use crate::tools::types::{Hash, Id, Pow, Salt, Signature, SignatureKey, VerificationKey};
28use std::collections::HashMap;
29use crate::tools::{hashing, json, signing};
30use bytes::{Buf, BufMut, Bytes, BytesMut};
31use serde::{Deserialize, Serialize};
32use std::fmt::{Debug, Display};
33use crate::anyhow_assert_eq;
34
35// Contains all the posts for a given location_id in a particular bucket
36#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
37pub struct EncodedPostBundleFeedbackHeaderV1 {
38    pub time_millis: TimeMillis, // When was this bundle last updated
39    pub location_id: Id,
40    pub feedbacks_bytes_hash: Hash, // Hash(feedbacks_bytes from the body)
41    pub peer: Peer,
42
43    pub signature: Signature, // Peer.sign(get_hash_for_signing())
44}
45
46impl EncodedPostBundleFeedbackHeaderV1 {
47    pub fn get_hash_for_signing(&self) -> Hash {
48        let time_millis_be = self.time_millis.encode_be();
49
50        let hash_input: Vec<&[u8]> = vec![
51            time_millis_be.as_ref(),
52            self.location_id.as_ref(),
53            self.feedbacks_bytes_hash.as_ref(),
54            self.peer.signature.as_ref(),
55        ];
56
57        hashing::hash_multiple(&hash_input)
58    }
59
60    pub fn signature_generate(&mut self, signature_key: &SignatureKey) {
61        let hash = self.get_hash_for_signing();
62        self.signature = signing::sign(signature_key, hash.as_ref());
63    }
64
65    pub fn signature_verify(&self) -> anyhow::Result<()> {
66        let hash = self.get_hash_for_signing();
67        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
68        signing::verify(&verification_key, &self.signature, hash.as_ref())
69    }
70
71    pub fn verify(&self) -> anyhow::Result<()> {
72        self.signature_verify()?;
73        Ok(())
74    }
75}
76
77impl Display for EncodedPostBundleFeedbackHeaderV1 {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        write!(f, "EncodedPostBundleFeedbackHeaderV1 [ location_id: {}, time_millis: {} ]", self.location_id, self.time_millis)
80    }
81}
82
83#[derive(Debug, PartialEq, Clone)]
84pub struct EncodedPostBundleFeedbackV1 {
85    pub header: EncodedPostBundleFeedbackHeaderV1,
86    pub feedbacks_bytes: Bytes, // A concatenated array of EncodedPostFeedbackV1
87}
88impl Display for EncodedPostBundleFeedbackV1 {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        write!(f, "EncodedPostBundleV1 [ header: {}, length: {} ]", self.header, self.feedbacks_bytes.len())
91    }
92}
93
94impl EncodedPostBundleFeedbackV1 {
95    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
96        let mut bytes = BytesMut::new();
97
98        let header = json::struct_to_bytes(&self.header)?;
99        bytes.put_u8(1u8); // Version
100        bytes.put_u64(header.len() as u64);
101        bytes.put_u64(self.feedbacks_bytes.len() as u64);
102        bytes.put_slice(header.as_ref());
103        bytes.put_slice(self.feedbacks_bytes.as_ref());
104
105        Ok(bytes.freeze())
106    }
107
108    pub fn from_bytes(mut bytes: Bytes) -> anyhow::Result<Self> {
109        if bytes.remaining() < 1 {
110            anyhow::bail!("Invalid buffer: missing post_bundle version");
111        }
112        let version = bytes.get_u8();
113        if 1 != version {
114            anyhow::bail!("Invalid buffer: unknown post_bundle version");
115        }
116
117        if bytes.remaining() < 16 {
118            anyhow::bail!("Invalid buffer: missing post_bundle lengths");
119        }
120
121        let header_len = bytes.get_u64() as usize;
122        let body_len = bytes.get_u64() as usize;
123        let total_length = header_len.checked_add(body_len).ok_or_else(|| anyhow::anyhow!("total_length overflow"))?;
124        if bytes.remaining() < total_length {
125            anyhow::bail!("Invalid buffer: post_bundle data truncated");
126        }
127        let header_bytes = bytes.copy_to_bytes(header_len);
128        let body_bytes = bytes.copy_to_bytes(body_len);
129
130        let header = json::bytes_to_struct(&header_bytes)?;
131
132        Ok(EncodedPostBundleFeedbackV1 { header, feedbacks_bytes: body_bytes })
133    }
134
135    /// Verifies that this bundle is legitimate enough to cache.
136    ///
137    /// Checks:
138    /// 1. The bundle header is valid and its signature verifies.
139    /// 2. The feedback body is a valid multiple of `ENTRY_SIZE` (no partial entries).
140    /// 3. The header's hash is the same as the body's
141    /// 4. Each feedback entry's proof-of-work is correct.
142    pub fn verify(&self) -> anyhow::Result<()> {
143        // (1) Header signature
144        self.header.verify()?;
145
146        // (2) Body must be an exact multiple of the fixed entry size
147        if !self.feedbacks_bytes.len().is_multiple_of(ENTRY_SIZE) {
148            anyhow::bail!(
149                "feedbacks_bytes length ({}) is not a multiple of ENTRY_SIZE ({})",
150                self.feedbacks_bytes.len(),
151                ENTRY_SIZE
152            );
153        }
154
155        // (3) Hash
156        let feedbacks_bytes_hash = hashing::hash(self.feedbacks_bytes.as_ref());
157        anyhow_assert_eq!(feedbacks_bytes_hash, self.header.feedbacks_bytes_hash, "feedbacks_bytes_hash mismatch");
158
159        // (4) PoW check for every entry
160        let num_entries = self.feedbacks_bytes.len() / ENTRY_SIZE;
161        for i in 0..num_entries {
162            let entry_bytes = &self.feedbacks_bytes[i * ENTRY_SIZE..(i + 1) * ENTRY_SIZE];
163            let feedback = EncodedPostFeedbackV1::decode_from_bytes(&mut &entry_bytes[..])
164                .map_err(|e| anyhow::anyhow!("feedback {}: invalid entry: {}", i, e))?;
165            feedback.pow_verify()
166                .map_err(|e| anyhow::anyhow!("feedback {}: pow verification failed: {}", i, e))?;
167        }
168
169        Ok(())
170    }
171
172    pub fn get_post_pow_for_feedback_type(&self, post_id: &Id, feedback_type: u8) -> Pow {
173        for view in EncodedPostFeedbackViewV1::iter(&self.feedbacks_bytes).flatten() {
174            // Check feedback_type first as it short circuits and is therefore faster...
175            if view.feedback_type() == feedback_type && view.post_id_bytes() == post_id.as_ref() {
176                return view.pow();
177            }
178        }
179        Pow(0)
180    }
181
182    /// Unions an array of bundles by taking the highest-pow entry for each
183    /// `(post_id, feedback_type)` pair across all bundles.  The header is
184    /// taken from the most-recently-timestamped bundle.  Returns `None` if
185    /// `bundles` is empty.
186    pub fn merge(bundles: &[Self]) -> Option<Self> {
187        let header = bundles.iter().max_by_key(|b| b.header.time_millis)?.header.clone();
188
189        let mut global_max: HashMap<(Id, u8), EncodedPostFeedbackV1> = HashMap::new();
190        for bundle in bundles {
191            for view in EncodedPostFeedbackViewV1::iter(&bundle.feedbacks_bytes) {
192                let Ok(view) = view else { continue };
193                let Ok(post_id) = Id::from_slice(view.post_id_bytes()) else { continue };
194                let key = (post_id, view.feedback_type());
195                let entry = global_max.entry(key).or_insert_with(|| EncodedPostFeedbackV1 {
196                    post_id,
197                    feedback_type: view.feedback_type(),
198                    salt: Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero()),
199                    pow: view.pow(),
200                });
201                if view.pow() > entry.pow {
202                    entry.salt = Salt::from_slice(view.salt_bytes()).unwrap_or_else(|_| Salt::zero());
203                    entry.pow = view.pow();
204                }
205            }
206        }
207
208        let mut feedbacks_bytes_mut = Vec::new();
209        for feedback in global_max.values() {
210            let _ = feedback.append_encode_to_bytes(&mut feedbacks_bytes_mut);
211        }
212
213        Some(Self { header, feedbacks_bytes: Bytes::from(feedbacks_bytes_mut) })
214    }
215
216    pub fn get_post_pows(&self, post_id: &Id) -> [Pow; 256] {
217        let mut result = [Pow(0); 256];
218        for view in EncodedPostFeedbackViewV1::iter(&self.feedbacks_bytes).flatten() {
219            if  view.post_id_bytes() == post_id.as_ref() {
220                result[view.feedback_type() as usize] = view.pow();
221            }
222        }
223
224        result
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::server_id::ServerId;
    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
    use crate::tools::pow;
    use crate::tools::pow_generator::pow_generator::PowGenerator;
    use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;

    /// Encodes a body holding exactly one feedback entry with a genuine proof-of-work.
    async fn make_valid_feedbacks_bytes() -> anyhow::Result<Vec<u8>> {
        let pow_generator = SingleThreadedPowGenerator::new();

        let post_id = Id::random();
        let feedback_type = 1u8;
        // One iteration with Pow(0) always succeeds immediately — cheap in tests
        let data_hash = pow::pow_compute_data_hash(&[post_id.as_bytes(), &[feedback_type]]);
        let (salt, achieved_pow, _) = pow_generator.generate_best_effort("make_valid_bundle_feedback", 1, Pow(0), data_hash).await?;
        let feedback = EncodedPostFeedbackV1::new(post_id, feedback_type, salt, achieved_pow);

        let mut feedbacks_bytes = Vec::new();
        feedback.append_encode_to_bytes(&mut feedbacks_bytes)?;
        Ok(feedbacks_bytes)
    }

    /// Wraps `feedbacks_bytes` in a bundle whose header carries `feedbacks_bytes_hash`
    /// and is signed by the same server identity that `header.peer` advertises.
    ///
    /// Tamper tests pass already-tampered data here so the header signature is valid and
    /// `verify()` can only fail on the check under test.
    async fn make_signed_bundle(feedbacks_bytes: Bytes, feedbacks_bytes_hash: Hash) -> anyhow::Result<EncodedPostBundleFeedbackV1> {
        let time_provider = RealTimeProvider;
        let pow_generator = SingleThreadedPowGenerator::new();
        let server_id = ServerId::new("own_pow", &time_provider, Pow(0), true, &pow_generator).await?;
        let peer = server_id.to_peer(&time_provider)?;

        let mut header = EncodedPostBundleFeedbackHeaderV1 {
            time_millis: time_provider.current_time_millis(),
            location_id: Id::random(),
            feedbacks_bytes_hash,
            peer,
            signature: Signature::zero(),
        };
        header.signature_generate(&server_id.keys.signature_key);

        Ok(EncodedPostBundleFeedbackV1 { header, feedbacks_bytes })
    }

    /// Builds a valid single-entry feedback bundle.
    async fn make_valid_bundle() -> anyhow::Result<EncodedPostBundleFeedbackV1> {
        let feedbacks_bytes = Bytes::from(make_valid_feedbacks_bytes().await?);
        let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
        make_signed_bundle(feedbacks_bytes, feedbacks_bytes_hash).await
    }

    #[tokio::test]
    async fn test_verify_valid_bundle() -> anyhow::Result<()> {
        let bundle = make_valid_bundle().await?;
        bundle.verify()
    }

    #[tokio::test]
    async fn test_verify_bad_header_signature() -> anyhow::Result<()> {
        let mut bundle = make_valid_bundle().await?;
        bundle.header.signature = Signature::zero();
        assert!(bundle.verify().is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_wrong_feedbacks_hash() -> anyhow::Result<()> {
        let feedbacks_bytes = Bytes::from(make_valid_feedbacks_bytes().await?);
        // The header commits to a hash that does not match the body
        let bundle = make_signed_bundle(feedbacks_bytes, hashing::hash(b"wrong")).await?;

        // The header signature itself is valid, so the failure must come from the body checks
        bundle.header.verify()?;
        assert!(bundle.verify().is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_partial_entry() -> anyhow::Result<()> {
        // Append one extra byte to make the length not a multiple of ENTRY_SIZE
        let mut bytes = make_valid_feedbacks_bytes().await?;
        bytes.push(0u8);
        let feedbacks_bytes = Bytes::from(bytes);
        let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
        let bundle = make_signed_bundle(feedbacks_bytes, feedbacks_bytes_hash).await?;

        bundle.header.verify()?;
        let err = bundle.verify().expect_err("a partial trailing entry must be rejected");
        assert!(err.to_string().contains("is not a multiple of ENTRY_SIZE"), "unexpected error: {err}");
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_wrong_pow() -> anyhow::Result<()> {
        // Flip the last byte of the entry (the pow byte) to an incorrect value
        let mut bytes = make_valid_feedbacks_bytes().await?;
        let last = bytes.last_mut().unwrap();
        *last = last.wrapping_add(1);
        let feedbacks_bytes = Bytes::from(bytes);
        let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
        let bundle = make_signed_bundle(feedbacks_bytes, feedbacks_bytes_hash).await?;

        bundle.header.verify()?;
        let err = bundle.verify().expect_err("an entry with a wrong pow must be rejected");
        // Both per-entry failure messages are prefixed with the entry index
        assert!(err.to_string().contains("feedback 0:"), "unexpected error: {err}");
        Ok(())
    }

    #[tokio::test]
    async fn encoded_post_bundle_header_v1_to_from_bytes_roundtrip() -> anyhow::Result<()> {
        let time_provider = RealTimeProvider;
        let pow_generator = SingleThreadedPowGenerator::new();
        let server_id = ServerId::new("own_pow", &time_provider, Pow(0), true, &pow_generator).await?;
        let peer = server_id.to_peer(&time_provider)?;
        let feedbacks_bytes = Bytes::new();

        let mut header = EncodedPostBundleFeedbackHeaderV1 {
            time_millis: TimeMillis::random(),
            location_id: Id::random(),
            feedbacks_bytes_hash: hashing::hash(feedbacks_bytes.as_ref()),
            peer,
            signature: Signature::zero(),
        };

        header.signature_generate(&server_id.keys.signature_key);
        header.verify()?;

        let bundle = EncodedPostBundleFeedbackV1 { header, feedbacks_bytes };

        let bytes1 = bundle.to_bytes()?;
        let decoded = EncodedPostBundleFeedbackV1::from_bytes(bytes1.clone())?;

        assert_eq!(bundle, decoded);

        // Optional extra sanity check: encoding the decoded struct should be stable.
        let bytes2 = decoded.to_bytes()?;
        assert_eq!(bytes1, bytes2);

        Ok(())
    }
}