memberlist_plumtree/
integration.rs

1//! Integration with memberlist delegate system.
2//!
3//! This module provides the glue between Plumtree and memberlist,
4//! handling message routing and membership synchronization.
5//!
6//! # Sender Identity
7//!
8//! Since memberlist's broadcast mechanism doesn't provide sender information,
9//! Plumtree wraps messages in a `NetworkEnvelope` that includes the sender's
10//! node ID. This is critical for proper protocol operation because:
11//!
12//! - **IHave**: Sender must be known to promote to Eager and send Graft
13//! - **Graft**: Requester must be known to send data directly to them
14//! - **Prune**: Sender must be known to demote to Lazy
15//!
16//! The envelope is transparent to users - encoding/decoding happens automatically.
17
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use memberlist_core::{
20    delegate::{
21        AliveDelegate, ConflictDelegate, Delegate, EventDelegate, MergeDelegate, NodeDelegate,
22        PingDelegate,
23    },
24    proto::{Meta, NodeState},
25};
26use nodecraft::{CheapClone, Id};
27use std::{borrow::Cow, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};
28
29use crate::{
30    config::PlumtreeConfig,
31    error::Result,
32    message::{MessageId, PlumtreeMessage},
33    peer_state::PeerState,
34    plumtree::{Plumtree, PlumtreeDelegate, PlumtreeHandle},
35};
36
37/// Trait for encoding and decoding node IDs for network transmission.
38///
39/// This trait must be implemented for any node ID type used with
40/// `PlumtreeMemberlist` to enable sender identity tracking.
41///
42/// # Example
43///
44/// ```rust
45/// use memberlist_plumtree::IdCodec;
46/// use bytes::{Buf, BufMut};
47///
48/// #[derive(Clone, Debug, PartialEq)]
49/// struct MyNodeId(u64);
50///
51/// impl IdCodec for MyNodeId {
52///     fn encode_id(&self, buf: &mut impl BufMut) {
53///         buf.put_u64(self.0);
54///     }
55///
56///     fn decode_id(buf: &mut impl Buf) -> Option<Self> {
57///         if buf.remaining() >= 8 {
58///             Some(MyNodeId(buf.get_u64()))
59///         } else {
60///             None
61///         }
62///     }
63///
64///     fn encoded_id_len(&self) -> usize {
65///         8
66///     }
67/// }
68/// ```
69pub trait IdCodec: Sized {
70    /// Encode this ID into the buffer.
71    fn encode_id(&self, buf: &mut impl BufMut);
72
73    /// Decode an ID from the buffer.
74    fn decode_id(buf: &mut impl Buf) -> Option<Self>;
75
76    /// Get the encoded length of this ID in bytes.
77    fn encoded_id_len(&self) -> usize;
78}
79
80// Implement IdCodec for common integer types
81impl IdCodec for u8 {
82    fn encode_id(&self, buf: &mut impl BufMut) {
83        buf.put_u8(*self);
84    }
85
86    fn decode_id(buf: &mut impl Buf) -> Option<Self> {
87        if buf.remaining() >= 1 {
88            Some(buf.get_u8())
89        } else {
90            None
91        }
92    }
93
94    fn encoded_id_len(&self) -> usize {
95        1
96    }
97}
98
99impl IdCodec for u16 {
100    fn encode_id(&self, buf: &mut impl BufMut) {
101        buf.put_u16(*self);
102    }
103
104    fn decode_id(buf: &mut impl Buf) -> Option<Self> {
105        if buf.remaining() >= 2 {
106            Some(buf.get_u16())
107        } else {
108            None
109        }
110    }
111
112    fn encoded_id_len(&self) -> usize {
113        2
114    }
115}
116
117impl IdCodec for u32 {
118    fn encode_id(&self, buf: &mut impl BufMut) {
119        buf.put_u32(*self);
120    }
121
122    fn decode_id(buf: &mut impl Buf) -> Option<Self> {
123        if buf.remaining() >= 4 {
124            Some(buf.get_u32())
125        } else {
126            None
127        }
128    }
129
130    fn encoded_id_len(&self) -> usize {
131        4
132    }
133}
134
135impl IdCodec for u64 {
136    fn encode_id(&self, buf: &mut impl BufMut) {
137        buf.put_u64(*self);
138    }
139
140    fn decode_id(buf: &mut impl Buf) -> Option<Self> {
141        if buf.remaining() >= 8 {
142            Some(buf.get_u64())
143        } else {
144            None
145        }
146    }
147
148    fn encoded_id_len(&self) -> usize {
149        8
150    }
151}
152
153impl IdCodec for u128 {
154    fn encode_id(&self, buf: &mut impl BufMut) {
155        buf.put_u128(*self);
156    }
157
158    fn decode_id(buf: &mut impl Buf) -> Option<Self> {
159        if buf.remaining() >= 16 {
160            Some(buf.get_u128())
161        } else {
162            None
163        }
164    }
165
166    fn encoded_id_len(&self) -> usize {
167        16
168    }
169}
170
171// Implement IdCodec for String (variable-length)
172impl IdCodec for String {
173    fn encode_id(&self, buf: &mut impl BufMut) {
174        let bytes = self.as_bytes();
175        buf.put_u16(bytes.len() as u16);
176        buf.put_slice(bytes);
177    }
178
179    fn decode_id(buf: &mut impl Buf) -> Option<Self> {
180        if buf.remaining() < 2 {
181            return None;
182        }
183        let len = buf.get_u16() as usize;
184        if buf.remaining() < len {
185            return None;
186        }
187        let bytes = buf.copy_to_bytes(len);
188        String::from_utf8(bytes.to_vec()).ok()
189    }
190
191    fn encoded_id_len(&self) -> usize {
192        2 + self.len()
193    }
194}
195
/// Leading byte that tags a payload as a Plumtree protocol message.
///
/// Lets the node delegate tell Plumtree traffic apart from user messages.
/// The value is ASCII `'P'` (`0x50`).
const PLUMTREE_MAGIC: u8 = b'P';
200
201/// Outgoing broadcast envelope (unencooded).
202///
203/// This represents a Plumtree message that should be broadcast to all cluster
204/// members via memberlist's gossip/broadcast mechanism.
205///
206/// Use [`encode()`](Self::encode) to serialize for network transmission.
207#[derive(Debug, Clone)]
208pub struct BroadcastEnvelope<I> {
209    /// Sender's node ID (included in envelope for protocol operation).
210    pub sender: I,
211    /// The Plumtree protocol message.
212    pub message: PlumtreeMessage,
213}
214
215impl<I: IdCodec> BroadcastEnvelope<I> {
216    /// Encode this envelope for network transmission.
217    ///
218    /// Returns bytes that can be passed to memberlist's broadcast API.
219    pub fn encode(&self) -> Bytes {
220        encode_plumtree_envelope(&self.sender, &self.message)
221    }
222}
223
224/// Outgoing unicast envelope (unencooded).
225///
226/// Used internally to defer serialization until the message is actually
227/// sent over the network, reducing unnecessary allocations.
228#[derive(Debug, Clone)]
229pub(crate) struct UnicastEnvelope<I> {
230    /// Sender's node ID (included in envelope for protocol operation).
231    sender: I,
232    /// Target peer to send to.
233    target: I,
234    /// The Plumtree protocol message.
235    message: PlumtreeMessage,
236}
237
238impl<I: IdCodec> UnicastEnvelope<I> {
239    /// Encode this envelope for network transmission.
240    fn encode(&self) -> Bytes {
241        encode_plumtree_envelope(&self.sender, &self.message)
242    }
243}
244
245/// Combined Plumtree + Memberlist system.
246///
247/// Wraps a memberlist instance and provides efficient O(n) broadcast
248/// via the Plumtree protocol while using memberlist for membership.
249///
250/// # Recommended Usage (High-Performance Integration)
251///
252/// For production, it is recommended to combine Plumtree with a reliable unicast
253/// transport (like QUIC) and a pooling layer for backpressure control.
254///
255/// ```ignore
256/// use memberlist_plumtree::{
257///     PlumtreeMemberlist, PlumtreeConfig, NoopDelegate,
258///     QuicTransport, QuicConfig, PooledTransport, PoolConfig,
259///     MapPeerResolver, decode_plumtree_envelope
260/// };
261/// use std::sync::Arc;
262///
263/// // 1. Setup QUIC Transport with Connection Pooling
264/// // MapPeerResolver maps Node IDs to actual Socket Addresses
265/// let resolver = Arc::new(MapPeerResolver::new(local_addr));
266/// let quic_transport = QuicTransport::new(QuicConfig::default(), resolver, quinn_endpoint);
267///
268/// // 2. Wrap with PooledTransport for Concurrency & Queueing control
269/// let pool_config = PoolConfig::default()
270///     .with_max_concurrent_global(100)
271///     .with_max_queue_per_peer(512);
272/// let transport = PooledTransport::new(quic_transport, pool_config);
273///
274/// // 3. Initialize PlumtreeMemberlist
275/// let pm = Arc::new(PlumtreeMemberlist::new(local_id, PlumtreeConfig::lan(), NoopDelegate));
276///
277/// // 4. Start the runners
278/// // run_with_transport automatically drives the internal PlumtreeRunner (runner.rs)
279/// // and pumps Unicast messages (Graft/Prune) through your PooledTransport.
280/// let pm_run = pm.clone();
281/// let transport_clone = transport.clone();
282/// tokio::spawn(async move {
283///     pm_run.run_with_transport(transport_clone).await
284/// });
285///
286/// // Handle incoming message processor
287/// let pm_proc = pm.clone();
288/// tokio::spawn(async move { pm_proc.run_incoming_processor().await });
289///
290/// // 5. Bridge Broadcasts to Memberlist Gossip
291/// // While Unicast uses QUIC, common Gossip/IHave still use memberlist's UDP channel.
292/// let pm_out = pm.clone();
293/// let ml = memberlist_instance.clone();
294/// tokio::spawn(async move {
295///     let mut broadcast_rx = pm_out.outgoing_receiver_raw();
296///     while let Ok(envelope) = broadcast_rx.recv().await {
297///         ml.broadcast(envelope.encode()).await;
298///     }
299/// });
300///
301/// // 6. Inbound Injection (Inside your NodeDelegate or Transport Acceptor)
302/// // if let Some((sender_id, message)) = decode_plumtree_envelope::<u64>(&raw_data) {
303/// //     pm.incoming_sender().send((sender_id, message)).await.ok();
304/// // }
305/// ```
306pub struct PlumtreeMemberlist<I, PD>
307where
308    I: Id,
309{
310    /// Plumtree broadcast layer.
311    plumtree: Plumtree<I, PlumtreeEventHandler<I, PD>>,
312    /// Handle for message I/O.
313    handle: PlumtreeHandle<I>,
314    /// Channel for sending incoming Plumtree messages to be processed.
315    incoming_tx: async_channel::Sender<(I, PlumtreeMessage)>,
316    /// Channel for receiving incoming Plumtree messages from memberlist.
317    incoming_rx: async_channel::Receiver<(I, PlumtreeMessage)>,
318    /// Channel for outgoing broadcast messages (unencooded, serialization deferred).
319    outgoing_rx: async_channel::Receiver<BroadcastEnvelope<I>>,
320    /// Sender for outgoing broadcast messages.
321    outgoing_tx: async_channel::Sender<BroadcastEnvelope<I>>,
322    /// Channel for outgoing unicast messages (unencooded, serialization deferred).
323    /// These MUST be sent directly to the target peer, NOT broadcast!
324    unicast_rx: async_channel::Receiver<UnicastEnvelope<I>>,
325    /// Sender for outgoing unicast messages.
326    unicast_tx: async_channel::Sender<UnicastEnvelope<I>>,
327}
328
329impl<I, PD> PlumtreeMemberlist<I, PD>
330where
331    I: Id + IdCodec + Clone + Eq + Hash + Debug + Send + Sync + 'static,
332    PD: PlumtreeDelegate<I>,
333{
334    /// Create a new PlumtreeMemberlist instance.
335    ///
336    /// # Arguments
337    ///
338    /// * `local_id` - The local node's identifier
339    /// * `config` - Plumtree configuration (hash ring is automatically enabled)
340    /// * `delegate` - Application delegate for message delivery
341    ///
342    /// # Note
343    ///
344    /// Hash ring topology is automatically enabled for `PlumtreeMemberlist` to ensure
345    /// deterministic peer selection and Z≥2 redundancy guarantees.
346    pub fn new(local_id: I, config: PlumtreeConfig, delegate: PD) -> Self {
347        // Create channels for communication
348        let (incoming_tx, incoming_rx) = async_channel::bounded(1024);
349        let (outgoing_tx, outgoing_rx) = async_channel::bounded(1024);
350        let (unicast_tx, unicast_rx) = async_channel::bounded(1024);
351
352        // Create the event handler that wraps the user delegate
353        let event_handler = PlumtreeEventHandler::new(delegate);
354
355        // Force hash ring topology for PlumtreeMemberlist
356        let config = config.with_hash_ring(true);
357
358        // Create the Plumtree instance
359        let (plumtree, handle) = Plumtree::new(local_id, config, event_handler);
360
361        Self {
362            plumtree,
363            handle,
364            incoming_tx,
365            incoming_rx,
366            outgoing_rx,
367            outgoing_tx,
368            unicast_rx,
369            unicast_tx,
370        }
371    }
372
373    /// Get the sender for incoming Plumtree messages.
374    ///
375    /// Pass this to `PlumtreeNodeDelegate` to forward received messages.
376    pub fn incoming_sender(&self) -> async_channel::Sender<(I, PlumtreeMessage)> {
377        self.incoming_tx.clone()
378    }
379
380    /// Get the receiver for outgoing broadcast envelopes.
381    ///
382    /// These messages have no specific target and should be disseminated
383    /// via memberlist's gossip/broadcast mechanism.
384    ///
385    /// **Note**: Messages are unencooded. Call `.encode()` on the envelope
386    /// to get bytes for transmission.
387    ///
388    /// Pass this to `PlumtreeNodeDelegate` for memberlist broadcast.
389    #[allow(dead_code)]
390    pub(crate) fn outgoing_receiver_raw(&self) -> async_channel::Receiver<BroadcastEnvelope<I>> {
391        self.outgoing_rx.clone()
392    }
393
394    /// Get the receiver for outgoing unicast envelopes.
395    ///
396    /// **CRITICAL**: These messages MUST be sent directly to the specified
397    /// target peer using memberlist's direct send API (e.g., `send_reliable`,
398    /// `send_to`, or similar). DO NOT broadcast these messages!
399    ///
400    /// **Note**: Messages are unencooded. Call `.encode()` on the envelope
401    /// to get bytes, and use `.target` to get the destination peer.
402    ///
403    /// # Example
404    ///
405    /// ```ignore
406    /// let unicast_rx = pm.unicast_receiver_raw();
407    /// while let Ok(envelope) = unicast_rx.recv().await {
408    ///     let target = envelope.target.clone();
409    ///     let encoded = envelope.encode();
410    ///     // Use memberlist's direct send, NOT broadcast!
411    ///     memberlist.send_reliable(&target, encoded).await;
412    /// }
413    /// ```
414    #[allow(dead_code)]
415    pub(crate) fn unicast_receiver_raw(&self) -> async_channel::Receiver<UnicastEnvelope<I>> {
416        self.unicast_rx.clone()
417    }
418
419    /// Get the outgoing broadcast envelope sender.
420    #[allow(dead_code)]
421    pub(crate) fn outgoing_sender_raw(&self) -> async_channel::Sender<BroadcastEnvelope<I>> {
422        self.outgoing_tx.clone()
423    }
424
425    /// Get the outgoing unicast envelope sender.
426    #[allow(dead_code)]
427    pub(crate) fn unicast_sender_raw(&self) -> async_channel::Sender<UnicastEnvelope<I>> {
428        self.unicast_tx.clone()
429    }
430
431    /// Broadcast a message to all nodes in the cluster.
432    ///
433    /// Uses Plumtree's efficient O(n) spanning tree broadcast.
434    pub async fn broadcast(&self, payload: impl Into<Bytes>) -> Result<MessageId> {
435        self.plumtree.broadcast(payload).await
436    }
437
438    /// Handle an incoming Plumtree message from the network.
439    ///
440    /// This should be called when a Plumtree message is received via memberlist.
441    pub async fn handle_message(&self, from: I, message: PlumtreeMessage) -> Result<()> {
442        self.plumtree.handle_message(from, message).await
443    }
444
445    /// Get a reference to the underlying Plumtree instance.
446    pub fn plumtree(&self) -> &Plumtree<I, PlumtreeEventHandler<I, PD>> {
447        &self.plumtree
448    }
449
450    /// Get a reference to the Plumtree handle.
451    pub fn handle(&self) -> &PlumtreeHandle<I> {
452        &self.handle
453    }
454
455    /// Get the Plumtree configuration.
456    pub fn config(&self) -> &PlumtreeConfig {
457        self.plumtree.config()
458    }
459
460    /// Get peer statistics.
461    pub fn peer_stats(&self) -> crate::peer_state::PeerStats {
462        self.plumtree.peer_stats()
463    }
464
465    /// Get cache statistics.
466    pub fn cache_stats(&self) -> crate::message::CacheStats {
467        self.plumtree.cache_stats()
468    }
469
470    /// Add a peer to the Plumtree overlay with automatic classification.
471    ///
472    /// Called when a node joins the memberlist cluster.
473    /// The peer is automatically classified as eager or lazy based on
474    /// the current state and configuration.
475    ///
476    /// Returns the result of the add operation.
477    pub fn add_peer(&self, peer: I) -> crate::peer_state::AddPeerResult {
478        self.plumtree.add_peer(peer)
479    }
480
481    /// Add a peer to the lazy set only (traditional behavior).
482    ///
483    /// This bypasses the `max_peers` limit check and auto-classification.
484    pub fn add_peer_lazy(&self, peer: I) {
485        self.plumtree.add_peer_lazy(peer);
486    }
487
488    /// Remove a peer from the Plumtree overlay.
489    ///
490    /// Called when a node leaves the memberlist cluster.
491    pub fn remove_peer(&self, peer: &I) {
492        self.plumtree.remove_peer(peer);
493    }
494
495    /// Get a reference to the shared peer state.
496    ///
497    /// This returns the Plumtree's internal peer state, which tracks
498    /// eager and lazy peers for message routing.
499    pub fn peers(&self) -> &Arc<PeerState<I>> {
500        self.plumtree.peers()
501    }
502
503    /// Wrap a memberlist delegate with Plumtree integration.
504    ///
505    /// This creates a `PlumtreeNodeDelegate` that wraps your delegate and
506    /// automatically synchronizes peer state when nodes join or leave the cluster.
507    ///
508    /// The wrapped delegate should be passed to memberlist when creating it.
509    ///
510    /// # Arguments
511    ///
512    /// * `delegate` - Your memberlist delegate implementation
513    ///
514    /// # Example
515    ///
516    /// ```ignore
517    /// use memberlist_plumtree::{PlumtreeMemberlist, PlumtreeConfig};
518    ///
519    /// // Create PlumtreeMemberlist
520    /// let pm = PlumtreeMemberlist::new(node_id, PlumtreeConfig::default(), my_plumtree_delegate);
521    ///
522    /// // Wrap your memberlist delegate
523    /// let wrapped = pm.wrap_delegate(my_memberlist_delegate);
524    ///
525    /// // Create memberlist with the wrapped delegate
526    /// let memberlist = Memberlist::new(wrapped, memberlist_config).await?;
527    ///
528    /// // Run PlumtreeMemberlist with transport
529    /// pm.run_with_transport(transport).await;
530    /// ```
531    pub fn wrap_delegate<A, D>(&self, delegate: D) -> PlumtreeNodeDelegate<I, A, D>
532    where
533        A: Send + Sync + 'static,
534    {
535        let config = self.plumtree.config();
536        PlumtreeNodeDelegate::new(
537            delegate,
538            self.incoming_tx.clone(),
539            self.outgoing_rx.clone(),
540            self.plumtree.peers().clone(),
541            config.eager_fanout,
542            config.max_peers,
543        )
544    }
545
546    /// Run the Plumtree background tasks (manual unicast handling).
547    ///
548    /// This runs the IHave scheduler, Graft timer, and outgoing processor.
549    /// Unicast messages are sent to `unicast_receiver()` channel.
550    ///
551    /// **WARNING**: You MUST process messages from `unicast_receiver()` and send them
552    /// directly to the target peer. Failing to do so will break the protocol.
553    ///
554    /// For safer usage, prefer [`run_with_transport`](Self::run_with_transport) which
555    /// handles unicast delivery automatically.
556    pub async fn run(&self) {
557        futures::future::join4(
558            self.plumtree.run_ihave_scheduler(),
559            self.plumtree.run_graft_timer(),
560            self.plumtree.run_seen_cleanup(),
561            self.run_outgoing_processor(),
562        )
563        .await;
564    }
565
566    /// Run the Plumtree background tasks with automatic unicast handling.
567    ///
568    /// This is the **recommended** way to run PlumtreeMemberlist. It handles
569    /// unicast message delivery automatically using the provided transport,
570    /// preventing protocol failures from forgotten unicast handling.
571    ///
572    /// # Arguments
573    ///
574    /// * `transport` - Implementation of [`Transport`](crate::Transport) for unicast delivery
575    ///
576    /// # Example
577    ///
578    /// ```ignore
579    /// use memberlist_plumtree::{PlumtreeMemberlist, Transport};
580    ///
581    /// struct MyTransport { memberlist: Memberlist }
582    ///
583    /// impl Transport<NodeId> for MyTransport {
584    ///     type Error = MyError;
585    ///     async fn send_to(&self, target: &NodeId, data: Bytes) -> Result<(), Self::Error> {
586    ///         self.memberlist.send_reliable(target, data).await
587    ///     }
588    /// }
589    ///
590    /// // No need to handle unicast_receiver() - it's done automatically!
591    /// let transport = MyTransport { memberlist };
592    /// pm.run_with_transport(transport).await;
593    /// ```
594    pub async fn run_with_transport<T>(&self, transport: T)
595    where
596        T: crate::Transport<I>,
597    {
598        futures::future::join5(
599            self.plumtree.run_ihave_scheduler(),
600            self.plumtree.run_graft_timer(),
601            self.plumtree.run_seen_cleanup(),
602            self.run_outgoing_processor(),
603            self.run_unicast_sender(transport),
604        )
605        .await;
606    }
607
608    /// Internal task that sends unicast messages via the provided transport.
609    ///
610    /// Serialization is deferred to this point to minimize allocations.
611    async fn run_unicast_sender<T>(&self, transport: T)
612    where
613        T: crate::Transport<I>,
614    {
615        while let Ok(envelope) = self.unicast_rx.recv().await {
616            // Encode at the last moment before network transmission
617            let encoded = envelope.encode();
618            if let Err(e) = transport.send_to(&envelope.target, encoded).await {
619                tracing::warn!(
620                    "failed to send unicast message to {:?}: {}",
621                    envelope.target,
622                    e
623                );
624            }
625        }
626    }
627
628    /// Run the outgoing message processor.
629    ///
630    /// Routes outgoing messages to the appropriate channel:
631    /// - Unicast messages (with target) -> unicast_tx
632    /// - Broadcast messages (no target) -> outgoing_tx
633    ///
634    /// Messages are passed as unencooded envelopes. Serialization is deferred
635    /// until the message is actually sent over the network to minimize
636    /// unnecessary allocations.
637    async fn run_outgoing_processor(&self) {
638        let local_id = self.plumtree.local_id().clone();
639
640        while let Some(outgoing) = self.handle.next_outgoing().await {
641            if let Some(target) = outgoing.target {
642                // Unicast: create envelope with target (encoding deferred)
643                let envelope = UnicastEnvelope {
644                    sender: local_id.clone(),
645                    target,
646                    message: outgoing.message,
647                };
648                if self.unicast_tx.send(envelope).await.is_err() {
649                    // Channel closed
650                    break;
651                }
652            } else {
653                // Broadcast: create envelope (encoding deferred)
654                let envelope = BroadcastEnvelope {
655                    sender: local_id.clone(),
656                    message: outgoing.message,
657                };
658                if self.outgoing_tx.send(envelope).await.is_err() {
659                    // Channel closed
660                    break;
661                }
662            }
663        }
664    }
665
666    /// Process incoming messages from the incoming channel.
667    ///
668    /// Should be spawned as a background task if using channels.
669    pub async fn run_incoming_processor(&self) {
670        while let Ok((from, message)) = self.incoming_rx.recv().await {
671            if let Err(e) = self.plumtree.handle_message(from, message).await {
672                tracing::warn!("failed to handle plumtree message: {}", e);
673            }
674        }
675    }
676
677    /// Shutdown the Plumtree layer.
678    pub fn shutdown(&self) {
679        self.plumtree.shutdown();
680        self.outgoing_tx.close();
681        self.unicast_tx.close();
682    }
683
684    /// Check if shutdown has been requested.
685    pub fn is_shutdown(&self) -> bool {
686        self.plumtree.is_shutdown()
687    }
688}
689
690/// Delegate that intercepts messages for Plumtree protocol.
691///
692/// Wraps a user delegate and handles Plumtree message routing.
693/// Use this when creating a memberlist to integrate Plumtree.
694pub struct PlumtreeNodeDelegate<I, A, D> {
695    /// Inner user delegate.
696    inner: D,
697    /// Channel to send received Plumtree messages to PlumtreeMemberlist.
698    plumtree_tx: async_channel::Sender<(I, PlumtreeMessage)>,
699    /// Outgoing Plumtree messages to broadcast via memberlist (unencooded).
700    /// Serialization is deferred until messages are sent over the network.
701    outgoing_rx: async_channel::Receiver<BroadcastEnvelope<I>>,
702    /// Shared peer state for synchronizing membership changes.
703    /// When memberlist detects a node join/leave, this is updated to keep
704    /// Plumtree's peer state in sync.
705    peers: Arc<PeerState<I>>,
706    /// Target number of eager peers for auto-classification.
707    eager_fanout: usize,
708    /// Maximum total peers allowed (None = unlimited).
709    max_peers: Option<usize>,
710    /// Marker.
711    _marker: PhantomData<A>,
712}
713
714impl<I, A, D> PlumtreeNodeDelegate<I, A, D> {
715    /// Create a new Plumtree node delegate.
716    ///
717    /// This wraps a user's memberlist delegate to automatically synchronize
718    /// peer state when nodes join or leave the cluster.
719    ///
720    /// # Arguments
721    ///
722    /// * `inner` - The user's delegate implementation
723    /// * `plumtree_tx` - Sender for forwarding received Plumtree messages
724    /// * `outgoing_rx` - Receiver for outgoing Plumtree messages to broadcast
725    /// * `peers` - Shared peer state for synchronizing membership changes
726    /// * `eager_fanout` - Target number of eager peers
727    /// * `max_peers` - Maximum total peers allowed (None = unlimited)
728    ///
729    /// # Example
730    ///
731    /// ```ignore
732    /// // Create PlumtreeMemberlist first
733    /// let pm = PlumtreeMemberlist::new(node_id, config, delegate);
734    ///
735    /// // Wrap your memberlist delegate with PlumtreeNodeDelegate
736    /// let wrapped_delegate = pm.wrap_delegate(your_memberlist_delegate);
737    ///
738    /// // Use wrapped_delegate when creating memberlist
739    /// let memberlist = Memberlist::new(wrapped_delegate, memberlist_config).await?;
740    /// ```
741    pub fn new(
742        inner: D,
743        plumtree_tx: async_channel::Sender<(I, PlumtreeMessage)>,
744        outgoing_rx: async_channel::Receiver<BroadcastEnvelope<I>>,
745        peers: Arc<PeerState<I>>,
746        eager_fanout: usize,
747        max_peers: Option<usize>,
748    ) -> Self {
749        Self {
750            inner,
751            plumtree_tx,
752            outgoing_rx,
753            peers,
754            eager_fanout,
755            max_peers,
756            _marker: PhantomData,
757        }
758    }
759
760    /// Get a reference to the inner delegate.
761    pub fn inner(&self) -> &D {
762        &self.inner
763    }
764
765    /// Get a clone of the plumtree message sender.
766    ///
767    /// This can be used to forward Plumtree messages from external sources.
768    pub fn plumtree_sender(&self) -> async_channel::Sender<(I, PlumtreeMessage)> {
769        self.plumtree_tx.clone()
770    }
771
772    /// Get a clone of the outgoing envelope receiver.
773    ///
774    /// This can be used to receive outgoing Plumtree messages from external sources.
775    /// Messages are unencooded - call `.encode()` on the envelope to get bytes.
776    #[allow(dead_code)]
777    pub(crate) fn outgoing_receiver(&self) -> async_channel::Receiver<BroadcastEnvelope<I>> {
778        self.outgoing_rx.clone()
779    }
780}
781
782impl<I, A, D> NodeDelegate for PlumtreeNodeDelegate<I, A, D>
783where
784    I: Id + IdCodec + Clone + Send + Sync + 'static,
785    A: CheapClone + Send + Sync + 'static,
786    D: NodeDelegate,
787{
788    async fn node_meta(&self, limit: usize) -> Meta {
789        self.inner.node_meta(limit).await
790    }
791
792    async fn notify_message(&self, msg: Cow<'_, [u8]>) {
793        // Check if this is a Plumtree message
794        if msg.len() > 1 && msg[0] == PLUMTREE_MAGIC {
795            // Decode Plumtree envelope (sender_id + message)
796            if let Some((sender, plumtree_msg)) = decode_plumtree_envelope::<I>(&msg) {
797                tracing::trace!(
798                    "received plumtree {:?} from {:?}",
799                    plumtree_msg.tag(),
800                    sender
801                );
802                // Forward to Plumtree for proper handling with sender identity
803                if self.plumtree_tx.send((sender, plumtree_msg)).await.is_err() {
804                    tracing::warn!("plumtree channel closed, cannot forward message");
805                }
806            } else {
807                tracing::warn!("failed to decode plumtree envelope");
808            }
809        } else {
810            // Forward non-Plumtree messages to inner delegate
811            self.inner.notify_message(msg).await;
812        }
813    }
814
815    async fn broadcast_messages<F>(
816        &self,
817        limit: usize,
818        encoded_len: F,
819    ) -> impl Iterator<Item = Bytes> + Send
820    where
821        F: Fn(Bytes) -> (usize, Bytes) + Send + Sync + 'static,
822    {
823        // Use proportional limiting to prevent Plumtree from starving user messages.
824        // Reserve at least 30% of bandwidth for user messages to ensure they always
825        // have a chance to be sent, even under heavy Plumtree control traffic.
826        const PLUMTREE_MAX_RATIO: usize = 70;
827        let plumtree_limit = (limit * PLUMTREE_MAX_RATIO) / 100;
828        let user_reserved = limit - plumtree_limit;
829
830        // Collect Plumtree outgoing messages up to proportional limit
831        // Serialization is deferred to here - encode just before network transmission
832        let mut plumtree_msgs = Vec::new();
833        let mut plumtree_used = 0;
834
835        while let Ok(envelope) = self.outgoing_rx.try_recv() {
836            // Encode at the last moment before network transmission
837            let encoded = envelope.encode();
838            let (len, msg) = encoded_len(encoded);
839            if plumtree_used + len <= plumtree_limit {
840                plumtree_used += len;
841                plumtree_msgs.push(msg);
842            } else {
843                // Plumtree hit its proportional limit, stop collecting
844                break;
845            }
846        }
847
848        // User gets reserved space plus any unused Plumtree quota
849        let user_limit = user_reserved + (plumtree_limit - plumtree_used);
850        let user_msgs = self.inner.broadcast_messages(user_limit, encoded_len).await;
851
852        // Combine both iterators
853        plumtree_msgs.into_iter().chain(user_msgs)
854    }
855
856    async fn local_state(&self, join: bool) -> Bytes {
857        self.inner.local_state(join).await
858    }
859
860    async fn merge_remote_state(&self, buf: &[u8], join: bool) {
861        self.inner.merge_remote_state(buf, join).await
862    }
863}
864
865impl<I, A, D> PingDelegate for PlumtreeNodeDelegate<I, A, D>
866where
867    I: Id + IdCodec + Clone + Send + Sync + 'static,
868    A: CheapClone + Send + Sync + 'static,
869    D: PingDelegate<Id = I, Address = A>,
870{
871    type Id = I;
872    type Address = A;
873
874    async fn ack_payload(&self) -> Bytes {
875        self.inner.ack_payload().await
876    }
877
878    async fn notify_ping_complete(
879        &self,
880        node: Arc<NodeState<Self::Id, Self::Address>>,
881        rtt: std::time::Duration,
882        payload: Bytes,
883    ) {
884        self.inner.notify_ping_complete(node, rtt, payload).await
885    }
886
887    fn disable_reliable_pings(&self, target: &Self::Id) -> bool {
888        self.inner.disable_reliable_pings(target)
889    }
890}
891
892impl<I, A, D> EventDelegate for PlumtreeNodeDelegate<I, A, D>
893where
894    I: Id + IdCodec + Clone + Ord + Send + Sync + 'static,
895    A: CheapClone + Send + Sync + 'static,
896    D: EventDelegate<Id = I, Address = A>,
897{
898    type Id = I;
899    type Address = A;
900
901    async fn notify_join(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
902        // Sync: Add to Plumtree peers with hash ring-based auto-classification
903        // This ensures new peers are placed correctly in the topology
904        self.peers
905            .add_peer_auto(node.id().clone(), self.max_peers, self.eager_fanout);
906
907        self.inner.notify_join(node).await;
908    }
909
910    async fn notify_leave(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
911        // Sync: Remove from Plumtree peers immediately to stop sending to dead node
912        let result = self.peers.remove_peer(node.id());
913
914        // If an eager peer was removed, rebalance to promote a lazy peer
915        // This maintains tree connectivity when eager peers leave
916        if result.was_eager() {
917            self.peers.rebalance(self.eager_fanout);
918        }
919
920        self.inner.notify_leave(node).await;
921    }
922
923    async fn notify_update(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
924        self.inner.notify_update(node).await;
925    }
926}
927
928impl<I, A, D> ConflictDelegate for PlumtreeNodeDelegate<I, A, D>
929where
930    I: Id + IdCodec + Clone + Send + Sync + 'static,
931    A: CheapClone + Send + Sync + 'static,
932    D: ConflictDelegate<Id = I, Address = A>,
933{
934    type Id = I;
935    type Address = A;
936
937    async fn notify_conflict(
938        &self,
939        existing: Arc<NodeState<Self::Id, Self::Address>>,
940        other: Arc<NodeState<Self::Id, Self::Address>>,
941    ) {
942        self.inner.notify_conflict(existing, other).await;
943    }
944}
945
946impl<I, A, D> AliveDelegate for PlumtreeNodeDelegate<I, A, D>
947where
948    I: Id + IdCodec + Clone + Send + Sync + 'static,
949    A: CheapClone + Send + Sync + 'static,
950    D: AliveDelegate<Id = I, Address = A>,
951{
952    type Error = D::Error;
953    type Id = I;
954    type Address = A;
955
956    async fn notify_alive(
957        &self,
958        peer: Arc<NodeState<Self::Id, Self::Address>>,
959    ) -> std::result::Result<(), Self::Error> {
960        self.inner.notify_alive(peer).await
961    }
962}
963
964impl<I, A, D> MergeDelegate for PlumtreeNodeDelegate<I, A, D>
965where
966    I: Id + IdCodec + Clone + Send + Sync + 'static,
967    A: CheapClone + Send + Sync + 'static,
968    D: MergeDelegate<Id = I, Address = A>,
969{
970    type Error = D::Error;
971    type Id = I;
972    type Address = A;
973
974    async fn notify_merge(
975        &self,
976        peers: Arc<[NodeState<Self::Id, Self::Address>]>,
977    ) -> std::result::Result<(), Self::Error> {
978        self.inner.notify_merge(peers).await
979    }
980}
981
// `Delegate` impl for the wrapper: this block defines no methods, it only
// fixes the associated ID and address types to the wrapper's own type
// parameters. Requires the wrapped `D` to be a `Delegate` over the same types.
impl<I, A, D> Delegate for PlumtreeNodeDelegate<I, A, D>
where
    I: Id + IdCodec + Clone + Send + Sync + 'static,
    A: CheapClone + Send + Sync + 'static,
    D: Delegate<Id = I, Address = A>,
{
    // Node identifier type, taken from the wrapper's `I` parameter.
    type Id = I;
    // Node address type, taken from the wrapper's `A` parameter.
    type Address = A;
}
991
/// Event handler that wraps a user's `PlumtreeDelegate` and forwards events
/// to it.
///
/// The node-ID type `I` appears only at the type level; no value of type `I`
/// is stored in the handler.
pub struct PlumtreeEventHandler<I, PD> {
    /// The wrapped application delegate.
    inner: PD,
    /// Zero-sized marker that ties the handler to the node-ID type `I`.
    _marker: PhantomData<I>,
}

