libp2p_pubsub_core/
behaviour.rs

1use std::collections::{BTreeSet, VecDeque};
2use std::rc::Rc;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use bytes::Bytes;
7use libp2p::core::Endpoint;
8use libp2p::identity::PeerId;
9use libp2p::swarm::behaviour::ConnectionEstablished;
10use libp2p::swarm::{
11    AddressChange, ConnectionClosed, ConnectionDenied, ConnectionHandler, ConnectionId,
12    DialFailure, FromSwarm, ListenFailure, NetworkBehaviour, NotifyHandler, PollParameters,
13    THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
14};
15use libp2p::Multiaddr;
16
17use libp2p_pubsub_common::service::{BufferedContext, ServiceContext};
18
19use crate::config::Config;
20use crate::conn_handler::{Command as HandlerCommand, Event as HandlerEvent, Handler};
21use crate::event::Event;
22use crate::framing::{Message as FrameMessage, SubscriptionAction};
23use crate::message::Message;
24use crate::protocol::{
25    Protocol, ProtocolRouterConnectionEvent, ProtocolRouterInEvent, ProtocolRouterOutEvent,
26    ProtocolRouterSubscriptionEvent,
27};
28use crate::services::connections::{
29    ConnectionsInEvent, ConnectionsOutEvent, ConnectionsService, ConnectionsSwarmEvent,
30};
31use crate::services::framing::{
32    FramingDownstreamInEvent, FramingDownstreamOutEvent, FramingInEvent, FramingOutEvent,
33    FramingServiceContext, FramingUpstreamInEvent, FramingUpstreamOutEvent,
34};
35use crate::services::message_cache::{
36    MessageCacheInEvent, MessageCacheService, MessageCacheSubscriptionEvent,
37};
38use crate::services::subscriptions::{
39    SubscriptionsInEvent, SubscriptionsOutEvent, SubscriptionsPeerConnectionEvent,
40    SubscriptionsService,
41};
42use crate::subscription::Subscription;
43use crate::topic::{Hasher, Topic, TopicHash};
44
45pub struct Behaviour<P: Protocol> {
46    /// The behaviour's configuration.
47    config: Config,
48
49    /// Peer connections tracking and management service.
50    connections_service: BufferedContext<ConnectionsService>,
51
52    /// Peer subscriptions tracking and management service.
53    subscriptions_service: BufferedContext<SubscriptionsService>,
54
55    /// Message cache and deduplication service.
56    message_cache_service: BufferedContext<MessageCacheService>,
57
58    /// The pubsub protocol router service.
59    protocol_router_service: BufferedContext<P::RouterService>,
60
61    /// The frame encoder and decoder service.
62    framing_service: FramingServiceContext,
63
64    /// Connection handler's mailbox.
65    ///
66    /// It should only contain [`ToSwarm::NotifyHandler`] events to send to the connection handler.
67    conn_handler_mailbox: VecDeque<ToSwarm<Event, HandlerCommand>>,
68
69    /// Behaviour output events mailbox.
70    ///
71    /// It should only contain [`ToSwarm::GenerateEvent`] events to send out of the behaviour, to
72    /// the application.
73    behaviour_output_mailbox: VecDeque<ToSwarm<Event, HandlerCommand>>,
74}
75
76/// Public API.
77impl<P: Protocol> Behaviour<P> {
78    /// Creates a new `Behaviour` from the given configuration and protocol.
79    pub fn new(config: Config, protocol: P) -> Self {
80        let message_cache_service = BufferedContext::new(MessageCacheService::new(
81            config.message_cache_capacity(),
82            config.message_cache_ttl(),
83            config.heartbeat_interval(),
84            Duration::from_secs(0),
85        ));
86        let protocol_router_service = BufferedContext::new(protocol.router());
87
88        Self {
89            config,
90            connections_service: Default::default(),
91            subscriptions_service: Default::default(),
92            message_cache_service,
93            protocol_router_service,
94            framing_service: Default::default(),
95            conn_handler_mailbox: Default::default(),
96            behaviour_output_mailbox: Default::default(),
97        }
98    }
99
100    /// Get a reference to the connections service.
101    pub fn connections(&self) -> &ConnectionsService {
102        &self.connections_service
103    }
104
105    /// Get local node topic subscriptions.
106    pub fn subscriptions(&self) -> &BTreeSet<TopicHash> {
107        self.subscriptions_service.subscriptions()
108    }
109
110    /// Get peer topic subscriptions.
111    pub fn peer_subscriptions(&self, peer_id: &PeerId) -> Option<&BTreeSet<TopicHash>> {
112        self.subscriptions_service.peer_subscriptions(peer_id)
113    }
114
115    /// Subscribe to topic.
116    ///
117    /// Returns `Ok(true)` if the subscription was successful, `Ok(false)` if we were already
118    /// subscribed to the topic.
119    pub fn subscribe(&mut self, sub: impl Into<Subscription>) -> anyhow::Result<bool> {
120        let sub = sub.into();
121
122        tracing::debug!(?sub, "Subscribing to topic");
123
124        if self.subscriptions_service.is_subscribed(&sub.topic) {
125            return Ok(false);
126        }
127
128        // Notify the subscriptions service of the subscription request.
129        self.subscriptions_service
130            .do_send(SubscriptionsInEvent::SubscriptionRequest(sub));
131
132        Ok(true)
133    }
134
135    /// Unsubscribe from topic.
136    ///
137    /// Returns `Ok(true)` if the unsubscription was successful, `Ok(false)` if we were not
138    /// subscribed to the topic.
139    pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> anyhow::Result<bool> {
140        tracing::debug!(sub = %topic, "Unsubscribing from topic");
141
142        let topic = topic.hash();
143
144        if !self.subscriptions_service.is_subscribed(&topic) {
145            return Ok(false);
146        }
147
148        // Notify the subscriptions service of the unsubscription request.
149        self.subscriptions_service
150            .do_send(SubscriptionsInEvent::UnsubscriptionRequest(topic));
151
152        Ok(true)
153    }
154
155    /// Publish a message to the network.
156    pub fn publish(&mut self, message: Message) -> anyhow::Result<()> {
157        let topic = message.topic.clone();
158
159        tracing::debug!(%topic, "Publishing message");
160
161        // Check if we are subscribed to the topic.
162        if !self.subscriptions_service.is_subscribed(&topic) {
163            return Err(anyhow::anyhow!("Not subscribed to topic"));
164        }
165
166        // Check if we have connections to publish the message.
167        if self.connections_service.active_peers_count() == 0 {
168            return Err(anyhow::anyhow!("No active connections"));
169        }
170
171        let message = FrameMessage::from(message);
172
173        // Check if message was already published.
174        if self.message_cache_service.contains(&message) {
175            return Err(anyhow::anyhow!("Message already published"));
176        }
177
178        let message = Rc::new(message);
179
180        // Notify the message cache service to add the message to the message cache.
181        self.message_cache_service
182            .do_send(MessageCacheInEvent::MessagePublished(message.clone()));
183
184        // Emit a message published event to the protocol service.
185        self.protocol_router_service
186            .do_send(ProtocolRouterInEvent::MessagePublished(message));
187
188        Ok(())
189    }
190}
191
192/// Internal API.
193impl<P: Protocol> Behaviour<P> {
194    /// Send a pubsub frame to a `dst` peer.
195    ///
196    /// This method checks if the frame size is within the allowed limits and queues a connection
197    /// handler event to send the frame to the peer.
198    fn send_frame(&mut self, dest: PeerId, frame: Bytes) {
199        tracing::trace!(%dest, "Sending frame");
200
201        // Check if the frame size exceeds the maximum allowed size. If so, drop the frame.
202        if frame.len() > self.config.max_frame_size() {
203            tracing::warn!(%dest, "Frame size exceeds maximum allowed size");
204            return;
205        }
206
207        self.conn_handler_mailbox.push_back(ToSwarm::NotifyHandler {
208            peer_id: dest,
209            handler: NotifyHandler::Any,
210            event: HandlerCommand::SendFrame(frame),
211        });
212    }
213}
214
215impl<P> NetworkBehaviour for Behaviour<P>
216where
217    P: Protocol + 'static,
218{
219    type ConnectionHandler = Handler<P::Upgrade>;
220    type ToSwarm = Event;
221
222    fn handle_established_inbound_connection(
223        &mut self,
224        connection_id: ConnectionId,
225        peer_id: PeerId,
226        local_addr: &Multiaddr,
227        remote_addr: &Multiaddr,
228    ) -> Result<THandler<Self>, ConnectionDenied> {
229        // Emit an event to the connections service.
230        self.connections_service
231            .do_send(ConnectionsInEvent::EstablishedInboundConnection {
232                connection_id,
233                peer_id,
234                local_addr: local_addr.clone(),
235                remote_addr: remote_addr.clone(),
236            });
237
238        Ok(Handler::new(
239            P::upgrade(),
240            self.config.max_frame_size(),
241            self.config.connection_idle_timeout(),
242        ))
243    }
244
245    fn handle_established_outbound_connection(
246        &mut self,
247        connection_id: ConnectionId,
248        peer_id: PeerId,
249        remote_addr: &Multiaddr,
250        _role_override: Endpoint,
251    ) -> Result<THandler<Self>, ConnectionDenied> {
252        // Emit an event to the connections service.
253        self.connections_service
254            .do_send(ConnectionsInEvent::EstablishedOutboundConnection {
255                connection_id,
256                peer_id,
257                remote_addr: remote_addr.clone(),
258            });
259
260        Ok(Handler::new(
261            P::upgrade(),
262            self.config.max_frame_size(),
263            self.config.connection_idle_timeout(),
264        ))
265    }
266
267    fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
268        match event {
269            FromSwarm::ConnectionEstablished(ev) => {
270                self.connections_service
271                    .do_send(ConnectionsInEvent::from_swarm_event(ev));
272            }
273            FromSwarm::ConnectionClosed(ev) => {
274                self.connections_service
275                    .do_send(ConnectionsInEvent::from_swarm_event(ev));
276            }
277            FromSwarm::AddressChange(ev) => {
278                self.connections_service
279                    .do_send(ConnectionsInEvent::from_swarm_event(ev));
280            }
281            FromSwarm::DialFailure(ev) => {
282                self.connections_service
283                    .do_send(ConnectionsInEvent::from_swarm_event(ev));
284            }
285            FromSwarm::ListenFailure(ev) => {
286                self.connections_service
287                    .do_send(ConnectionsInEvent::from_swarm_event(ev));
288            }
289            _ => {}
290        }
291    }
292
293    fn on_connection_handler_event(
294        &mut self,
295        peer_id: PeerId,
296        _connection_id: ConnectionId,
297        event: THandlerOutEvent<Self>,
298    ) {
299        match event {
300            HandlerEvent::FrameReceived(frame) => {
301                // Notify the framing service of the received frame handler event.
302                self.framing_service.do_send(FramingInEvent::Upstream(
303                    FramingUpstreamInEvent::RawFrameReceived {
304                        src: peer_id,
305                        frame,
306                    },
307                ));
308            }
309            HandlerEvent::FrameSent => {}
310        }
311    }
312
313    fn poll(
314        &mut self,
315        cx: &mut Context<'_>,
316        _params: &mut impl PollParameters,
317    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
318        // Poll the connections service.
319        while let Poll::Ready(conn_event) = self.connections_service.poll(cx) {
320            // Notify the subscriptions service of the connection event.
321            self.subscriptions_service
322                .do_send(SubscriptionsInEvent::from_peer_connection_event(
323                    conn_event.clone(),
324                ));
325
326            // Notify the protocol's routing service of the connection event.
327            self.protocol_router_service.do_send(match conn_event {
328                ConnectionsOutEvent::NewPeerConnected(peer) => {
329                    ProtocolRouterInEvent::ConnectionEvent(
330                        ProtocolRouterConnectionEvent::PeerConnected(peer),
331                    )
332                }
333                ConnectionsOutEvent::PeerDisconnected(peer) => {
334                    ProtocolRouterInEvent::ConnectionEvent(
335                        ProtocolRouterConnectionEvent::PeerDisconnected(peer),
336                    )
337                }
338            });
339        }
340
341        // Poll the subscriptions service.
342        while let Poll::Ready(sub_event) = self.subscriptions_service.poll(cx) {
343            match sub_event {
344                SubscriptionsOutEvent::Subscribed(sub) => {
345                    // Notify the message cache service of the subscription.
346                    self.message_cache_service
347                        .do_send(MessageCacheInEvent::SubscriptionEvent(
348                            MessageCacheSubscriptionEvent::Subscribed {
349                                topic: sub.topic.clone(),
350                                message_id_fn: sub.message_id_fn.clone(),
351                            },
352                        ));
353
354                    // Notify the protocol's routing service of the subscription event.
355                    self.protocol_router_service
356                        .do_send(ProtocolRouterInEvent::SubscriptionEvent(
357                            ProtocolRouterSubscriptionEvent::Subscribed(sub.clone()),
358                        ));
359
360                    // Send the subscription update to all active peers.
361                    tracing::debug!(topic = %sub.topic, "Sending subscription update");
362
363                    let sub_action = SubscriptionAction::Subscribe(sub.topic);
364                    for dest in self.connections_service.active_peers() {
365                        // Notify the framing service of the subscription update request.
366                        self.framing_service.do_send(FramingInEvent::Downstream(
367                            FramingDownstreamInEvent::SendSubscriptionRequest {
368                                dest,
369                                actions: vec![sub_action.clone()],
370                            },
371                        ));
372                    }
373                }
374                SubscriptionsOutEvent::Unsubscribed(topic) => {
375                    // Notify the message cache service of the unsubscription.
376                    self.message_cache_service
377                        .do_send(MessageCacheInEvent::SubscriptionEvent(
378                            MessageCacheSubscriptionEvent::Unsubscribed(topic.clone()),
379                        ));
380
381                    // Notify the protocol's service of the unsubscription event.
382                    self.protocol_router_service
383                        .do_send(ProtocolRouterInEvent::SubscriptionEvent(
384                            ProtocolRouterSubscriptionEvent::Unsubscribed(topic.clone()),
385                        ));
386
387                    // Send the subscription updates to all active peers.
388                    tracing::debug!(%topic, "Sending subscription update");
389
390                    let sub_action = SubscriptionAction::Unsubscribe(topic);
391                    for dest in self.connections_service.active_peers() {
392                        // Notify the framing service of the subscription update request.
393                        self.framing_service.do_send(FramingInEvent::Downstream(
394                            FramingDownstreamInEvent::SendSubscriptionRequest {
395                                dest,
396                                actions: vec![sub_action.clone()],
397                            },
398                        ));
399                    }
400                }
401                SubscriptionsOutEvent::PeerSubscribed { peer, topic } => {
402                    tracing::debug!(src = %peer, %topic, "Peer subscribed");
403
404                    // Notify the protocol's service of the peer subscription event.
405                    self.protocol_router_service
406                        .do_send(ProtocolRouterInEvent::SubscriptionEvent(
407                            ProtocolRouterSubscriptionEvent::PeerSubscribed { peer, topic },
408                        ));
409                }
410                SubscriptionsOutEvent::PeerUnsubscribed { peer, topic } => {
411                    tracing::debug!(src = %peer, %topic, "Peer unsubscribed");
412
413                    // Notify the protocol's service of the peer unsubscription event.
414                    self.protocol_router_service
415                        .do_send(ProtocolRouterInEvent::SubscriptionEvent(
416                            ProtocolRouterSubscriptionEvent::PeerUnsubscribed { peer, topic },
417                        ));
418                }
419                SubscriptionsOutEvent::SendSubscriptions { dest, topics } => {
420                    // Send the subscriptions to the peer.
421                    tracing::debug!(%dest, ?topics, "Sending subscriptions");
422
423                    let actions = topics
424                        .into_iter()
425                        .map(SubscriptionAction::Subscribe)
426                        .collect::<Vec<_>>();
427                    self.framing_service.do_send(FramingInEvent::Downstream(
428                        FramingDownstreamInEvent::SendSubscriptionRequest { dest, actions },
429                    ));
430                }
431            }
432        }
433
434        // Poll the message cache service.
435        let _ = self.message_cache_service.poll(cx);
436
437        // Poll the protocol service.
438        while let Poll::Ready(event) = self.protocol_router_service.poll(cx) {
439            match event {
440                ProtocolRouterOutEvent::ForwardMessage { message, dest } => {
441                    for dest in dest {
442                        // Notify the framing service of the message to send.
443                        self.framing_service.do_send(FramingInEvent::Downstream(
444                            FramingDownstreamInEvent::ForwardMessage {
445                                dest,
446                                message: message.clone(),
447                            },
448                        ));
449                    }
450                }
451            }
452        }
453
454        // Poll the framing service.
455        while let Poll::Ready(event) = self.framing_service.poll(cx) {
456            match event {
457                FramingOutEvent::Downstream(FramingDownstreamOutEvent::SendFrame {
458                    dest,
459                    frame,
460                }) => {
461                    // Send the frame to the peer.
462                    self.send_frame(dest, frame);
463                }
464                FramingOutEvent::Upstream(ev) => match ev {
465                    FramingUpstreamOutEvent::MessageReceived { src, message } => {
466                        // Skip the message if we are not subscribed to the topic or the message
467                        // was already received.
468                        if !self.subscriptions_service.is_subscribed(&message.topic())
469                            || self.message_cache_service.contains(&message)
470                        {
471                            continue;
472                        }
473
474                        // Notify the message cache service of the received message.
475                        self.message_cache_service
476                            .do_send(MessageCacheInEvent::MessageReceived(message.clone()));
477
478                        // Notify the behaviour output mailbox of the received message.
479                        self.behaviour_output_mailbox
480                            .push_back(ToSwarm::GenerateEvent(Event::MessageReceived {
481                                src,
482                                message: (*message).clone().into(),
483                            }));
484
485                        // Notify the protocol's routing service of the received message.
486                        self.protocol_router_service
487                            .do_send(ProtocolRouterInEvent::MessageReceived { src, message });
488                    }
489                    FramingUpstreamOutEvent::SubscriptionRequestReceived { src, action } => {
490                        match &action {
491                            SubscriptionAction::Subscribe(topic)
492                                if !self.subscriptions_service.is_peer_subscribed(&src, topic) =>
493                            {
494                                // Notify the subscriptions service of the subscription request.
495                                self.subscriptions_service.do_send(
496                                    SubscriptionsInEvent::PeerSubscriptionRequest { src, action },
497                                );
498                            }
499                            SubscriptionAction::Unsubscribe(topic)
500                                if self.subscriptions_service.is_peer_subscribed(&src, topic) =>
501                            {
502                                // Notify the subscriptions service of the unsubscription request.
503                                self.subscriptions_service.do_send(
504                                    SubscriptionsInEvent::PeerSubscriptionRequest { src, action },
505                                );
506                            }
507                            _ => {}
508                        }
509                    }
510                },
511            }
512        }
513
514        // Process the connection handler mailbox.
515        if let Some(event) = self.conn_handler_mailbox.pop_front() {
516            return Poll::Ready(event);
517        }
518
519        // Process the behaviour output events mailbox.
520        if let Some(event) = self.behaviour_output_mailbox.pop_front() {
521            return Poll::Ready(event);
522        }
523
524        Poll::Pending
525    }
526}
527
528impl From<ConnectionEstablished<'_>> for ConnectionsSwarmEvent {
529    fn from(ev: ConnectionEstablished) -> Self {
530        Self::ConnectionEstablished {
531            connection_id: ev.connection_id,
532            peer_id: ev.peer_id,
533        }
534    }
535}
536
537impl<H: ConnectionHandler> From<ConnectionClosed<'_, H>> for ConnectionsSwarmEvent {
538    fn from(ev: ConnectionClosed<H>) -> Self {
539        Self::ConnectionClosed {
540            connection_id: ev.connection_id,
541            peer_id: ev.peer_id,
542        }
543    }
544}
545
546impl From<AddressChange<'_>> for ConnectionsSwarmEvent {
547    fn from(ev: AddressChange) -> Self {
548        Self::AddressChange {
549            connection_id: ev.connection_id,
550            peer_id: ev.peer_id,
551            old: ev.old.clone(),
552            new: ev.new.clone(),
553        }
554    }
555}
556
557impl From<DialFailure<'_>> for ConnectionsSwarmEvent {
558    fn from(ev: DialFailure) -> Self {
559        Self::DialFailure {
560            connection_id: ev.connection_id,
561            peer_id: ev.peer_id,
562            error: ev.error.to_string(), // TODO: Use a custom error type.
563        }
564    }
565}
566
567impl From<ListenFailure<'_>> for ConnectionsSwarmEvent {
568    fn from(ev: ListenFailure) -> Self {
569        Self::ListenFailure {
570            connection_id: ev.connection_id,
571            local_addr: ev.local_addr.clone(),
572            send_back_addr: ev.send_back_addr.clone(),
573            error: ev.error.to_string(), // TODO: Use a custom error type.
574        }
575    }
576}
577
578impl From<ConnectionsOutEvent> for SubscriptionsPeerConnectionEvent {
579    fn from(ev: ConnectionsOutEvent) -> Self {
580        match ev {
581            ConnectionsOutEvent::NewPeerConnected(peer) => {
582                SubscriptionsPeerConnectionEvent::NewPeerConnected(peer)
583            }
584            ConnectionsOutEvent::PeerDisconnected(peer) => {
585                SubscriptionsPeerConnectionEvent::PeerDisconnected(peer)
586            }
587        }
588    }
589}
590
591impl From<Message> for FrameMessage {
592    fn from(message: Message) -> Self {
593        let mut msg = Self::new(message.topic, message.data);
594        msg.set_sequence_number(message.sequence_number);
595        msg.set_key(message.key);
596        msg.set_source(message.from);
597        msg.set_signature(message.signature);
598        msg
599    }
600}
601
602impl From<FrameMessage> for Message {
603    fn from(message: FrameMessage) -> Self {
604        Self {
605            topic: message.topic(),
606            data: message.data().to_vec(),
607            sequence_number: message.sequence_number(),
608            key: message.key().map(ToOwned::to_owned),
609            from: message.source(),
610            signature: message.signature().map(ToOwned::to_owned),
611        }
612    }
613}