memberlist_plumtree/integration.rs
1//! Integration with memberlist delegate system.
2//!
3//! This module provides the glue between Plumtree and memberlist,
4//! handling message routing and membership synchronization.
5//!
6//! # Sender Identity
7//!
8//! Since memberlist's broadcast mechanism doesn't provide sender information,
9//! Plumtree wraps messages in a `NetworkEnvelope` that includes the sender's
10//! node ID. This is critical for proper protocol operation because:
11//!
12//! - **IHave**: Sender must be known to promote to Eager and send Graft
13//! - **Graft**: Requester must be known to send data directly to them
14//! - **Prune**: Sender must be known to demote to Lazy
15//!
16//! The envelope is transparent to users - encoding/decoding happens automatically.
17
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use memberlist_core::{
20 delegate::{
21 AliveDelegate, ConflictDelegate, Delegate, EventDelegate, MergeDelegate, NodeDelegate,
22 PingDelegate,
23 },
24 proto::{Meta, NodeState},
25};
26use nodecraft::{CheapClone, Id};
27use std::{borrow::Cow, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};
28
29use crate::{
30 config::PlumtreeConfig,
31 error::Result,
32 message::{MessageId, PlumtreeMessage},
33 peer_state::PeerState,
34 plumtree::{Plumtree, PlumtreeDelegate, PlumtreeHandle},
35};
36
37/// Trait for encoding and decoding node IDs for network transmission.
38///
39/// This trait must be implemented for any node ID type used with
40/// `PlumtreeMemberlist` to enable sender identity tracking.
41///
42/// # Example
43///
44/// ```rust
45/// use memberlist_plumtree::IdCodec;
46/// use bytes::{Buf, BufMut};
47///
48/// #[derive(Clone, Debug, PartialEq)]
49/// struct MyNodeId(u64);
50///
51/// impl IdCodec for MyNodeId {
52/// fn encode_id(&self, buf: &mut impl BufMut) {
53/// buf.put_u64(self.0);
54/// }
55///
56/// fn decode_id(buf: &mut impl Buf) -> Option<Self> {
57/// if buf.remaining() >= 8 {
58/// Some(MyNodeId(buf.get_u64()))
59/// } else {
60/// None
61/// }
62/// }
63///
64/// fn encoded_id_len(&self) -> usize {
65/// 8
66/// }
67/// }
68/// ```
69pub trait IdCodec: Sized {
70 /// Encode this ID into the buffer.
71 fn encode_id(&self, buf: &mut impl BufMut);
72
73 /// Decode an ID from the buffer.
74 fn decode_id(buf: &mut impl Buf) -> Option<Self>;
75
76 /// Get the encoded length of this ID in bytes.
77 fn encoded_id_len(&self) -> usize;
78}
79
80// Implement IdCodec for common integer types
81impl IdCodec for u8 {
82 fn encode_id(&self, buf: &mut impl BufMut) {
83 buf.put_u8(*self);
84 }
85
86 fn decode_id(buf: &mut impl Buf) -> Option<Self> {
87 if buf.remaining() >= 1 {
88 Some(buf.get_u8())
89 } else {
90 None
91 }
92 }
93
94 fn encoded_id_len(&self) -> usize {
95 1
96 }
97}
98
99impl IdCodec for u16 {
100 fn encode_id(&self, buf: &mut impl BufMut) {
101 buf.put_u16(*self);
102 }
103
104 fn decode_id(buf: &mut impl Buf) -> Option<Self> {
105 if buf.remaining() >= 2 {
106 Some(buf.get_u16())
107 } else {
108 None
109 }
110 }
111
112 fn encoded_id_len(&self) -> usize {
113 2
114 }
115}
116
117impl IdCodec for u32 {
118 fn encode_id(&self, buf: &mut impl BufMut) {
119 buf.put_u32(*self);
120 }
121
122 fn decode_id(buf: &mut impl Buf) -> Option<Self> {
123 if buf.remaining() >= 4 {
124 Some(buf.get_u32())
125 } else {
126 None
127 }
128 }
129
130 fn encoded_id_len(&self) -> usize {
131 4
132 }
133}
134
135impl IdCodec for u64 {
136 fn encode_id(&self, buf: &mut impl BufMut) {
137 buf.put_u64(*self);
138 }
139
140 fn decode_id(buf: &mut impl Buf) -> Option<Self> {
141 if buf.remaining() >= 8 {
142 Some(buf.get_u64())
143 } else {
144 None
145 }
146 }
147
148 fn encoded_id_len(&self) -> usize {
149 8
150 }
151}
152
153impl IdCodec for u128 {
154 fn encode_id(&self, buf: &mut impl BufMut) {
155 buf.put_u128(*self);
156 }
157
158 fn decode_id(buf: &mut impl Buf) -> Option<Self> {
159 if buf.remaining() >= 16 {
160 Some(buf.get_u128())
161 } else {
162 None
163 }
164 }
165
166 fn encoded_id_len(&self) -> usize {
167 16
168 }
169}
170
171// Implement IdCodec for String (variable-length)
172impl IdCodec for String {
173 fn encode_id(&self, buf: &mut impl BufMut) {
174 let bytes = self.as_bytes();
175 buf.put_u16(bytes.len() as u16);
176 buf.put_slice(bytes);
177 }
178
179 fn decode_id(buf: &mut impl Buf) -> Option<Self> {
180 if buf.remaining() < 2 {
181 return None;
182 }
183 let len = buf.get_u16() as usize;
184 if buf.remaining() < len {
185 return None;
186 }
187 let bytes = buf.copy_to_bytes(len);
188 String::from_utf8(bytes.to_vec()).ok()
189 }
190
191 fn encoded_id_len(&self) -> usize {
192 2 + self.len()
193 }
194}
195
196/// Magic byte prefix for Plumtree messages.
197///
198/// Used to distinguish Plumtree protocol messages from user messages.
199const PLUMTREE_MAGIC: u8 = 0x50;
200
201/// Outgoing broadcast envelope (unencooded).
202///
203/// This represents a Plumtree message that should be broadcast to all cluster
204/// members via memberlist's gossip/broadcast mechanism.
205///
206/// Use [`encode()`](Self::encode) to serialize for network transmission.
207#[derive(Debug, Clone)]
208pub struct BroadcastEnvelope<I> {
209 /// Sender's node ID (included in envelope for protocol operation).
210 pub sender: I,
211 /// The Plumtree protocol message.
212 pub message: PlumtreeMessage,
213}
214
215impl<I: IdCodec> BroadcastEnvelope<I> {
216 /// Encode this envelope for network transmission.
217 ///
218 /// Returns bytes that can be passed to memberlist's broadcast API.
219 pub fn encode(&self) -> Bytes {
220 encode_plumtree_envelope(&self.sender, &self.message)
221 }
222}
223
224/// Outgoing unicast envelope (unencooded).
225///
226/// Used internally to defer serialization until the message is actually
227/// sent over the network, reducing unnecessary allocations.
228#[derive(Debug, Clone)]
229pub(crate) struct UnicastEnvelope<I> {
230 /// Sender's node ID (included in envelope for protocol operation).
231 sender: I,
232 /// Target peer to send to.
233 target: I,
234 /// The Plumtree protocol message.
235 message: PlumtreeMessage,
236}
237
238impl<I: IdCodec> UnicastEnvelope<I> {
239 /// Encode this envelope for network transmission.
240 fn encode(&self) -> Bytes {
241 encode_plumtree_envelope(&self.sender, &self.message)
242 }
243}
244
245/// Combined Plumtree + Memberlist system.
246///
247/// Wraps a memberlist instance and provides efficient O(n) broadcast
248/// via the Plumtree protocol while using memberlist for membership.
249///
250/// # Recommended Usage (High-Performance Integration)
251///
252/// For production, it is recommended to combine Plumtree with a reliable unicast
253/// transport (like QUIC) and a pooling layer for backpressure control.
254///
255/// ```ignore
256/// use memberlist_plumtree::{
257/// PlumtreeMemberlist, PlumtreeConfig, NoopDelegate,
258/// QuicTransport, QuicConfig, PooledTransport, PoolConfig,
259/// MapPeerResolver, decode_plumtree_envelope
260/// };
261/// use std::sync::Arc;
262///
263/// // 1. Setup QUIC Transport with Connection Pooling
264/// // MapPeerResolver maps Node IDs to actual Socket Addresses
265/// let resolver = Arc::new(MapPeerResolver::new(local_addr));
266/// let quic_transport = QuicTransport::new(QuicConfig::default(), resolver, quinn_endpoint);
267///
268/// // 2. Wrap with PooledTransport for Concurrency & Queueing control
269/// let pool_config = PoolConfig::default()
270/// .with_max_concurrent_global(100)
271/// .with_max_queue_per_peer(512);
272/// let transport = PooledTransport::new(quic_transport, pool_config);
273///
274/// // 3. Initialize PlumtreeMemberlist
275/// let pm = Arc::new(PlumtreeMemberlist::new(local_id, PlumtreeConfig::lan(), NoopDelegate));
276///
277/// // 4. Start the runners
278/// // run_with_transport automatically drives the internal PlumtreeRunner (runner.rs)
279/// // and pumps Unicast messages (Graft/Prune) through your PooledTransport.
280/// let pm_run = pm.clone();
281/// let transport_clone = transport.clone();
282/// tokio::spawn(async move {
283/// pm_run.run_with_transport(transport_clone).await
284/// });
285///
286/// // Handle incoming message processor
287/// let pm_proc = pm.clone();
288/// tokio::spawn(async move { pm_proc.run_incoming_processor().await });
289///
290/// // 5. Bridge Broadcasts to Memberlist Gossip
291/// // While Unicast uses QUIC, common Gossip/IHave still use memberlist's UDP channel.
292/// let pm_out = pm.clone();
293/// let ml = memberlist_instance.clone();
294/// tokio::spawn(async move {
295/// let mut broadcast_rx = pm_out.outgoing_receiver_raw();
296/// while let Ok(envelope) = broadcast_rx.recv().await {
297/// ml.broadcast(envelope.encode()).await;
298/// }
299/// });
300///
301/// // 6. Inbound Injection (Inside your NodeDelegate or Transport Acceptor)
302/// // if let Some((sender_id, message)) = decode_plumtree_envelope::<u64>(&raw_data) {
303/// // pm.incoming_sender().send((sender_id, message)).await.ok();
304/// // }
305/// ```
306pub struct PlumtreeMemberlist<I, PD>
307where
308 I: Id,
309{
310 /// Plumtree broadcast layer.
311 plumtree: Plumtree<I, PlumtreeEventHandler<I, PD>>,
312 /// Handle for message I/O.
313 handle: PlumtreeHandle<I>,
314 /// Channel for sending incoming Plumtree messages to be processed.
315 incoming_tx: async_channel::Sender<(I, PlumtreeMessage)>,
316 /// Channel for receiving incoming Plumtree messages from memberlist.
317 incoming_rx: async_channel::Receiver<(I, PlumtreeMessage)>,
318 /// Channel for outgoing broadcast messages (unencooded, serialization deferred).
319 outgoing_rx: async_channel::Receiver<BroadcastEnvelope<I>>,
320 /// Sender for outgoing broadcast messages.
321 outgoing_tx: async_channel::Sender<BroadcastEnvelope<I>>,
322 /// Channel for outgoing unicast messages (unencooded, serialization deferred).
323 /// These MUST be sent directly to the target peer, NOT broadcast!
324 unicast_rx: async_channel::Receiver<UnicastEnvelope<I>>,
325 /// Sender for outgoing unicast messages.
326 unicast_tx: async_channel::Sender<UnicastEnvelope<I>>,
327}
328
329impl<I, PD> PlumtreeMemberlist<I, PD>
330where
331 I: Id + IdCodec + Clone + Eq + Hash + Debug + Send + Sync + 'static,
332 PD: PlumtreeDelegate<I>,
333{
334 /// Create a new PlumtreeMemberlist instance.
335 ///
336 /// # Arguments
337 ///
338 /// * `local_id` - The local node's identifier
339 /// * `config` - Plumtree configuration (hash ring is automatically enabled)
340 /// * `delegate` - Application delegate for message delivery
341 ///
342 /// # Note
343 ///
344 /// Hash ring topology is automatically enabled for `PlumtreeMemberlist` to ensure
345 /// deterministic peer selection and Z≥2 redundancy guarantees.
346 pub fn new(local_id: I, config: PlumtreeConfig, delegate: PD) -> Self {
347 // Create channels for communication
348 let (incoming_tx, incoming_rx) = async_channel::bounded(1024);
349 let (outgoing_tx, outgoing_rx) = async_channel::bounded(1024);
350 let (unicast_tx, unicast_rx) = async_channel::bounded(1024);
351
352 // Create the event handler that wraps the user delegate
353 let event_handler = PlumtreeEventHandler::new(delegate);
354
355 // Force hash ring topology for PlumtreeMemberlist
356 let config = config.with_hash_ring(true);
357
358 // Create the Plumtree instance
359 let (plumtree, handle) = Plumtree::new(local_id, config, event_handler);
360
361 Self {
362 plumtree,
363 handle,
364 incoming_tx,
365 incoming_rx,
366 outgoing_rx,
367 outgoing_tx,
368 unicast_rx,
369 unicast_tx,
370 }
371 }
372
373 /// Get the sender for incoming Plumtree messages.
374 ///
375 /// Pass this to `PlumtreeNodeDelegate` to forward received messages.
376 pub fn incoming_sender(&self) -> async_channel::Sender<(I, PlumtreeMessage)> {
377 self.incoming_tx.clone()
378 }
379
380 /// Get the receiver for outgoing broadcast envelopes.
381 ///
382 /// These messages have no specific target and should be disseminated
383 /// via memberlist's gossip/broadcast mechanism.
384 ///
385 /// **Note**: Messages are unencooded. Call `.encode()` on the envelope
386 /// to get bytes for transmission.
387 ///
388 /// Pass this to `PlumtreeNodeDelegate` for memberlist broadcast.
389 #[allow(dead_code)]
390 pub(crate) fn outgoing_receiver_raw(&self) -> async_channel::Receiver<BroadcastEnvelope<I>> {
391 self.outgoing_rx.clone()
392 }
393
394 /// Get the receiver for outgoing unicast envelopes.
395 ///
396 /// **CRITICAL**: These messages MUST be sent directly to the specified
397 /// target peer using memberlist's direct send API (e.g., `send_reliable`,
398 /// `send_to`, or similar). DO NOT broadcast these messages!
399 ///
400 /// **Note**: Messages are unencooded. Call `.encode()` on the envelope
401 /// to get bytes, and use `.target` to get the destination peer.
402 ///
403 /// # Example
404 ///
405 /// ```ignore
406 /// let unicast_rx = pm.unicast_receiver_raw();
407 /// while let Ok(envelope) = unicast_rx.recv().await {
408 /// let target = envelope.target.clone();
409 /// let encoded = envelope.encode();
410 /// // Use memberlist's direct send, NOT broadcast!
411 /// memberlist.send_reliable(&target, encoded).await;
412 /// }
413 /// ```
414 #[allow(dead_code)]
415 pub(crate) fn unicast_receiver_raw(&self) -> async_channel::Receiver<UnicastEnvelope<I>> {
416 self.unicast_rx.clone()
417 }
418
419 /// Get the outgoing broadcast envelope sender.
420 #[allow(dead_code)]
421 pub(crate) fn outgoing_sender_raw(&self) -> async_channel::Sender<BroadcastEnvelope<I>> {
422 self.outgoing_tx.clone()
423 }
424
425 /// Get the outgoing unicast envelope sender.
426 #[allow(dead_code)]
427 pub(crate) fn unicast_sender_raw(&self) -> async_channel::Sender<UnicastEnvelope<I>> {
428 self.unicast_tx.clone()
429 }
430
431 /// Broadcast a message to all nodes in the cluster.
432 ///
433 /// Uses Plumtree's efficient O(n) spanning tree broadcast.
434 pub async fn broadcast(&self, payload: impl Into<Bytes>) -> Result<MessageId> {
435 self.plumtree.broadcast(payload).await
436 }
437
438 /// Handle an incoming Plumtree message from the network.
439 ///
440 /// This should be called when a Plumtree message is received via memberlist.
441 pub async fn handle_message(&self, from: I, message: PlumtreeMessage) -> Result<()> {
442 self.plumtree.handle_message(from, message).await
443 }
444
445 /// Get a reference to the underlying Plumtree instance.
446 pub fn plumtree(&self) -> &Plumtree<I, PlumtreeEventHandler<I, PD>> {
447 &self.plumtree
448 }
449
450 /// Get a reference to the Plumtree handle.
451 pub fn handle(&self) -> &PlumtreeHandle<I> {
452 &self.handle
453 }
454
455 /// Get the Plumtree configuration.
456 pub fn config(&self) -> &PlumtreeConfig {
457 self.plumtree.config()
458 }
459
460 /// Get peer statistics.
461 pub fn peer_stats(&self) -> crate::peer_state::PeerStats {
462 self.plumtree.peer_stats()
463 }
464
465 /// Get cache statistics.
466 pub fn cache_stats(&self) -> crate::message::CacheStats {
467 self.plumtree.cache_stats()
468 }
469
470 /// Add a peer to the Plumtree overlay with automatic classification.
471 ///
472 /// Called when a node joins the memberlist cluster.
473 /// The peer is automatically classified as eager or lazy based on
474 /// the current state and configuration.
475 ///
476 /// Returns the result of the add operation.
477 pub fn add_peer(&self, peer: I) -> crate::peer_state::AddPeerResult {
478 self.plumtree.add_peer(peer)
479 }
480
481 /// Add a peer to the lazy set only (traditional behavior).
482 ///
483 /// This bypasses the `max_peers` limit check and auto-classification.
484 pub fn add_peer_lazy(&self, peer: I) {
485 self.plumtree.add_peer_lazy(peer);
486 }
487
488 /// Remove a peer from the Plumtree overlay.
489 ///
490 /// Called when a node leaves the memberlist cluster.
491 pub fn remove_peer(&self, peer: &I) {
492 self.plumtree.remove_peer(peer);
493 }
494
495 /// Get a reference to the shared peer state.
496 ///
497 /// This returns the Plumtree's internal peer state, which tracks
498 /// eager and lazy peers for message routing.
499 pub fn peers(&self) -> &Arc<PeerState<I>> {
500 self.plumtree.peers()
501 }
502
503 /// Wrap a memberlist delegate with Plumtree integration.
504 ///
505 /// This creates a `PlumtreeNodeDelegate` that wraps your delegate and
506 /// automatically synchronizes peer state when nodes join or leave the cluster.
507 ///
508 /// The wrapped delegate should be passed to memberlist when creating it.
509 ///
510 /// # Arguments
511 ///
512 /// * `delegate` - Your memberlist delegate implementation
513 ///
514 /// # Example
515 ///
516 /// ```ignore
517 /// use memberlist_plumtree::{PlumtreeMemberlist, PlumtreeConfig};
518 ///
519 /// // Create PlumtreeMemberlist
520 /// let pm = PlumtreeMemberlist::new(node_id, PlumtreeConfig::default(), my_plumtree_delegate);
521 ///
522 /// // Wrap your memberlist delegate
523 /// let wrapped = pm.wrap_delegate(my_memberlist_delegate);
524 ///
525 /// // Create memberlist with the wrapped delegate
526 /// let memberlist = Memberlist::new(wrapped, memberlist_config).await?;
527 ///
528 /// // Run PlumtreeMemberlist with transport
529 /// pm.run_with_transport(transport).await;
530 /// ```
531 pub fn wrap_delegate<A, D>(&self, delegate: D) -> PlumtreeNodeDelegate<I, A, D>
532 where
533 A: Send + Sync + 'static,
534 {
535 let config = self.plumtree.config();
536 PlumtreeNodeDelegate::new(
537 delegate,
538 self.incoming_tx.clone(),
539 self.outgoing_rx.clone(),
540 self.plumtree.peers().clone(),
541 config.eager_fanout,
542 config.max_peers,
543 )
544 }
545
546 /// Run the Plumtree background tasks (manual unicast handling).
547 ///
548 /// This runs the IHave scheduler, Graft timer, and outgoing processor.
549 /// Unicast messages are sent to `unicast_receiver()` channel.
550 ///
551 /// **WARNING**: You MUST process messages from `unicast_receiver()` and send them
552 /// directly to the target peer. Failing to do so will break the protocol.
553 ///
554 /// For safer usage, prefer [`run_with_transport`](Self::run_with_transport) which
555 /// handles unicast delivery automatically.
556 pub async fn run(&self) {
557 futures::future::join4(
558 self.plumtree.run_ihave_scheduler(),
559 self.plumtree.run_graft_timer(),
560 self.plumtree.run_seen_cleanup(),
561 self.run_outgoing_processor(),
562 )
563 .await;
564 }
565
566 /// Run the Plumtree background tasks with automatic unicast handling.
567 ///
568 /// This is the **recommended** way to run PlumtreeMemberlist. It handles
569 /// unicast message delivery automatically using the provided transport,
570 /// preventing protocol failures from forgotten unicast handling.
571 ///
572 /// # Arguments
573 ///
574 /// * `transport` - Implementation of [`Transport`](crate::Transport) for unicast delivery
575 ///
576 /// # Example
577 ///
578 /// ```ignore
579 /// use memberlist_plumtree::{PlumtreeMemberlist, Transport};
580 ///
581 /// struct MyTransport { memberlist: Memberlist }
582 ///
583 /// impl Transport<NodeId> for MyTransport {
584 /// type Error = MyError;
585 /// async fn send_to(&self, target: &NodeId, data: Bytes) -> Result<(), Self::Error> {
586 /// self.memberlist.send_reliable(target, data).await
587 /// }
588 /// }
589 ///
590 /// // No need to handle unicast_receiver() - it's done automatically!
591 /// let transport = MyTransport { memberlist };
592 /// pm.run_with_transport(transport).await;
593 /// ```
594 pub async fn run_with_transport<T>(&self, transport: T)
595 where
596 T: crate::Transport<I>,
597 {
598 futures::future::join5(
599 self.plumtree.run_ihave_scheduler(),
600 self.plumtree.run_graft_timer(),
601 self.plumtree.run_seen_cleanup(),
602 self.run_outgoing_processor(),
603 self.run_unicast_sender(transport),
604 )
605 .await;
606 }
607
608 /// Internal task that sends unicast messages via the provided transport.
609 ///
610 /// Serialization is deferred to this point to minimize allocations.
611 async fn run_unicast_sender<T>(&self, transport: T)
612 where
613 T: crate::Transport<I>,
614 {
615 while let Ok(envelope) = self.unicast_rx.recv().await {
616 // Encode at the last moment before network transmission
617 let encoded = envelope.encode();
618 if let Err(e) = transport.send_to(&envelope.target, encoded).await {
619 tracing::warn!(
620 "failed to send unicast message to {:?}: {}",
621 envelope.target,
622 e
623 );
624 }
625 }
626 }
627
628 /// Run the outgoing message processor.
629 ///
630 /// Routes outgoing messages to the appropriate channel:
631 /// - Unicast messages (with target) -> unicast_tx
632 /// - Broadcast messages (no target) -> outgoing_tx
633 ///
634 /// Messages are passed as unencooded envelopes. Serialization is deferred
635 /// until the message is actually sent over the network to minimize
636 /// unnecessary allocations.
637 async fn run_outgoing_processor(&self) {
638 let local_id = self.plumtree.local_id().clone();
639
640 while let Some(outgoing) = self.handle.next_outgoing().await {
641 if let Some(target) = outgoing.target {
642 // Unicast: create envelope with target (encoding deferred)
643 let envelope = UnicastEnvelope {
644 sender: local_id.clone(),
645 target,
646 message: outgoing.message,
647 };
648 if self.unicast_tx.send(envelope).await.is_err() {
649 // Channel closed
650 break;
651 }
652 } else {
653 // Broadcast: create envelope (encoding deferred)
654 let envelope = BroadcastEnvelope {
655 sender: local_id.clone(),
656 message: outgoing.message,
657 };
658 if self.outgoing_tx.send(envelope).await.is_err() {
659 // Channel closed
660 break;
661 }
662 }
663 }
664 }
665
666 /// Process incoming messages from the incoming channel.
667 ///
668 /// Should be spawned as a background task if using channels.
669 pub async fn run_incoming_processor(&self) {
670 while let Ok((from, message)) = self.incoming_rx.recv().await {
671 if let Err(e) = self.plumtree.handle_message(from, message).await {
672 tracing::warn!("failed to handle plumtree message: {}", e);
673 }
674 }
675 }
676
677 /// Shutdown the Plumtree layer.
678 pub fn shutdown(&self) {
679 self.plumtree.shutdown();
680 self.outgoing_tx.close();
681 self.unicast_tx.close();
682 }
683
684 /// Check if shutdown has been requested.
685 pub fn is_shutdown(&self) -> bool {
686 self.plumtree.is_shutdown()
687 }
688}
689
690/// Delegate that intercepts messages for Plumtree protocol.
691///
692/// Wraps a user delegate and handles Plumtree message routing.
693/// Use this when creating a memberlist to integrate Plumtree.
694pub struct PlumtreeNodeDelegate<I, A, D> {
695 /// Inner user delegate.
696 inner: D,
697 /// Channel to send received Plumtree messages to PlumtreeMemberlist.
698 plumtree_tx: async_channel::Sender<(I, PlumtreeMessage)>,
699 /// Outgoing Plumtree messages to broadcast via memberlist (unencooded).
700 /// Serialization is deferred until messages are sent over the network.
701 outgoing_rx: async_channel::Receiver<BroadcastEnvelope<I>>,
702 /// Shared peer state for synchronizing membership changes.
703 /// When memberlist detects a node join/leave, this is updated to keep
704 /// Plumtree's peer state in sync.
705 peers: Arc<PeerState<I>>,
706 /// Target number of eager peers for auto-classification.
707 eager_fanout: usize,
708 /// Maximum total peers allowed (None = unlimited).
709 max_peers: Option<usize>,
710 /// Marker.
711 _marker: PhantomData<A>,
712}
713
714impl<I, A, D> PlumtreeNodeDelegate<I, A, D> {
715 /// Create a new Plumtree node delegate.
716 ///
717 /// This wraps a user's memberlist delegate to automatically synchronize
718 /// peer state when nodes join or leave the cluster.
719 ///
720 /// # Arguments
721 ///
722 /// * `inner` - The user's delegate implementation
723 /// * `plumtree_tx` - Sender for forwarding received Plumtree messages
724 /// * `outgoing_rx` - Receiver for outgoing Plumtree messages to broadcast
725 /// * `peers` - Shared peer state for synchronizing membership changes
726 /// * `eager_fanout` - Target number of eager peers
727 /// * `max_peers` - Maximum total peers allowed (None = unlimited)
728 ///
729 /// # Example
730 ///
731 /// ```ignore
732 /// // Create PlumtreeMemberlist first
733 /// let pm = PlumtreeMemberlist::new(node_id, config, delegate);
734 ///
735 /// // Wrap your memberlist delegate with PlumtreeNodeDelegate
736 /// let wrapped_delegate = pm.wrap_delegate(your_memberlist_delegate);
737 ///
738 /// // Use wrapped_delegate when creating memberlist
739 /// let memberlist = Memberlist::new(wrapped_delegate, memberlist_config).await?;
740 /// ```
741 pub fn new(
742 inner: D,
743 plumtree_tx: async_channel::Sender<(I, PlumtreeMessage)>,
744 outgoing_rx: async_channel::Receiver<BroadcastEnvelope<I>>,
745 peers: Arc<PeerState<I>>,
746 eager_fanout: usize,
747 max_peers: Option<usize>,
748 ) -> Self {
749 Self {
750 inner,
751 plumtree_tx,
752 outgoing_rx,
753 peers,
754 eager_fanout,
755 max_peers,
756 _marker: PhantomData,
757 }
758 }
759
760 /// Get a reference to the inner delegate.
761 pub fn inner(&self) -> &D {
762 &self.inner
763 }
764
765 /// Get a clone of the plumtree message sender.
766 ///
767 /// This can be used to forward Plumtree messages from external sources.
768 pub fn plumtree_sender(&self) -> async_channel::Sender<(I, PlumtreeMessage)> {
769 self.plumtree_tx.clone()
770 }
771
772 /// Get a clone of the outgoing envelope receiver.
773 ///
774 /// This can be used to receive outgoing Plumtree messages from external sources.
775 /// Messages are unencooded - call `.encode()` on the envelope to get bytes.
776 #[allow(dead_code)]
777 pub(crate) fn outgoing_receiver(&self) -> async_channel::Receiver<BroadcastEnvelope<I>> {
778 self.outgoing_rx.clone()
779 }
780}
781
782impl<I, A, D> NodeDelegate for PlumtreeNodeDelegate<I, A, D>
783where
784 I: Id + IdCodec + Clone + Send + Sync + 'static,
785 A: CheapClone + Send + Sync + 'static,
786 D: NodeDelegate,
787{
788 async fn node_meta(&self, limit: usize) -> Meta {
789 self.inner.node_meta(limit).await
790 }
791
792 async fn notify_message(&self, msg: Cow<'_, [u8]>) {
793 // Check if this is a Plumtree message
794 if msg.len() > 1 && msg[0] == PLUMTREE_MAGIC {
795 // Decode Plumtree envelope (sender_id + message)
796 if let Some((sender, plumtree_msg)) = decode_plumtree_envelope::<I>(&msg) {
797 tracing::trace!(
798 "received plumtree {:?} from {:?}",
799 plumtree_msg.tag(),
800 sender
801 );
802 // Forward to Plumtree for proper handling with sender identity
803 if self.plumtree_tx.send((sender, plumtree_msg)).await.is_err() {
804 tracing::warn!("plumtree channel closed, cannot forward message");
805 }
806 } else {
807 tracing::warn!("failed to decode plumtree envelope");
808 }
809 } else {
810 // Forward non-Plumtree messages to inner delegate
811 self.inner.notify_message(msg).await;
812 }
813 }
814
815 async fn broadcast_messages<F>(
816 &self,
817 limit: usize,
818 encoded_len: F,
819 ) -> impl Iterator<Item = Bytes> + Send
820 where
821 F: Fn(Bytes) -> (usize, Bytes) + Send + Sync + 'static,
822 {
823 // Use proportional limiting to prevent Plumtree from starving user messages.
824 // Reserve at least 30% of bandwidth for user messages to ensure they always
825 // have a chance to be sent, even under heavy Plumtree control traffic.
826 const PLUMTREE_MAX_RATIO: usize = 70;
827 let plumtree_limit = (limit * PLUMTREE_MAX_RATIO) / 100;
828 let user_reserved = limit - plumtree_limit;
829
830 // Collect Plumtree outgoing messages up to proportional limit
831 // Serialization is deferred to here - encode just before network transmission
832 let mut plumtree_msgs = Vec::new();
833 let mut plumtree_used = 0;
834
835 while let Ok(envelope) = self.outgoing_rx.try_recv() {
836 // Encode at the last moment before network transmission
837 let encoded = envelope.encode();
838 let (len, msg) = encoded_len(encoded);
839 if plumtree_used + len <= plumtree_limit {
840 plumtree_used += len;
841 plumtree_msgs.push(msg);
842 } else {
843 // Plumtree hit its proportional limit, stop collecting
844 break;
845 }
846 }
847
848 // User gets reserved space plus any unused Plumtree quota
849 let user_limit = user_reserved + (plumtree_limit - plumtree_used);
850 let user_msgs = self.inner.broadcast_messages(user_limit, encoded_len).await;
851
852 // Combine both iterators
853 plumtree_msgs.into_iter().chain(user_msgs)
854 }
855
856 async fn local_state(&self, join: bool) -> Bytes {
857 self.inner.local_state(join).await
858 }
859
860 async fn merge_remote_state(&self, buf: &[u8], join: bool) {
861 self.inner.merge_remote_state(buf, join).await
862 }
863}
864
865impl<I, A, D> PingDelegate for PlumtreeNodeDelegate<I, A, D>
866where
867 I: Id + IdCodec + Clone + Send + Sync + 'static,
868 A: CheapClone + Send + Sync + 'static,
869 D: PingDelegate<Id = I, Address = A>,
870{
871 type Id = I;
872 type Address = A;
873
874 async fn ack_payload(&self) -> Bytes {
875 self.inner.ack_payload().await
876 }
877
878 async fn notify_ping_complete(
879 &self,
880 node: Arc<NodeState<Self::Id, Self::Address>>,
881 rtt: std::time::Duration,
882 payload: Bytes,
883 ) {
884 self.inner.notify_ping_complete(node, rtt, payload).await
885 }
886
887 fn disable_reliable_pings(&self, target: &Self::Id) -> bool {
888 self.inner.disable_reliable_pings(target)
889 }
890}
891
892impl<I, A, D> EventDelegate for PlumtreeNodeDelegate<I, A, D>
893where
894 I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
895 A: CheapClone + Send + Sync + 'static,
896 D: EventDelegate<Id = I, Address = A>,
897{
898 type Id = I;
899 type Address = A;
900
901 async fn notify_join(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
902 // Sync: Add to Plumtree peers with hash ring-based auto-classification
903 // This ensures new peers are placed correctly in the topology
904 self.peers
905 .add_peer_auto(node.id().clone(), self.max_peers, self.eager_fanout);
906
907 self.inner.notify_join(node).await;
908 }
909
910 async fn notify_leave(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
911 // Sync: Remove from Plumtree peers immediately to stop sending to dead node
912 let result = self.peers.remove_peer(node.id());
913
914 // If an eager peer was removed, rebalance to promote a lazy peer
915 // This maintains tree connectivity when eager peers leave
916 if result.was_eager() {
917 self.peers.rebalance(self.eager_fanout);
918 }
919
920 self.inner.notify_leave(node).await;
921 }
922
923 async fn notify_update(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
924 self.inner.notify_update(node).await;
925 }
926}
927
928impl<I, A, D> ConflictDelegate for PlumtreeNodeDelegate<I, A, D>
929where
930 I: Id + IdCodec + Clone + Send + Sync + 'static,
931 A: CheapClone + Send + Sync + 'static,
932 D: ConflictDelegate<Id = I, Address = A>,
933{
934 type Id = I;
935 type Address = A;
936
937 async fn notify_conflict(
938 &self,
939 existing: Arc<NodeState<Self::Id, Self::Address>>,
940 other: Arc<NodeState<Self::Id, Self::Address>>,
941 ) {
942 self.inner.notify_conflict(existing, other).await;
943 }
944}
945
946impl<I, A, D> AliveDelegate for PlumtreeNodeDelegate<I, A, D>
947where
948 I: Id + IdCodec + Clone + Send + Sync + 'static,
949 A: CheapClone + Send + Sync + 'static,
950 D: AliveDelegate<Id = I, Address = A>,
951{
952 type Error = D::Error;
953 type Id = I;
954 type Address = A;
955
956 async fn notify_alive(
957 &self,
958 peer: Arc<NodeState<Self::Id, Self::Address>>,
959 ) -> std::result::Result<(), Self::Error> {
960 self.inner.notify_alive(peer).await
961 }
962}
963
964impl<I, A, D> MergeDelegate for PlumtreeNodeDelegate<I, A, D>
965where
966 I: Id + IdCodec + Clone + Send + Sync + 'static,
967 A: CheapClone + Send + Sync + 'static,
968 D: MergeDelegate<Id = I, Address = A>,
969{
970 type Error = D::Error;
971 type Id = I;
972 type Address = A;
973
974 async fn notify_merge(
975 &self,
976 peers: Arc<[NodeState<Self::Id, Self::Address>]>,
977 ) -> std::result::Result<(), Self::Error> {
978 self.inner.notify_merge(peers).await
979 }
980}
981
982impl<I, A, D> Delegate for PlumtreeNodeDelegate<I, A, D>
983where
984 I: Id + IdCodec + Clone + Send + Sync + 'static,
985 A: CheapClone + Send + Sync + 'static,
986 D: Delegate<Id = I, Address = A>,
987{
988 type Id = I;
989 type Address = A;
990}
991
992/// Event handler that synchronizes memberlist events to Plumtree.
993///
994/// Wraps a user's PlumtreeDelegate to forward events.
995pub struct PlumtreeEventHandler<I, PD> {
996 /// Inner Plumtree delegate for application events.
997 inner: PD,
998 /// Marker for I type parameter.
999 _marker: std::marker::PhantomData<I>,
1000}
1001
1002impl<I, PD> PlumtreeEventHandler<I, PD> {
1003 /// Create a new event handler.
1004 pub fn new(inner: PD) -> Self {
1005 Self {
1006 inner,
1007 _marker: std::marker::PhantomData,
1008 }
1009 }
1010}
1011
1012impl<I, PD> PlumtreeDelegate<I> for PlumtreeEventHandler<I, PD>
1013where
1014 I: Clone + Eq + Hash + Send + Sync + 'static,
1015 PD: PlumtreeDelegate<I>,
1016{
1017 fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
1018 self.inner.on_deliver(message_id, payload);
1019 }
1020
1021 fn on_eager_promotion(&self, peer: &I) {
1022 self.inner.on_eager_promotion(peer);
1023 }
1024
1025 fn on_lazy_demotion(&self, peer: &I) {
1026 self.inner.on_lazy_demotion(peer);
1027 }
1028
1029 fn on_graft_sent(&self, peer: &I, message_id: &MessageId) {
1030 self.inner.on_graft_sent(peer, message_id);
1031 }
1032
1033 fn on_prune_sent(&self, peer: &I) {
1034 self.inner.on_prune_sent(peer);
1035 }
1036
1037 fn on_graft_failed(&self, message_id: &MessageId, peer: &I) {
1038 self.inner.on_graft_failed(message_id, peer);
1039 }
1040}
1041
1042/// Encode a Plumtree message for transmission via memberlist.
1043///
1044/// Format: `[MAGIC][message]`
1045///
1046/// **Note**: This function does NOT include sender identity. For proper protocol
1047/// operation, use [`encode_plumtree_envelope`] which includes the sender ID.
1048pub fn encode_plumtree_message(msg: &PlumtreeMessage) -> Bytes {
1049 let encoded = msg.encode_to_bytes();
1050 let mut buf = BytesMut::with_capacity(1 + encoded.len());
1051 buf.put_u8(PLUMTREE_MAGIC);
1052 buf.extend_from_slice(&encoded);
1053 buf.freeze()
1054}
1055
1056/// Encode a Plumtree message with sender identity for transmission.
1057///
1058/// Format: `[MAGIC][sender_id][message]`
1059///
1060/// This is the **recommended** encoding function as it includes the sender's
1061/// node ID, which is critical for proper Plumtree protocol operation.
1062///
1063/// # Zero-Copy Optimization
1064///
1065/// This function encodes directly into a single pre-sized buffer, avoiding
1066/// intermediate allocations. The buffer size is calculated upfront using
1067/// [`envelope_encoded_len`] to ensure a single allocation.
1068pub fn encode_plumtree_envelope<I: IdCodec>(sender: &I, msg: &PlumtreeMessage) -> Bytes {
1069 let id_len = sender.encoded_id_len();
1070 let msg_len = msg.encoded_len();
1071 let mut buf = BytesMut::with_capacity(1 + id_len + msg_len);
1072 buf.put_u8(PLUMTREE_MAGIC);
1073 sender.encode_id(&mut buf);
1074 // Encode message directly into buffer (zero-copy, no intermediate allocation)
1075 msg.encode(&mut buf);
1076 buf.freeze()
1077}
1078
1079/// Encode a Plumtree envelope directly into an existing buffer.
1080///
1081/// This is useful for buffer pooling scenarios where you want to reuse
1082/// buffers to avoid allocation overhead.
1083///
1084/// Format: `[MAGIC][sender_id][message]`
1085///
1086/// # Arguments
1087///
1088/// * `sender` - The sender's node ID
1089/// * `msg` - The Plumtree message to encode
1090/// * `buf` - The buffer to encode into (must have sufficient capacity)
1091///
1092/// # Returns
1093///
1094/// The number of bytes written to the buffer.
1095pub fn encode_plumtree_envelope_into<I: IdCodec>(
1096 sender: &I,
1097 msg: &PlumtreeMessage,
1098 buf: &mut impl BufMut,
1099) -> usize {
1100 let start_len = 1 + sender.encoded_id_len() + msg.encoded_len();
1101 buf.put_u8(PLUMTREE_MAGIC);
1102 sender.encode_id(buf);
1103 msg.encode(buf);
1104 start_len
1105}
1106
1107/// Calculate the encoded length of a Plumtree envelope.
1108///
1109/// Use this to pre-allocate buffers of the correct size.
1110///
1111/// # Arguments
1112///
1113/// * `sender` - The sender's node ID
1114/// * `msg` - The Plumtree message
1115///
1116/// # Returns
1117///
1118/// The total number of bytes the encoded envelope will occupy.
1119pub fn envelope_encoded_len<I: IdCodec>(sender: &I, msg: &PlumtreeMessage) -> usize {
1120 1 + sender.encoded_id_len() + msg.encoded_len()
1121}
1122
1123/// Decode a Plumtree envelope extracting sender ID and message.
1124///
1125/// Format: `[MAGIC][sender_id][message]`
1126///
1127/// Returns `Some((sender, message))` on success, `None` on decode failure.
1128pub fn decode_plumtree_envelope<I: IdCodec>(data: &[u8]) -> Option<(I, PlumtreeMessage)> {
1129 if data.is_empty() || data[0] != PLUMTREE_MAGIC {
1130 return None;
1131 }
1132
1133 let mut cursor = std::io::Cursor::new(&data[1..]);
1134
1135 // Decode sender ID
1136 let sender = I::decode_id(&mut cursor)?;
1137
1138 // Decode message from remaining bytes
1139 let msg = PlumtreeMessage::decode(&mut cursor)?;
1140
1141 Some((sender, msg))
1142}
1143
1144/// Try to decode a Plumtree message from received bytes (without sender).
1145///
1146/// Use this for simple testing or when sender is tracked separately.
1147/// For production use, prefer [`decode_plumtree_envelope`] which extracts
1148/// the sender identity needed for proper protocol operation.
1149pub fn decode_plumtree_message(data: &[u8]) -> Option<PlumtreeMessage> {
1150 if data.len() > 1 && data[0] == PLUMTREE_MAGIC {
1151 PlumtreeMessage::decode_from_slice(&data[1..])
1152 } else {
1153 None
1154 }
1155}
1156
1157/// Check if data is a Plumtree message.
1158pub fn is_plumtree_message(data: &[u8]) -> bool {
1159 !data.is_empty() && data[0] == PLUMTREE_MAGIC
1160}
1161
1162#[cfg(test)]
1163mod tests {
1164 use super::*;
1165 use crate::NoopDelegate;
1166
1167 #[test]
1168 fn test_encode_decode() {
1169 let msg = PlumtreeMessage::Gossip {
1170 id: MessageId::new(),
1171 round: 5,
1172 payload: Bytes::from_static(b"hello"),
1173 };
1174
1175 let encoded = encode_plumtree_message(&msg);
1176 assert!(is_plumtree_message(&encoded));
1177
1178 let decoded = decode_plumtree_message(&encoded).unwrap();
1179 assert_eq!(msg, decoded);
1180 }
1181
1182 #[test]
1183 fn test_non_plumtree_message() {
1184 let data = b"regular user message";
1185 assert!(!is_plumtree_message(data));
1186 assert!(decode_plumtree_message(data).is_none());
1187 }
1188
1189 #[test]
1190 fn test_envelope_encode_decode() {
1191 let sender: u64 = 42;
1192 let msg = PlumtreeMessage::Gossip {
1193 id: MessageId::new(),
1194 round: 5,
1195 payload: Bytes::from_static(b"hello"),
1196 };
1197
1198 // Encode with sender ID
1199 let encoded = encode_plumtree_envelope(&sender, &msg);
1200 assert!(is_plumtree_message(&encoded));
1201
1202 // Decode and verify sender + message
1203 let (decoded_sender, decoded_msg): (u64, PlumtreeMessage) =
1204 decode_plumtree_envelope(&encoded).unwrap();
1205 assert_eq!(decoded_sender, sender);
1206 assert_eq!(decoded_msg, msg);
1207 }
1208
1209 #[test]
1210 fn test_envelope_with_string_id() {
1211 let sender = "node-1".to_string();
1212 let msg = PlumtreeMessage::IHave {
1213 message_ids: smallvec::smallvec![MessageId::new()],
1214 round: 3,
1215 };
1216
1217 let encoded = encode_plumtree_envelope(&sender, &msg);
1218 let (decoded_sender, decoded_msg): (String, PlumtreeMessage) =
1219 decode_plumtree_envelope(&encoded).unwrap();
1220
1221 assert_eq!(decoded_sender, sender);
1222 assert_eq!(decoded_msg, msg);
1223 }
1224
1225 #[test]
1226 fn test_envelope_graft_message() {
1227 let sender: u64 = 123;
1228 let msg = PlumtreeMessage::Graft {
1229 message_id: MessageId::new(),
1230 round: 7,
1231 };
1232
1233 let encoded = encode_plumtree_envelope(&sender, &msg);
1234 let (decoded_sender, decoded_msg): (u64, PlumtreeMessage) =
1235 decode_plumtree_envelope(&encoded).unwrap();
1236
1237 assert_eq!(decoded_sender, sender);
1238 assert_eq!(decoded_msg, msg);
1239 }
1240
1241 #[test]
1242 fn test_envelope_prune_message() {
1243 let sender: u64 = 999;
1244 let msg = PlumtreeMessage::Prune;
1245
1246 let encoded = encode_plumtree_envelope(&sender, &msg);
1247 let (decoded_sender, decoded_msg): (u64, PlumtreeMessage) =
1248 decode_plumtree_envelope(&encoded).unwrap();
1249
1250 assert_eq!(decoded_sender, sender);
1251 assert_eq!(decoded_msg, msg);
1252 }
1253
1254 #[test]
1255 fn test_plumtree_memberlist_creation() {
1256 let pm: PlumtreeMemberlist<u64, NoopDelegate> =
1257 PlumtreeMemberlist::new(1u64, PlumtreeConfig::default(), NoopDelegate);
1258
1259 assert_eq!(*pm.plumtree().local_id(), 1u64);
1260 assert!(!pm.is_shutdown());
1261 }
1262
1263 #[tokio::test]
1264 async fn test_plumtree_memberlist_broadcast() {
1265 let pm: PlumtreeMemberlist<u64, NoopDelegate> =
1266 PlumtreeMemberlist::new(1u64, PlumtreeConfig::default(), NoopDelegate);
1267
1268 // Add a peer
1269 pm.add_peer(2u64);
1270
1271 // Broadcast should succeed
1272 let msg_id = pm.broadcast(Bytes::from("test")).await.unwrap();
1273 assert!(msg_id.timestamp() > 0);
1274
1275 // Check stats
1276 let cache_stats = pm.cache_stats();
1277 assert_eq!(cache_stats.entries, 1);
1278 }
1279
1280 #[test]
1281 fn test_plumtree_memberlist_peer_management() {
1282 let pm: PlumtreeMemberlist<u64, NoopDelegate> =
1283 PlumtreeMemberlist::new(1u64, PlumtreeConfig::default(), NoopDelegate);
1284
1285 assert_eq!(pm.peer_stats().total(), 0);
1286
1287 pm.add_peer(2u64);
1288 pm.add_peer(3u64);
1289 assert_eq!(pm.peer_stats().total(), 2);
1290
1291 pm.remove_peer(&2u64);
1292 assert_eq!(pm.peer_stats().total(), 1);
1293 }
1294
1295 #[test]
1296 fn test_envelope_encoded_len() {
1297 let sender: u64 = 42;
1298 let msg = PlumtreeMessage::Gossip {
1299 id: MessageId::new(),
1300 round: 5,
1301 payload: Bytes::from_static(b"hello world"),
1302 };
1303
1304 // Verify encoded_len matches actual encoded length
1305 let calculated_len = envelope_encoded_len(&sender, &msg);
1306 let encoded = encode_plumtree_envelope(&sender, &msg);
1307 assert_eq!(calculated_len, encoded.len());
1308 }
1309
1310 #[test]
1311 fn test_encode_envelope_into_buffer() {
1312 let sender: u64 = 123;
1313 let msg = PlumtreeMessage::IHave {
1314 message_ids: smallvec::smallvec![MessageId::new(), MessageId::new()],
1315 round: 10,
1316 };
1317
1318 // Encode into a pre-allocated buffer
1319 let expected_len = envelope_encoded_len(&sender, &msg);
1320 let mut buf = BytesMut::with_capacity(expected_len);
1321 let written = encode_plumtree_envelope_into(&sender, &msg, &mut buf);
1322
1323 assert_eq!(written, expected_len);
1324 assert_eq!(buf.len(), expected_len);
1325
1326 // Verify it decodes correctly
1327 let (decoded_sender, decoded_msg): (u64, PlumtreeMessage) =
1328 decode_plumtree_envelope(&buf.freeze()).unwrap();
1329 assert_eq!(decoded_sender, sender);
1330 assert_eq!(decoded_msg, msg);
1331 }
1332
1333 #[test]
1334 fn test_zero_copy_encoding_produces_same_result() {
1335 // Verify the optimized encoding produces identical results
1336 let sender: u64 = 999;
1337 let msg = PlumtreeMessage::Gossip {
1338 id: MessageId::new(),
1339 round: 42,
1340 payload: Bytes::from_static(b"test payload"),
1341 };
1342
1343 // Both methods should produce identical output
1344 let encoded1 = encode_plumtree_envelope(&sender, &msg);
1345
1346 let mut buf = BytesMut::with_capacity(envelope_encoded_len(&sender, &msg));
1347 encode_plumtree_envelope_into(&sender, &msg, &mut buf);
1348 let encoded2 = buf.freeze();
1349
1350 assert_eq!(encoded1, encoded2);
1351 }
1352
1353 #[test]
1354 fn test_envelope_encoded_len_all_message_types() {
1355 let sender: u64 = 1;
1356
1357 // Gossip
1358 let gossip = PlumtreeMessage::Gossip {
1359 id: MessageId::new(),
1360 round: 0,
1361 payload: Bytes::from_static(b"test"),
1362 };
1363 assert_eq!(
1364 envelope_encoded_len(&sender, &gossip),
1365 encode_plumtree_envelope(&sender, &gossip).len()
1366 );
1367
1368 // IHave
1369 let ihave = PlumtreeMessage::IHave {
1370 message_ids: smallvec::smallvec![MessageId::new()],
1371 round: 0,
1372 };
1373 assert_eq!(
1374 envelope_encoded_len(&sender, &ihave),
1375 encode_plumtree_envelope(&sender, &ihave).len()
1376 );
1377
1378 // Graft
1379 let graft = PlumtreeMessage::Graft {
1380 message_id: MessageId::new(),
1381 round: 0,
1382 };
1383 assert_eq!(
1384 envelope_encoded_len(&sender, &graft),
1385 encode_plumtree_envelope(&sender, &graft).len()
1386 );
1387
1388 // Prune
1389 let prune = PlumtreeMessage::Prune;
1390 assert_eq!(
1391 envelope_encoded_len(&sender, &prune),
1392 encode_plumtree_envelope(&sender, &prune).len()
1393 );
1394 }
1395}