impl<I, PD> PlumtreeEventHandler<I, PD> {
    /// Wraps `inner` in a new event handler.
    pub fn new(inner: PD) -> Self {
        let _marker = PhantomData;
        Self { inner, _marker }
    }
}
1011
1012impl<I, PD> PlumtreeDelegate<I> for PlumtreeEventHandler<I, PD>
1013where
1014    I: Clone + Eq + Hash + Send + Sync + 'static,
1015    PD: PlumtreeDelegate<I>,
1016{
1017    fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
1018        self.inner.on_deliver(message_id, payload);
1019    }
1020
1021    fn on_eager_promotion(&self, peer: &I) {
1022        self.inner.on_eager_promotion(peer);
1023    }
1024
1025    fn on_lazy_demotion(&self, peer: &I) {
1026        self.inner.on_lazy_demotion(peer);
1027    }
1028
1029    fn on_graft_sent(&self, peer: &I, message_id: &MessageId) {
1030        self.inner.on_graft_sent(peer, message_id);
1031    }
1032
1033    fn on_prune_sent(&self, peer: &I) {
1034        self.inner.on_prune_sent(peer);
1035    }
1036
1037    fn on_graft_failed(&self, message_id: &MessageId, peer: &I) {
1038        self.inner.on_graft_failed(message_id, peer);
1039    }
1040}
1041
1042/// Encode a Plumtree message for transmission via memberlist.
1043///
1044/// Format: `[MAGIC][message]`
1045///
1046/// **Note**: This function does NOT include sender identity. For proper protocol
1047/// operation, use [`encode_plumtree_envelope`] which includes the sender ID.
1048pub fn encode_plumtree_message(msg: &PlumtreeMessage) -> Bytes {
1049    let encoded = msg.encode_to_bytes();
1050    let mut buf = BytesMut::with_capacity(1 + encoded.len());
1051    buf.put_u8(PLUMTREE_MAGIC);
1052    buf.extend_from_slice(&encoded);
1053    buf.freeze()
1054}
1055
1056/// Encode a Plumtree message with sender identity for transmission.
1057///
1058/// Format: `[MAGIC][sender_id][message]`
1059///
1060/// This is the **recommended** encoding function as it includes the sender's
1061/// node ID, which is critical for proper Plumtree protocol operation.
1062///
1063/// # Zero-Copy Optimization
1064///
1065/// This function encodes directly into a single pre-sized buffer, avoiding
1066/// intermediate allocations. The buffer size is calculated upfront using
1067/// [`envelope_encoded_len`] to ensure a single allocation.
1068pub fn encode_plumtree_envelope<I: IdCodec>(sender: &I, msg: &PlumtreeMessage) -> Bytes {
1069    let id_len = sender.encoded_id_len();
1070    let msg_len = msg.encoded_len();
1071    let mut buf = BytesMut::with_capacity(1 + id_len + msg_len);
1072    buf.put_u8(PLUMTREE_MAGIC);
1073    sender.encode_id(&mut buf);
1074    // Encode message directly into buffer (zero-copy, no intermediate allocation)
1075    msg.encode(&mut buf);
1076    buf.freeze()
1077}
1078
1079/// Encode a Plumtree envelope directly into an existing buffer.
1080///
1081/// This is useful for buffer pooling scenarios where you want to reuse
1082/// buffers to avoid allocation overhead.
1083///
1084/// Format: `[MAGIC][sender_id][message]`
1085///
1086/// # Arguments
1087///
1088/// * `sender` - The sender's node ID
1089/// * `msg` - The Plumtree message to encode
1090/// * `buf` - The buffer to encode into (must have sufficient capacity)
1091///
1092/// # Returns
1093///
1094/// The number of bytes written to the buffer.
1095pub fn encode_plumtree_envelope_into<I: IdCodec>(
1096    sender: &I,
1097    msg: &PlumtreeMessage,
1098    buf: &mut impl BufMut,
1099) -> usize {
1100    let start_len = 1 + sender.encoded_id_len() + msg.encoded_len();
1101    buf.put_u8(PLUMTREE_MAGIC);
1102    sender.encode_id(buf);
1103    msg.encode(buf);
1104    start_len
1105}
1106
1107/// Calculate the encoded length of a Plumtree envelope.
1108///
1109/// Use this to pre-allocate buffers of the correct size.
1110///
1111/// # Arguments
1112///
1113/// * `sender` - The sender's node ID
1114/// * `msg` - The Plumtree message
1115///
1116/// # Returns
1117///
1118/// The total number of bytes the encoded envelope will occupy.
1119pub fn envelope_encoded_len<I: IdCodec>(sender: &I, msg: &PlumtreeMessage) -> usize {
1120    1 + sender.encoded_id_len() + msg.encoded_len()
1121}
1122
1123/// Decode a Plumtree envelope extracting sender ID and message.
1124///
1125/// Format: `[MAGIC][sender_id][message]`
1126///
1127/// Returns `Some((sender, message))` on success, `None` on decode failure.
1128pub fn decode_plumtree_envelope<I: IdCodec>(data: &[u8]) -> Option<(I, PlumtreeMessage)> {
1129    if data.is_empty() || data[0] != PLUMTREE_MAGIC {
1130        return None;
1131    }
1132
1133    let mut cursor = std::io::Cursor::new(&data[1..]);
1134
1135    // Decode sender ID
1136    let sender = I::decode_id(&mut cursor)?;
1137
1138    // Decode message from remaining bytes
1139    let msg = PlumtreeMessage::decode(&mut cursor)?;
1140
1141    Some((sender, msg))
1142}
1143
1144/// Try to decode a Plumtree message from received bytes (without sender).
1145///
1146/// Use this for simple testing or when sender is tracked separately.
1147/// For production use, prefer [`decode_plumtree_envelope`] which extracts
1148/// the sender identity needed for proper protocol operation.
1149pub fn decode_plumtree_message(data: &[u8]) -> Option<PlumtreeMessage> {
1150    if data.len() > 1 && data[0] == PLUMTREE_MAGIC {
1151        PlumtreeMessage::decode_from_slice(&data[1..])
1152    } else {
1153        None
1154    }
1155}
1156
1157/// Check if data is a Plumtree message.
1158pub fn is_plumtree_message(data: &[u8]) -> bool {
1159    !data.is_empty() && data[0] == PLUMTREE_MAGIC
1160}
1161
#[cfg(test)]
mod tests {
    use super::*;
    use crate::NoopDelegate;

