Skip to main content

hashiverse_lib/protocol/posting/
encoded_post_bundle.rs

1//! # `EncodedPostBundleV1` — the server-signed index of posts at a location
2//!
3//! A "post bundle" is a server's durable record of every post accepted into a given
4//! `(location_id, time bucket)` pair. This is the unit of replication, caching, and
5//! healing on the network — timelines fetch bundles, caches store bundles, and the
6//! two-phase heal protocol reconciles divergent bundles.
7//!
8//! The encoding is two parts glued together:
9//!
10//! - [`EncodedPostBundleHeaderV1`] — JSON-encoded, server-signed, enumerating the
11//!   `post_id`s and per-post body lengths the bundle contains, plus
12//!   `sealed`/`overflowed` flags, any `healed_ids`, and the publishing
13//!   [`crate::protocol::peer::Peer`] record. This is the bit that proves "server X
14//!   attests that these posts lived here at time T".
15//! - A concatenation of each [`crate::protocol::posting::encoded_post::EncodedPostV1`]
16//!   body, in the same order as the header's post id list.
17//!
//! `verify()` is exhaustive: it checks the header's structure and signature, confirms
//! that the sum of header-declared lengths exactly spans the body, checks that each
//! post's plaintext `post_id` prefix matches the header's claim, and then decrypts each
//! post's header to verify its own signature and hashes.
21
22use std::collections::HashSet;
23use crate::{anyhow_assert_eq, anyhow_assert_ge};
24use crate::protocol::peer::Peer;
25use crate::protocol::posting::encoded_post::EncodedPostV1;
26use crate::tools::time::TimeMillis;
27use crate::tools::types::{Hash, Id, ID_BYTES, Signature, SignatureKey, VerificationKey};
28use crate::tools::{hashing, json, signing};
29use bytes::{Buf, BufMut, Bytes, BytesMut};
30use serde::{Deserialize, Serialize};
31use std::fmt::{Debug, Display};
32
33/// The server-signed index for all posts accumulated under one (location, time-bucket) key.
34///
35/// Posts on hashiverse are not stored one-by-one on the DHT: they are grouped into "bundles"
36/// keyed by a [`crate::tools::buckets::BucketLocation`] (a location_id plus a time bucket).
37/// An `EncodedPostBundleHeaderV1` is the header that a server publishes describing the
38/// bundle's current state — the list of post IDs it holds, their lengths, which ones have
39/// been healed (re-uploaded after loss), whether the bundle has overflowed (too many posts,
40/// go more granular), whether it is sealed (old enough that new posts are unlikely), and the
41/// [`Peer`] record of the server publishing it.
42///
43/// The whole header is signed by the publishing peer, which lets any client reading the
44/// bundle verify both the contents list and the identity of the hosting server before
45/// deciding to fetch individual posts.
46// Contains all the posts for a given location_id in a particular bucket
47#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
48pub struct EncodedPostBundleHeaderV1 {
49    pub time_millis: TimeMillis, // When was this bundle last updated
50    pub location_id: Id,
51    pub overflowed: bool, // This bucket has overflowed, so go more granular
52    pub sealed: bool,    // Enough time has passed that it is unlikely that new posts will be added to this cluster
53    pub num_posts: u8,
54    pub encoded_post_ids: Vec<Id>, 
55    pub encoded_post_lengths: Vec<usize>,
56    pub encoded_post_healed: HashSet<Id>,
57    pub peer: Peer,
58
59    pub signature: Signature, // Peer.sign(get_hash_for_signing())
60}
61
62impl EncodedPostBundleHeaderV1 {
63    pub fn get_hash_for_signing(&self) -> anyhow::Result<Hash> {
64        let time_millis_be = self.time_millis.encode_be();
65        let overflowed_bytes = [self.overflowed as u8];
66        let sealed_bytes = [self.sealed as u8];
67        let num_posts_bytes = [self.num_posts];
68        let encoded_post_lengths_be: Vec<[u8; 8]> = self.encoded_post_lengths.iter().map(|&l| (l as u64).to_be_bytes()).collect();
69        let peer_hash = self.peer.signature_hash_generate()?;
70
71        let mut hash_input: Vec<&[u8]> = vec![
72            time_millis_be.as_ref(),
73            self.location_id.as_ref(),
74            &overflowed_bytes,
75            &sealed_bytes,
76            &num_posts_bytes,
77        ];
78
79        for encoded_post_id in &self.encoded_post_ids {
80            hash_input.push(encoded_post_id.as_ref());
81        }
82        for length_be in &encoded_post_lengths_be {
83            hash_input.push(length_be.as_ref());
84        }
85        let mut healed_ids_sorted: Vec<Id> = self.encoded_post_healed.iter().copied().collect();
86        healed_ids_sorted.sort();
87        for healed_id in &healed_ids_sorted {
88            hash_input.push(healed_id.as_ref());
89        }
90        hash_input.push(peer_hash.as_ref());
91
92        Ok(hashing::hash_multiple(&hash_input))
93    }
94
95    pub fn signature_generate(&mut self, signature_key: &SignatureKey) -> anyhow::Result<()> {
96        let hash = self.get_hash_for_signing()?;
97        self.signature = signing::sign(signature_key, hash.as_ref());
98        Ok(())
99    }
100
101    pub fn signature_verify(&self) -> anyhow::Result<()> {
102        let hash = self.get_hash_for_signing()?;
103        let verification_key = VerificationKey::from_bytes(&self.peer.verification_key_bytes)?;
104        signing::verify(&verification_key, &self.signature, hash.as_ref())
105    }
106
107    pub fn verify(&self) -> anyhow::Result<()> {
108        anyhow_assert_eq!(self.num_posts, self.encoded_post_lengths.len() as u8);
109        anyhow_assert_eq!(self.num_posts, self.encoded_post_ids.len() as u8);
110        for healed_id in &self.encoded_post_healed {
111            if !self.encoded_post_ids.contains(healed_id) {
112                anyhow::bail!("encoded_post_healed contains id not in encoded_post_ids: {}", healed_id);
113            }
114        }
115        self.signature_verify()?;
116        Ok(())
117    }
118}
119
120impl Display for EncodedPostBundleHeaderV1 {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        write!(f, "EncodedPostBundleHeaderV1 [ location_id: {}, time_millis: {}, num_posts: {}, overflowed: {}, sealed: {} ]", self.location_id, self.time_millis, self.num_posts, self.overflowed, self.sealed)
123    }
124}
125
126#[derive(Debug, PartialEq, Clone)]
127pub struct EncodedPostBundleV1 {
128    pub header: EncodedPostBundleHeaderV1,
129    pub encoded_posts_bytes: Bytes,
130}
131
132impl Display for EncodedPostBundleV1 {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        write!(f, "EncodedPostBundleV1 [ header: {}, length: {} ]", self.header, self.encoded_posts_bytes.len())
135    }
136}
137
138impl EncodedPostBundleV1 {
139    pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
140        let mut bytes = BytesMut::new();
141
142        let json_post_bundle_header = json::struct_to_bytes(&self.header)?;
143        bytes.put_u8(1u8); // Version
144        bytes.put_u64(json_post_bundle_header.len() as u64);
145        bytes.put_u64(self.encoded_posts_bytes.len() as u64);
146        bytes.put_slice(json_post_bundle_header.as_ref());
147        bytes.put_slice(self.encoded_posts_bytes.as_ref());
148
149        Ok(bytes.freeze())
150    }
151
152    pub fn from_bytes(mut bytes: Bytes, decode_body: bool) -> anyhow::Result<Self> {
153        anyhow_assert_ge!(bytes.remaining(), 1, "Missing version");
154        let version = bytes.get_u8();
155        anyhow_assert_eq!(1, version, "Invalid version");
156
157        anyhow_assert_ge!(bytes.remaining(), 8, "Missing header length");
158        let header_len = bytes.get_u64() as usize;
159        anyhow_assert_ge!(bytes.remaining(), 8, "Missing body length");
160        let body_len = bytes.get_u64() as usize;
161
162        let total_length = header_len.checked_add(body_len).ok_or_else(|| anyhow::anyhow!("header_len + body_len overflow"))?;
163        anyhow_assert_ge!(bytes.remaining(), total_length, "Truncated post bundle data");
164
165        let header_bytes = bytes.copy_to_bytes(header_len);
166        let header = json::bytes_to_struct(&header_bytes)?;
167
168        let body = match decode_body {
169            true => {
170                let body_bytes = bytes.copy_to_bytes(body_len);
171                anyhow_assert_eq!(bytes.remaining(), 0, "Excess data");
172                body_bytes
173            },
174            false => Bytes::new(),
175        };
176
177        Ok(EncodedPostBundleV1 {
178            header,
179            encoded_posts_bytes: body,
180        })
181    }
182
183    /// Verifies that this bundle is legitimate enough to cache.
184    ///
185    /// Checks:
186    /// 1. The bundle header is structurally valid and its signature verifies.
187    /// 2. The sum of `encoded_post_lengths` exactly spans `encoded_posts_bytes`, and
188    ///    the plaintext `post_id` prefix of each slice matches the corresponding entry
189    ///    in `encoded_post_ids`.
190    /// 3. Each post slice can be decrypted using `base_id`, which also verifies the
191    ///    per-post signature and hash integrity.
192    pub fn verify(&self, base_id: &Id) -> anyhow::Result<()> {
193        // (1) Header structure and signature
194        self.header.verify()?;
195
196        // (2) Lengths must exactly span the body
197        let total_length: usize = self.header.encoded_post_lengths.iter().sum();
198        if total_length != self.encoded_posts_bytes.len() {
199            anyhow::bail!(
200                "sum of encoded_post_lengths ({}) != encoded_posts_bytes length ({})",
201                total_length,
202                self.encoded_posts_bytes.len()
203            );
204        }
205
206        // (2) + (3) Per-post checks
207        let mut offset = 0usize;
208        for (i, (&length, expected_post_id)) in self.header.encoded_post_lengths.iter().zip(self.header.encoded_post_ids.iter()).enumerate() {
209            let post_bytes = self.encoded_posts_bytes.slice(offset..offset + length);
210
211            // Plaintext post_id prefix must match the header's claim before we do any crypto
212            if post_bytes.len() < ID_BYTES {
213                anyhow::bail!("post {}: bytes too short to contain post_id", i);
214            }
215            let actual_post_id = Id::from_slice(&post_bytes[..ID_BYTES])?;
216            if actual_post_id != *expected_post_id {
217                anyhow::bail!("post {}: id mismatch — header claims {} but bytes contain {}", i, expected_post_id, actual_post_id);
218            }
219
220            // Decrypt header (expect_body=true, decode_body=false): verifies per-post signature and hashes
221            EncodedPostV1::decode_from_bytes(post_bytes, base_id, true, false)
222                .map_err(|e| anyhow::anyhow!("post {}: failed to verify with base_id: {}", i, e))?;
223
224            offset += length;
225        }
226
227        Ok(())
228    }
229}
230
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::key_locker::key_locker::{KeyLocker, KeyLockerManager};
    use crate::client::key_locker::mem_key_locker::MemKeyLockerManager;
    use std::sync::Arc;
    use crate::protocol::posting::encoded_post::EncodedPostV1;
    use crate::tools::server_id::ServerId;
    use crate::tools::time_provider::time_provider::{RealTimeProvider, TimeProvider};
    use crate::tools::tools;
    use crate::tools::types::Pow;
    use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;

    /// Builds a valid single-post bundle encrypted under `base_id`. Also returns the
    /// `ServerId` whose key signed the header, so tests can re-sign after tampering and
    /// still produce a header whose signature verifies.
    async fn make_valid_bundle_with_server(base_id: Id) -> anyhow::Result<(EncodedPostBundleV1, ServerId)> {
        let time_provider = RealTimeProvider;
        let pow_generator = SingleThreadedPowGenerator::new();
        let server_id = ServerId::new("own_pow", &time_provider, Pow(0), true, &pow_generator).await?;
        let peer = server_id.to_peer(&time_provider)?;

        let key_locker_manager = MemKeyLockerManager::new().await?;
        let key_locker: Arc<dyn KeyLocker> = key_locker_manager.create("test keyphrase".to_string()).await?;
        let client_id = key_locker.client_id();
        let timestamp = time_provider.current_time_millis();

        let mut encoded_post = EncodedPostV1::new(client_id, timestamp, vec![base_id], "test post content");
        let post_bytes_obj = encoded_post.encode_to_bytes_direct(&key_locker).await?;
        let post_bytes = Bytes::copy_from_slice(post_bytes_obj.bytes());

        let mut header = EncodedPostBundleHeaderV1 {
            time_millis: timestamp,
            location_id: Id::random(),
            overflowed: false,
            sealed: false,
            num_posts: 1,
            encoded_post_ids: vec![encoded_post.post_id],
            encoded_post_lengths: vec![post_bytes.len()],
            encoded_post_healed: HashSet::new(),
            peer,
            signature: Signature::zero(),
        };
        header.signature_generate(&server_id.keys.signature_key)?;

        Ok((EncodedPostBundleV1 { header, encoded_posts_bytes: post_bytes }, server_id))
    }

    /// Builds a valid single-post bundle encrypted under `base_id`.
    async fn make_valid_bundle(base_id: Id) -> anyhow::Result<EncodedPostBundleV1> {
        let (bundle, _server_id) = make_valid_bundle_with_server(base_id).await?;
        Ok(bundle)
    }

    #[tokio::test]
    async fn test_verify_valid_bundle() -> anyhow::Result<()> {
        let base_id = Id::random();
        let bundle = make_valid_bundle(base_id).await?;
        bundle.verify(&base_id)
    }

    #[tokio::test]
    async fn test_verify_wrong_base_id() -> anyhow::Result<()> {
        let base_id = Id::random();
        let bundle = make_valid_bundle(base_id).await?;
        let wrong_base_id = Id::random();
        assert!(bundle.verify(&wrong_base_id).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_tampered_post_bytes() -> anyhow::Result<()> {
        let base_id = Id::random();
        let bundle = make_valid_bundle(base_id).await?;
        let mut tampered_posts = bundle.encoded_posts_bytes.to_vec();
        tampered_posts[ID_BYTES + 10] ^= 0xff; // flip a byte inside the encrypted header
        let tampered_bundle = EncodedPostBundleV1 {
            header: bundle.header,
            encoded_posts_bytes: Bytes::from(tampered_posts),
        };
        assert!(tampered_bundle.verify(&base_id).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_wrong_post_id_in_header() -> anyhow::Result<()> {
        let base_id = Id::random();
        let (mut bundle, server_id) = make_valid_bundle_with_server(base_id).await?;
        bundle.header.encoded_post_ids[0] = Id::random(); // wrong post_id
        // Re-sign with the key of the header's own peer. The header stays valid, so the
        // failure below must come from the post-id check, not a signature mismatch.
        bundle.header.signature_generate(&server_id.keys.signature_key)?;
        bundle.header.verify()?;
        assert!(bundle.verify(&base_id).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_wrong_length_sum() -> anyhow::Result<()> {
        let base_id = Id::random();
        let (mut bundle, server_id) = make_valid_bundle_with_server(base_id).await?;
        bundle.header.encoded_post_lengths[0] += 1; // length doesn't match bytes
        // Re-sign with the key of the header's own peer. The header stays valid, so the
        // failure below must come from the length-sum check, not a signature mismatch.
        bundle.header.signature_generate(&server_id.keys.signature_key)?;
        bundle.header.verify()?;
        assert!(bundle.verify(&base_id).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_tampered_post_length() -> anyhow::Result<()> {
        // Changing encoded_post_lengths must invalidate the signature even when the
        // structural checks (sum == body length, num_posts == lengths.len()) still pass.
        let base_id = Id::random();
        let bundle = make_valid_bundle(base_id).await?;
        let original_length = bundle.header.encoded_post_lengths[0];
        // Pad the body so the sum still matches, then bump the recorded length.
        let mut tampered_posts = bundle.encoded_posts_bytes.to_vec();
        tampered_posts.push(0u8); // one extra byte on the body
        let mut tampered_header = bundle.header.clone();
        tampered_header.encoded_post_lengths[0] = original_length + 1;
        // Do NOT re-sign — the signature now covers a different lengths list.
        let tampered_bundle = EncodedPostBundleV1 {
            header: tampered_header,
            encoded_posts_bytes: Bytes::from(tampered_posts),
        };
        assert!(tampered_bundle.verify(&base_id).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn test_verify_bad_header_signature() -> anyhow::Result<()> {
        let base_id = Id::random();
        let mut bundle = make_valid_bundle(base_id).await?;
        bundle.header.signature = Signature::zero(); // corrupt the bundle header signature
        assert!(bundle.verify(&base_id).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn encoded_post_bundle_v1_to_from_bytes_roundtrip() -> anyhow::Result<()> {
        let time_provider = RealTimeProvider;
        let pow_generator = SingleThreadedPowGenerator::new();
        let server_id = ServerId::new("own_pow", &time_provider, Pow(0), true, &pow_generator).await?;
        let peer = server_id.to_peer(&time_provider)?;

        let num_posts: u8 = 3;

        let mut header = EncodedPostBundleHeaderV1 {
            time_millis: TimeMillis::random(),
            location_id: Id::random(),
            overflowed: true,
            sealed: false,
            num_posts,
            encoded_post_ids: (0..num_posts).map(|_| Id::random()).collect(),
            encoded_post_lengths: (0..num_posts).map(|_| tools::random_usize_bounded(1024)).collect(),
            encoded_post_healed: HashSet::new(),
            peer,
            signature: Signature::zero(),
        };

        header.signature_generate(&server_id.keys.signature_key)?;
        header.verify()?;

        let total_bytes = header.encoded_post_lengths.iter().sum::<usize>();
        let encoded_posts_bytes = Bytes::from(tools::random_bytes(total_bytes));

        let bundle = EncodedPostBundleV1 {
            header,
            encoded_posts_bytes,
        };

        let bytes1 = bundle.to_bytes()?;
        let decoded = EncodedPostBundleV1::from_bytes(bytes1.clone(), true)?;

        assert_eq!(bundle, decoded);

        // Optional extra sanity check: encoding the decoded struct should be stable.
        let bytes2 = decoded.to_bytes()?;
        assert_eq!(bytes1, bytes2);

        Ok(())
    }

    #[tokio::test]
    async fn encoded_post_bundle_v1_to_from_bytes_roundtrip_without_body() -> anyhow::Result<()> {
        let time_provider = RealTimeProvider;
        let pow_generator = SingleThreadedPowGenerator::new();
        let server_id = ServerId::new("own_pow", &time_provider, Pow(0), true, &pow_generator).await?;
        let peer = server_id.to_peer(&time_provider)?;

        let num_posts: u8 = 3;

        let mut header = EncodedPostBundleHeaderV1 {
            time_millis: TimeMillis::random(),
            location_id: Id::random(),
            overflowed: true,
            sealed: false,
            num_posts,
            encoded_post_ids: (0..num_posts).map(|_| Id::random()).collect(),
            encoded_post_lengths: (0..num_posts).map(|_| tools::random_usize_bounded(1024)).collect(),
            encoded_post_healed: HashSet::new(),
            peer,
            signature: Signature::zero(),
        };

        header.signature_generate(&server_id.keys.signature_key)?;
        header.verify()?;

        let total_bytes = header.encoded_post_lengths.iter().sum::<usize>();
        let encoded_posts_bytes = Bytes::from(tools::random_bytes(total_bytes));

        let bundle = EncodedPostBundleV1 {
            header,
            encoded_posts_bytes,
        };

        let bytes1 = bundle.to_bytes()?;
        let decoded = EncodedPostBundleV1::from_bytes(bytes1.clone(), false)?;

        assert_eq!(bundle.header, decoded.header);
        assert!(decoded.encoded_posts_bytes.is_empty());

        Ok(())
    }

    // ── Robustness tests: EncodedPostBundleV1::from_bytes ──

    #[test]
    fn test_from_bytes_empty() {
        assert!(EncodedPostBundleV1::from_bytes(Bytes::new(), true).is_err());
    }

    #[test]
    fn test_from_bytes_wrong_version() {
        assert!(EncodedPostBundleV1::from_bytes(Bytes::from_static(&[99u8]), true).is_err());
    }

    #[test]
    fn test_from_bytes_truncated_at_header_length() {
        // Version byte only, no header length u64
        assert!(EncodedPostBundleV1::from_bytes(Bytes::from_static(&[1u8]), true).is_err());
    }

    #[test]
    fn test_from_bytes_truncated_at_body_length() {
        // Version + header_len=0, but no body_len u64
        let mut bytes = BytesMut::new();
        bytes.put_u8(1); // version
        bytes.put_u64(0); // header_len
        assert!(EncodedPostBundleV1::from_bytes(bytes.freeze(), true).is_err());
    }

    #[test]
    fn test_from_bytes_header_len_exceeds_remaining() {
        let mut bytes = BytesMut::new();
        bytes.put_u8(1); // version
        bytes.put_u64(99999); // header_len way too large
        bytes.put_u64(0); // body_len
        assert!(EncodedPostBundleV1::from_bytes(bytes.freeze(), true).is_err());
    }

    #[test]
    fn test_from_bytes_overflow_lengths() {
        let mut bytes = BytesMut::new();
        bytes.put_u8(1); // version
        bytes.put_u64(u64::MAX); // header_len
        bytes.put_u64(1); // body_len — header_len + body_len overflows usize
        assert!(EncodedPostBundleV1::from_bytes(bytes.freeze(), true).is_err());
    }

    #[test]
    fn test_from_bytes_garbage() {
        assert!(EncodedPostBundleV1::from_bytes(Bytes::from_static(&[0xff; 128]), true).is_err());
    }

    #[cfg(not(target_arch = "wasm32"))]
    mod bolero_fuzz {
        use bytes::Bytes;
        use crate::protocol::posting::encoded_post_bundle::EncodedPostBundleV1;

        #[test]
        fn fuzz_from_bytes() {
            bolero::check!().for_each(|data: &[u8]| {
                let _ = EncodedPostBundleV1::from_bytes(Bytes::copy_from_slice(data), true);
            });
        }

        #[test]
        fn fuzz_from_bytes_no_body() {
            bolero::check!().for_each(|data: &[u8]| {
                let _ = EncodedPostBundleV1::from_bytes(Bytes::copy_from_slice(data), false);
            });
        }
    }
}