1use std::collections::{BTreeSet, VecDeque};
2use std::rc::Rc;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use bytes::Bytes;
7use libp2p::core::Endpoint;
8use libp2p::identity::PeerId;
9use libp2p::swarm::behaviour::ConnectionEstablished;
10use libp2p::swarm::{
11 AddressChange, ConnectionClosed, ConnectionDenied, ConnectionHandler, ConnectionId,
12 DialFailure, FromSwarm, ListenFailure, NetworkBehaviour, NotifyHandler, PollParameters,
13 THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
14};
15use libp2p::Multiaddr;
16
17use libp2p_pubsub_common::service::{BufferedContext, ServiceContext};
18
19use crate::config::Config;
20use crate::conn_handler::{Command as HandlerCommand, Event as HandlerEvent, Handler};
21use crate::event::Event;
22use crate::framing::{Message as FrameMessage, SubscriptionAction};
23use crate::message::Message;
24use crate::protocol::{
25 Protocol, ProtocolRouterConnectionEvent, ProtocolRouterInEvent, ProtocolRouterOutEvent,
26 ProtocolRouterSubscriptionEvent,
27};
28use crate::services::connections::{
29 ConnectionsInEvent, ConnectionsOutEvent, ConnectionsService, ConnectionsSwarmEvent,
30};
31use crate::services::framing::{
32 FramingDownstreamInEvent, FramingDownstreamOutEvent, FramingInEvent, FramingOutEvent,
33 FramingServiceContext, FramingUpstreamInEvent, FramingUpstreamOutEvent,
34};
35use crate::services::message_cache::{
36 MessageCacheInEvent, MessageCacheService, MessageCacheSubscriptionEvent,
37};
38use crate::services::subscriptions::{
39 SubscriptionsInEvent, SubscriptionsOutEvent, SubscriptionsPeerConnectionEvent,
40 SubscriptionsService,
41};
42use crate::subscription::Subscription;
43use crate::topic::{Hasher, Topic, TopicHash};
44
45pub struct Behaviour<P: Protocol> {
46 config: Config,
48
49 connections_service: BufferedContext<ConnectionsService>,
51
52 subscriptions_service: BufferedContext<SubscriptionsService>,
54
55 message_cache_service: BufferedContext<MessageCacheService>,
57
58 protocol_router_service: BufferedContext<P::RouterService>,
60
61 framing_service: FramingServiceContext,
63
64 conn_handler_mailbox: VecDeque<ToSwarm<Event, HandlerCommand>>,
68
69 behaviour_output_mailbox: VecDeque<ToSwarm<Event, HandlerCommand>>,
74}
75
76impl<P: Protocol> Behaviour<P> {
78 pub fn new(config: Config, protocol: P) -> Self {
80 let message_cache_service = BufferedContext::new(MessageCacheService::new(
81 config.message_cache_capacity(),
82 config.message_cache_ttl(),
83 config.heartbeat_interval(),
84 Duration::from_secs(0),
85 ));
86 let protocol_router_service = BufferedContext::new(protocol.router());
87
88 Self {
89 config,
90 connections_service: Default::default(),
91 subscriptions_service: Default::default(),
92 message_cache_service,
93 protocol_router_service,
94 framing_service: Default::default(),
95 conn_handler_mailbox: Default::default(),
96 behaviour_output_mailbox: Default::default(),
97 }
98 }
99
100 pub fn connections(&self) -> &ConnectionsService {
102 &self.connections_service
103 }
104
105 pub fn subscriptions(&self) -> &BTreeSet<TopicHash> {
107 self.subscriptions_service.subscriptions()
108 }
109
110 pub fn peer_subscriptions(&self, peer_id: &PeerId) -> Option<&BTreeSet<TopicHash>> {
112 self.subscriptions_service.peer_subscriptions(peer_id)
113 }
114
115 pub fn subscribe(&mut self, sub: impl Into<Subscription>) -> anyhow::Result<bool> {
120 let sub = sub.into();
121
122 tracing::debug!(?sub, "Subscribing to topic");
123
124 if self.subscriptions_service.is_subscribed(&sub.topic) {
125 return Ok(false);
126 }
127
128 self.subscriptions_service
130 .do_send(SubscriptionsInEvent::SubscriptionRequest(sub));
131
132 Ok(true)
133 }
134
135 pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> anyhow::Result<bool> {
140 tracing::debug!(sub = %topic, "Unsubscribing from topic");
141
142 let topic = topic.hash();
143
144 if !self.subscriptions_service.is_subscribed(&topic) {
145 return Ok(false);
146 }
147
148 self.subscriptions_service
150 .do_send(SubscriptionsInEvent::UnsubscriptionRequest(topic));
151
152 Ok(true)
153 }
154
155 pub fn publish(&mut self, message: Message) -> anyhow::Result<()> {
157 let topic = message.topic.clone();
158
159 tracing::debug!(%topic, "Publishing message");
160
161 if !self.subscriptions_service.is_subscribed(&topic) {
163 return Err(anyhow::anyhow!("Not subscribed to topic"));
164 }
165
166 if self.connections_service.active_peers_count() == 0 {
168 return Err(anyhow::anyhow!("No active connections"));
169 }
170
171 let message = FrameMessage::from(message);
172
173 if self.message_cache_service.contains(&message) {
175 return Err(anyhow::anyhow!("Message already published"));
176 }
177
178 let message = Rc::new(message);
179
180 self.message_cache_service
182 .do_send(MessageCacheInEvent::MessagePublished(message.clone()));
183
184 self.protocol_router_service
186 .do_send(ProtocolRouterInEvent::MessagePublished(message));
187
188 Ok(())
189 }
190}
191
192impl<P: Protocol> Behaviour<P> {
194 fn send_frame(&mut self, dest: PeerId, frame: Bytes) {
199 tracing::trace!(%dest, "Sending frame");
200
201 if frame.len() > self.config.max_frame_size() {
203 tracing::warn!(%dest, "Frame size exceeds maximum allowed size");
204 return;
205 }
206
207 self.conn_handler_mailbox.push_back(ToSwarm::NotifyHandler {
208 peer_id: dest,
209 handler: NotifyHandler::Any,
210 event: HandlerCommand::SendFrame(frame),
211 });
212 }
213}
214
215impl<P> NetworkBehaviour for Behaviour<P>
216where
217 P: Protocol + 'static,
218{
219 type ConnectionHandler = Handler<P::Upgrade>;
220 type ToSwarm = Event;
221
222 fn handle_established_inbound_connection(
223 &mut self,
224 connection_id: ConnectionId,
225 peer_id: PeerId,
226 local_addr: &Multiaddr,
227 remote_addr: &Multiaddr,
228 ) -> Result<THandler<Self>, ConnectionDenied> {
229 self.connections_service
231 .do_send(ConnectionsInEvent::EstablishedInboundConnection {
232 connection_id,
233 peer_id,
234 local_addr: local_addr.clone(),
235 remote_addr: remote_addr.clone(),
236 });
237
238 Ok(Handler::new(
239 P::upgrade(),
240 self.config.max_frame_size(),
241 self.config.connection_idle_timeout(),
242 ))
243 }
244
245 fn handle_established_outbound_connection(
246 &mut self,
247 connection_id: ConnectionId,
248 peer_id: PeerId,
249 remote_addr: &Multiaddr,
250 _role_override: Endpoint,
251 ) -> Result<THandler<Self>, ConnectionDenied> {
252 self.connections_service
254 .do_send(ConnectionsInEvent::EstablishedOutboundConnection {
255 connection_id,
256 peer_id,
257 remote_addr: remote_addr.clone(),
258 });
259
260 Ok(Handler::new(
261 P::upgrade(),
262 self.config.max_frame_size(),
263 self.config.connection_idle_timeout(),
264 ))
265 }
266
267 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
268 match event {
269 FromSwarm::ConnectionEstablished(ev) => {
270 self.connections_service
271 .do_send(ConnectionsInEvent::from_swarm_event(ev));
272 }
273 FromSwarm::ConnectionClosed(ev) => {
274 self.connections_service
275 .do_send(ConnectionsInEvent::from_swarm_event(ev));
276 }
277 FromSwarm::AddressChange(ev) => {
278 self.connections_service
279 .do_send(ConnectionsInEvent::from_swarm_event(ev));
280 }
281 FromSwarm::DialFailure(ev) => {
282 self.connections_service
283 .do_send(ConnectionsInEvent::from_swarm_event(ev));
284 }
285 FromSwarm::ListenFailure(ev) => {
286 self.connections_service
287 .do_send(ConnectionsInEvent::from_swarm_event(ev));
288 }
289 _ => {}
290 }
291 }
292
293 fn on_connection_handler_event(
294 &mut self,
295 peer_id: PeerId,
296 _connection_id: ConnectionId,
297 event: THandlerOutEvent<Self>,
298 ) {
299 match event {
300 HandlerEvent::FrameReceived(frame) => {
301 self.framing_service.do_send(FramingInEvent::Upstream(
303 FramingUpstreamInEvent::RawFrameReceived {
304 src: peer_id,
305 frame,
306 },
307 ));
308 }
309 HandlerEvent::FrameSent => {}
310 }
311 }
312
313 fn poll(
314 &mut self,
315 cx: &mut Context<'_>,
316 _params: &mut impl PollParameters,
317 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
318 while let Poll::Ready(conn_event) = self.connections_service.poll(cx) {
320 self.subscriptions_service
322 .do_send(SubscriptionsInEvent::from_peer_connection_event(
323 conn_event.clone(),
324 ));
325
326 self.protocol_router_service.do_send(match conn_event {
328 ConnectionsOutEvent::NewPeerConnected(peer) => {
329 ProtocolRouterInEvent::ConnectionEvent(
330 ProtocolRouterConnectionEvent::PeerConnected(peer),
331 )
332 }
333 ConnectionsOutEvent::PeerDisconnected(peer) => {
334 ProtocolRouterInEvent::ConnectionEvent(
335 ProtocolRouterConnectionEvent::PeerDisconnected(peer),
336 )
337 }
338 });
339 }
340
341 while let Poll::Ready(sub_event) = self.subscriptions_service.poll(cx) {
343 match sub_event {
344 SubscriptionsOutEvent::Subscribed(sub) => {
345 self.message_cache_service
347 .do_send(MessageCacheInEvent::SubscriptionEvent(
348 MessageCacheSubscriptionEvent::Subscribed {
349 topic: sub.topic.clone(),
350 message_id_fn: sub.message_id_fn.clone(),
351 },
352 ));
353
354 self.protocol_router_service
356 .do_send(ProtocolRouterInEvent::SubscriptionEvent(
357 ProtocolRouterSubscriptionEvent::Subscribed(sub.clone()),
358 ));
359
360 tracing::debug!(topic = %sub.topic, "Sending subscription update");
362
363 let sub_action = SubscriptionAction::Subscribe(sub.topic);
364 for dest in self.connections_service.active_peers() {
365 self.framing_service.do_send(FramingInEvent::Downstream(
367 FramingDownstreamInEvent::SendSubscriptionRequest {
368 dest,
369 actions: vec![sub_action.clone()],
370 },
371 ));
372 }
373 }
374 SubscriptionsOutEvent::Unsubscribed(topic) => {
375 self.message_cache_service
377 .do_send(MessageCacheInEvent::SubscriptionEvent(
378 MessageCacheSubscriptionEvent::Unsubscribed(topic.clone()),
379 ));
380
381 self.protocol_router_service
383 .do_send(ProtocolRouterInEvent::SubscriptionEvent(
384 ProtocolRouterSubscriptionEvent::Unsubscribed(topic.clone()),
385 ));
386
387 tracing::debug!(%topic, "Sending subscription update");
389
390 let sub_action = SubscriptionAction::Unsubscribe(topic);
391 for dest in self.connections_service.active_peers() {
392 self.framing_service.do_send(FramingInEvent::Downstream(
394 FramingDownstreamInEvent::SendSubscriptionRequest {
395 dest,
396 actions: vec![sub_action.clone()],
397 },
398 ));
399 }
400 }
401 SubscriptionsOutEvent::PeerSubscribed { peer, topic } => {
402 tracing::debug!(src = %peer, %topic, "Peer subscribed");
403
404 self.protocol_router_service
406 .do_send(ProtocolRouterInEvent::SubscriptionEvent(
407 ProtocolRouterSubscriptionEvent::PeerSubscribed { peer, topic },
408 ));
409 }
410 SubscriptionsOutEvent::PeerUnsubscribed { peer, topic } => {
411 tracing::debug!(src = %peer, %topic, "Peer unsubscribed");
412
413 self.protocol_router_service
415 .do_send(ProtocolRouterInEvent::SubscriptionEvent(
416 ProtocolRouterSubscriptionEvent::PeerUnsubscribed { peer, topic },
417 ));
418 }
419 SubscriptionsOutEvent::SendSubscriptions { dest, topics } => {
420 tracing::debug!(%dest, ?topics, "Sending subscriptions");
422
423 let actions = topics
424 .into_iter()
425 .map(SubscriptionAction::Subscribe)
426 .collect::<Vec<_>>();
427 self.framing_service.do_send(FramingInEvent::Downstream(
428 FramingDownstreamInEvent::SendSubscriptionRequest { dest, actions },
429 ));
430 }
431 }
432 }
433
434 let _ = self.message_cache_service.poll(cx);
436
437 while let Poll::Ready(event) = self.protocol_router_service.poll(cx) {
439 match event {
440 ProtocolRouterOutEvent::ForwardMessage { message, dest } => {
441 for dest in dest {
442 self.framing_service.do_send(FramingInEvent::Downstream(
444 FramingDownstreamInEvent::ForwardMessage {
445 dest,
446 message: message.clone(),
447 },
448 ));
449 }
450 }
451 }
452 }
453
454 while let Poll::Ready(event) = self.framing_service.poll(cx) {
456 match event {
457 FramingOutEvent::Downstream(FramingDownstreamOutEvent::SendFrame {
458 dest,
459 frame,
460 }) => {
461 self.send_frame(dest, frame);
463 }
464 FramingOutEvent::Upstream(ev) => match ev {
465 FramingUpstreamOutEvent::MessageReceived { src, message } => {
466 if !self.subscriptions_service.is_subscribed(&message.topic())
469 || self.message_cache_service.contains(&message)
470 {
471 continue;
472 }
473
474 self.message_cache_service
476 .do_send(MessageCacheInEvent::MessageReceived(message.clone()));
477
478 self.behaviour_output_mailbox
480 .push_back(ToSwarm::GenerateEvent(Event::MessageReceived {
481 src,
482 message: (*message).clone().into(),
483 }));
484
485 self.protocol_router_service
487 .do_send(ProtocolRouterInEvent::MessageReceived { src, message });
488 }
489 FramingUpstreamOutEvent::SubscriptionRequestReceived { src, action } => {
490 match &action {
491 SubscriptionAction::Subscribe(topic)
492 if !self.subscriptions_service.is_peer_subscribed(&src, topic) =>
493 {
494 self.subscriptions_service.do_send(
496 SubscriptionsInEvent::PeerSubscriptionRequest { src, action },
497 );
498 }
499 SubscriptionAction::Unsubscribe(topic)
500 if self.subscriptions_service.is_peer_subscribed(&src, topic) =>
501 {
502 self.subscriptions_service.do_send(
504 SubscriptionsInEvent::PeerSubscriptionRequest { src, action },
505 );
506 }
507 _ => {}
508 }
509 }
510 },
511 }
512 }
513
514 if let Some(event) = self.conn_handler_mailbox.pop_front() {
516 return Poll::Ready(event);
517 }
518
519 if let Some(event) = self.behaviour_output_mailbox.pop_front() {
521 return Poll::Ready(event);
522 }
523
524 Poll::Pending
525 }
526}
527
528impl From<ConnectionEstablished<'_>> for ConnectionsSwarmEvent {
529 fn from(ev: ConnectionEstablished) -> Self {
530 Self::ConnectionEstablished {
531 connection_id: ev.connection_id,
532 peer_id: ev.peer_id,
533 }
534 }
535}
536
537impl<H: ConnectionHandler> From<ConnectionClosed<'_, H>> for ConnectionsSwarmEvent {
538 fn from(ev: ConnectionClosed<H>) -> Self {
539 Self::ConnectionClosed {
540 connection_id: ev.connection_id,
541 peer_id: ev.peer_id,
542 }
543 }
544}
545
546impl From<AddressChange<'_>> for ConnectionsSwarmEvent {
547 fn from(ev: AddressChange) -> Self {
548 Self::AddressChange {
549 connection_id: ev.connection_id,
550 peer_id: ev.peer_id,
551 old: ev.old.clone(),
552 new: ev.new.clone(),
553 }
554 }
555}
556
557impl From<DialFailure<'_>> for ConnectionsSwarmEvent {
558 fn from(ev: DialFailure) -> Self {
559 Self::DialFailure {
560 connection_id: ev.connection_id,
561 peer_id: ev.peer_id,
562 error: ev.error.to_string(), }
564 }
565}
566
567impl From<ListenFailure<'_>> for ConnectionsSwarmEvent {
568 fn from(ev: ListenFailure) -> Self {
569 Self::ListenFailure {
570 connection_id: ev.connection_id,
571 local_addr: ev.local_addr.clone(),
572 send_back_addr: ev.send_back_addr.clone(),
573 error: ev.error.to_string(), }
575 }
576}
577
578impl From<ConnectionsOutEvent> for SubscriptionsPeerConnectionEvent {
579 fn from(ev: ConnectionsOutEvent) -> Self {
580 match ev {
581 ConnectionsOutEvent::NewPeerConnected(peer) => {
582 SubscriptionsPeerConnectionEvent::NewPeerConnected(peer)
583 }
584 ConnectionsOutEvent::PeerDisconnected(peer) => {
585 SubscriptionsPeerConnectionEvent::PeerDisconnected(peer)
586 }
587 }
588 }
589}
590
591impl From<Message> for FrameMessage {
592 fn from(message: Message) -> Self {
593 let mut msg = Self::new(message.topic, message.data);
594 msg.set_sequence_number(message.sequence_number);
595 msg.set_key(message.key);
596 msg.set_source(message.from);
597 msg.set_signature(message.signature);
598 msg
599 }
600}
601
602impl From<FrameMessage> for Message {
603 fn from(message: FrameMessage) -> Self {
604 Self {
605 topic: message.topic(),
606 data: message.data().to_vec(),
607 sequence_number: message.sequence_number(),
608 key: message.key().map(ToOwned::to_owned),
609 from: message.source(),
610 signature: message.signature().map(ToOwned::to_owned),
611 }
612 }
613}