    /// Builds the node shared by the `PlumtreeMemberlist` tests: local ID 1,
    /// default configuration, no-op delegate.
    fn local_node() -> PlumtreeMemberlist<u64, NoopDelegate> {
        PlumtreeMemberlist::new(1u64, PlumtreeConfig::default(), NoopDelegate)
    }

    /// Sends `msg` through the envelope codec with `sender` attached and
    /// checks that both come back unchanged.
    fn assert_envelope_roundtrip<I>(sender: I, msg: PlumtreeMessage)
    where
        I: IdCodec + PartialEq + std::fmt::Debug,
    {
        let wire = encode_plumtree_envelope(&sender, &msg);
        let decoded: Option<(I, PlumtreeMessage)> = decode_plumtree_envelope(&wire);
        let (got_sender, got_msg) = decoded.unwrap();

        assert_eq!(got_sender, sender);
        assert_eq!(got_msg, msg);
    }

    /// Checks that the predicted envelope length equals the encoded length.
    fn assert_len_matches(sender: &u64, msg: &PlumtreeMessage) {
        assert_eq!(
            envelope_encoded_len(sender, msg),
            encode_plumtree_envelope(sender, msg).len()
        );
    }

    // Sender-less frame: encode, recognise, decode.
    #[test]
    fn test_encode_decode() {
        let original = PlumtreeMessage::Gossip {
            id: MessageId::new(),
            round: 5,
            payload: Bytes::from_static(b"hello"),
        };

        let wire = encode_plumtree_message(&original);
        assert!(is_plumtree_message(&wire));
        assert_eq!(decode_plumtree_message(&wire), Some(original));
    }

    // Arbitrary user bytes must be neither recognised nor decoded.
    #[test]
    fn test_non_plumtree_message() {
        let user_bytes = b"regular user message";

        assert!(!is_plumtree_message(user_bytes));
        assert!(decode_plumtree_message(user_bytes).is_none());
    }

    // Envelope with a numeric sender: recognised and round-trips.
    #[test]
    fn test_envelope_encode_decode() {
        let sender = 42u64;
        let msg = PlumtreeMessage::Gossip {
            id: MessageId::new(),
            round: 5,
            payload: Bytes::from_static(b"hello"),
        };

        let wire = encode_plumtree_envelope(&sender, &msg);
        assert!(is_plumtree_message(&wire));

        let decoded: Option<(u64, PlumtreeMessage)> = decode_plumtree_envelope(&wire);
        assert_eq!(decoded, Some((sender, msg)));
    }

    // Variable-length (string) sender IDs round-trip too.
    #[test]
    fn test_envelope_with_string_id() {
        let msg = PlumtreeMessage::IHave {
            message_ids: smallvec::smallvec![MessageId::new()],
            round: 3,
        };

        assert_envelope_roundtrip("node-1".to_string(), msg);
    }

    #[test]
    fn test_envelope_graft_message() {
        let msg = PlumtreeMessage::Graft {
            message_id: MessageId::new(),
            round: 7,
        };

        assert_envelope_roundtrip(123u64, msg);
    }

    #[test]
    fn test_envelope_prune_message() {
        assert_envelope_roundtrip(999u64, PlumtreeMessage::Prune);
    }

    #[test]
    fn test_plumtree_memberlist_creation() {
        let node = local_node();

        assert_eq!(*node.plumtree().local_id(), 1u64);
        assert!(!node.is_shutdown());
    }

    #[tokio::test]
    async fn test_plumtree_memberlist_broadcast() {
        let node = local_node();

        // A peer must exist before broadcasting.
        node.add_peer(2u64);

        // The broadcast itself should succeed and yield a timestamped ID.
        let broadcast_id = node.broadcast(Bytes::from("test")).await.unwrap();
        assert!(broadcast_id.timestamp() > 0);

        // Exactly one entry should now sit in the cache.
        assert_eq!(node.cache_stats().entries, 1);
    }

    #[test]
    fn test_plumtree_memberlist_peer_management() {
        let node = local_node();
        assert_eq!(node.peer_stats().total(), 0);

        for peer in [2u64, 3u64] {
            node.add_peer(peer);
        }
        assert_eq!(node.peer_stats().total(), 2);

        node.remove_peer(&2u64);
        assert_eq!(node.peer_stats().total(), 1);
    }

    // The predicted length must match what the encoder actually emits.
    #[test]
    fn test_envelope_encoded_len() {
        let msg = PlumtreeMessage::Gossip {
            id: MessageId::new(),
            round: 5,
            payload: Bytes::from_static(b"hello world"),
        };

        assert_len_matches(&42u64, &msg);
    }

    #[test]
    fn test_encode_envelope_into_buffer() {
        let sender = 123u64;
        let msg = PlumtreeMessage::IHave {
            message_ids: smallvec::smallvec![MessageId::new(), MessageId::new()],
            round: 10,
        };

        // Write into a buffer pre-sized from the predicted length.
        let predicted = envelope_encoded_len(&sender, &msg);
        let mut scratch = BytesMut::with_capacity(predicted);
        let reported = encode_plumtree_envelope_into(&sender, &msg, &mut scratch);

        assert_eq!(reported, predicted);
        assert_eq!(scratch.len(), predicted);

        // The bytes written must decode back to the inputs.
        let decoded: Option<(u64, PlumtreeMessage)> =
            decode_plumtree_envelope(&scratch.freeze());
        let (got_sender, got_msg) = decoded.unwrap();
        assert_eq!(got_sender, sender);
        assert_eq!(got_msg, msg);
    }

    // The allocating and the write-into-buffer encoders must agree byte for byte.
    #[test]
    fn test_zero_copy_encoding_produces_same_result() {
        let sender = 999u64;
        let msg = PlumtreeMessage::Gossip {
            id: MessageId::new(),
            round: 42,
            payload: Bytes::from_static(b"test payload"),
        };

        let allocated = encode_plumtree_envelope(&sender, &msg);

        let mut scratch = BytesMut::with_capacity(envelope_encoded_len(&sender, &msg));
        encode_plumtree_envelope_into(&sender, &msg, &mut scratch);

        assert_eq!(allocated, scratch.freeze());
    }

    // Length prediction is checked once per message variant.
    #[test]
    fn test_envelope_encoded_len_all_message_types() {
        let sender = 1u64;

        let variants = [
            PlumtreeMessage::Gossip {
                id: MessageId::new(),
                round: 0,
                payload: Bytes::from_static(b"test"),
            },
            PlumtreeMessage::IHave {
                message_ids: smallvec::smallvec![MessageId::new()],
                round: 0,
            },
            PlumtreeMessage::Graft {
                message_id: MessageId::new(),
                round: 0,
            },
            PlumtreeMessage::Prune,
        ];

        for msg in &variants {
            assert_len_matches(&sender, msg);
        }
    }
}