1use std::collections::{HashMap, HashSet, VecDeque};
2use std::task;
3use std::time::Duration;
4
5use arrayvec::ArrayVec;
6use libp2p::core::Endpoint;
7use libp2p::core::transport::PortUse;
8use libp2p::gossipsub::{self, IdentTopic, PublishError};
9use libp2p::swarm::{
10 ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
11 THandlerOutEvent, ToSwarm,
12};
13use libp2p::{Multiaddr, PeerId};
14use sierradb::MAX_REPLICATION_FACTOR;
15use sierradb::bucket::PartitionId;
16use tokio::time::Interval;
17use tracing::{debug, error, info, trace, warn};
18
19use crate::ClusterKey;
20
21use super::manager::TopologyManager;
22use super::messages::{Heartbeat, OwnershipMessage};
23
/// Name of the gossipsub topic on which heartbeat messages are exchanged.
const HEARTBEAT_TOPIC: &str = "sierra/heartbeat";
/// Name of the gossipsub topic on which ownership requests and responses are exchanged.
const OWNERSHIP_TOPIC: &str = "sierra/ownership";
27
28pub struct Behaviour<T: ClusterKey> {
29 pub gossipsub: gossipsub::Behaviour,
30 pub manager: TopologyManager<T>,
31 pub heartbeat_interval: Interval,
32 pub timeout_check_interval: Interval,
33 pub heartbeat_topic: gossipsub::IdentTopic,
34 pub ownership_topic: gossipsub::IdentTopic,
35 pub pending_events: VecDeque<PartitionBehaviourEvent<T>>,
36 pub heartbeat_bytes: Box<[u8]>,
37}
38
39impl<T: ClusterKey> Behaviour<T> {
40 pub fn new(
41 mut gossipsub: gossipsub::Behaviour,
42 manager: TopologyManager<T>,
43 heartbeat_interval_duration: Duration,
44 ) -> Self {
45 let heartbeat_interval = tokio::time::interval(heartbeat_interval_duration);
47 let timeout_check_interval = tokio::time::interval(heartbeat_interval_duration / 2);
48
49 let heartbeat_topic = IdentTopic::new(HEARTBEAT_TOPIC);
51 let ownership_topic = IdentTopic::new(OWNERSHIP_TOPIC);
52
53 let _ = gossipsub.subscribe(&heartbeat_topic);
55 let _ = gossipsub.subscribe(&ownership_topic);
56
57 let heartbeat_bytes = bincode::encode_to_vec(
59 Heartbeat {
60 cluster_ref: bincode::serde::Compat(manager.local_cluster_ref.clone()),
61 owned_partitions: manager.assigned_partitions.clone(),
62 alive_since: manager.alive_since,
63 node_index: manager.local_node_index,
64 total_node_count: manager.total_node_count,
65 },
66 bincode::config::standard(),
67 )
68 .unwrap()
69 .into();
70
71 info!(
72 "node {} owns {}/{} partitions as replicas",
73 manager.local_node_index,
74 manager.assigned_partitions.len(),
75 manager.num_partitions,
76 );
77 trace!(
78 "node {} is replica for partitions: {:?}",
79 manager.local_node_index, manager.assigned_partitions,
80 );
81
82 Self {
83 gossipsub,
84 manager,
85 heartbeat_interval,
86 timeout_check_interval,
87 heartbeat_topic,
88 ownership_topic,
89 pending_events: VecDeque::new(),
90 heartbeat_bytes,
91 }
92 }
93
94 pub fn add_explicit_peer(&mut self, peer_id: PeerId) {
95 self.gossipsub.add_explicit_peer(&peer_id);
97
98 let request = OwnershipMessage::OwnershipRequest {
100 cluster_ref: self.manager.local_cluster_ref.clone(),
101 owned_partitions: self.manager.assigned_partitions.clone(),
102 alive_since: self.manager.alive_since,
103 node_index: self.manager.local_node_index,
104 total_node_count: self.manager.total_node_count,
105 };
106
107 self.send_ownership_message(request);
108 }
109
110 fn handle_gossipsub_event(&mut self, event: &gossipsub::Event) {
111 if let gossipsub::Event::Message { message, .. } = event {
112 match message.topic.as_str() {
113 HEARTBEAT_TOPIC => {
114 self.handle_heartbeat_message(&message.data);
115 }
116 OWNERSHIP_TOPIC => {
117 self.handle_ownership_message(&message.data);
118 }
119 topic => {
120 warn!("received message on unknown topic: {topic}")
121 }
122 }
123 }
124 }
125
126 fn handle_heartbeat_message(&mut self, data: &[u8]) {
127 match bincode::decode_from_slice::<Heartbeat<bincode::serde::Compat<T>>, _>(
128 data,
129 bincode::config::standard(),
130 ) {
131 Ok((
132 Heartbeat {
133 cluster_ref: bincode::serde::Compat(cluster_ref),
134 owned_partitions,
135 alive_since,
136 node_index,
137 total_node_count,
138 },
139 _,
140 )) => {
141 let cluster_id = cluster_ref.id();
142 trace!("received heartbeat from {cluster_id} (index: {node_index})");
143
144 if cluster_ref.id() == self.manager.local_cluster_ref.id() {
146 return;
147 }
148
149 let was_inactive = !self
151 .manager
152 .active_nodes
153 .contains_key(cluster_ref.id().peer_id().unwrap());
154
155 let status_changed = self.manager.on_heartbeat(
157 cluster_ref.clone(),
158 &owned_partitions,
159 alive_since,
160 node_index,
161 total_node_count,
162 );
163
164 self.manager.ensure_local_partitions();
166
167 if status_changed || was_inactive {
169 info!(
170 "node {cluster_id} (index: {node_index}) is now active with {} replica partitions",
171 owned_partitions.len()
172 );
173
174 self.pending_events
175 .push_back(PartitionBehaviourEvent::NodeStatusChanged {
176 peer_id: *cluster_ref.id().peer_id().unwrap(),
177 is_active: true,
178 owned_partitions: owned_partitions.clone(),
179 });
180
181 self.pending_events.push_back(
182 PartitionBehaviourEvent::ReplicaAssignmentUpdated {
183 partition_replicas: self.manager.partition_replicas.clone(),
184 active_nodes: self.manager.active_nodes.clone(),
185 },
186 );
187 }
188 }
189 Err(err) => {
190 error!("failed to decode heartbeat message: {}", err);
191 }
192 }
193 }
194
195 fn handle_ownership_message(&mut self, data: &[u8]) {
196 match bincode::decode_from_slice::<OwnershipMessage<bincode::serde::Compat<T>>, _>(
197 data,
198 bincode::config::standard(),
199 ) {
200 Ok((msg, _)) => {
201 self.handle_partition_message(msg.map(|bincode::serde::Compat(key)| key));
202 }
203 Err(err) => {
204 error!("failed to decode ownership message: {err}");
205 }
206 }
207 }
208
209 fn handle_partition_message(&mut self, message: OwnershipMessage<T>) {
210 match message {
211 OwnershipMessage::OwnershipRequest {
212 cluster_ref,
213 owned_partitions,
214 alive_since,
215 node_index,
216 total_node_count,
217 } => {
218 if cluster_ref.id() == self.manager.local_cluster_ref.id() {
220 return;
221 }
222
223 info!(
224 "received ownership request from {} (index: {}) with {} replica partitions",
225 cluster_ref.id(),
226 node_index,
227 owned_partitions.len()
228 );
229
230 if let Some(response) = self.manager.on_node_connected(
232 cluster_ref.clone(),
233 &owned_partitions,
234 alive_since,
235 node_index,
236 total_node_count,
237 ) {
238 self.send_ownership_message(response);
240
241 self.pending_events
243 .push_back(PartitionBehaviourEvent::NodeStatusChanged {
244 peer_id: *cluster_ref.id().peer_id().unwrap(),
245 is_active: true,
246 owned_partitions: owned_partitions.clone(),
247 });
248
249 self.pending_events.push_back(
250 PartitionBehaviourEvent::ReplicaAssignmentUpdated {
251 partition_replicas: self.manager.partition_replicas.clone(),
252 active_nodes: self.manager.active_nodes.clone(),
253 },
254 );
255 }
256 }
257 OwnershipMessage::OwnershipResponse {
258 partition_replicas,
259 active_nodes,
260 } => {
261 info!(
262 "received ownership response with {} partition replica assignments and {} active nodes",
263 partition_replicas.len(),
264 active_nodes.len()
265 );
266
267 let partition_replicas = partition_replicas
269 .into_iter()
270 .flat_map(|(cluster_ref, partition_ids)| {
271 partition_ids
272 .into_iter()
273 .map(move |partition_id| (partition_id, cluster_ref.clone()))
274 })
275 .fold(
276 HashMap::<PartitionId, ArrayVec<T, MAX_REPLICATION_FACTOR>>::new(),
277 |mut acc, (partition_id, cluster_ref)| {
278 acc.entry(partition_id).or_default().push(cluster_ref);
279 acc
280 },
281 );
282
283 self.manager
285 .handle_ownership_response(&partition_replicas, active_nodes.clone());
286
287 self.manager.ensure_local_partitions();
289
290 self.pending_events
292 .push_back(PartitionBehaviourEvent::ReplicaAssignmentUpdated {
293 partition_replicas: self.manager.partition_replicas.clone(),
294 active_nodes: self.manager.active_nodes.clone(),
295 });
296 }
297 }
298 }
299
300 fn send_ownership_message(&mut self, message: OwnershipMessage<T>) {
301 match bincode::encode_to_vec(
302 message.map(bincode::serde::Compat),
303 bincode::config::standard(),
304 ) {
305 Ok(encoded) => {
306 if let Err(err) = self.gossipsub.publish(self.ownership_topic.hash(), encoded)
307 && !matches!(err, PublishError::NoPeersSubscribedToTopic)
308 {
309 error!("error publishing ownership message: {err}");
310 }
311 }
312 Err(err) => {
313 error!("failed to encode ownership message: {err}");
314 }
315 }
316 }
317
318 fn send_heartbeat(&mut self) {
319 self.manager.ensure_local_partitions();
321
322 trace!(
323 "sending heartbeat with {} replica partitions",
324 self.manager.assigned_partitions.len()
325 );
326
327 if let Err(err) = self
328 .gossipsub
329 .publish(self.heartbeat_topic.hash(), self.heartbeat_bytes.clone())
330 && !matches!(err, PublishError::NoPeersSubscribedToTopic)
331 {
332 error!("error publishing heartbeat: {err}");
333 }
334 }
335}
336
337#[derive(Debug)]
338pub enum PartitionBehaviourEvent<T> {
339 Gossipsub(<gossipsub::Behaviour as NetworkBehaviour>::ToSwarm),
340 NodeStatusChanged {
341 peer_id: PeerId,
342 is_active: bool,
343 owned_partitions: HashSet<PartitionId>, },
345 ReplicaAssignmentUpdated {
346 partition_replicas: HashMap<PartitionId, ArrayVec<T, MAX_REPLICATION_FACTOR>>,
347 active_nodes: HashMap<PeerId, (u64, usize)>, },
349}
350
351impl<T: ClusterKey> NetworkBehaviour for Behaviour<T> {
352 type ConnectionHandler = THandler<gossipsub::Behaviour>;
353 type ToSwarm = PartitionBehaviourEvent<T>;
354
355 fn handle_pending_inbound_connection(
356 &mut self,
357 connection_id: ConnectionId,
358 local_addr: &Multiaddr,
359 remote_addr: &Multiaddr,
360 ) -> Result<(), ConnectionDenied> {
361 NetworkBehaviour::handle_pending_inbound_connection(
362 &mut self.gossipsub,
363 connection_id,
364 local_addr,
365 remote_addr,
366 )
367 }
368
369 fn handle_established_inbound_connection(
370 &mut self,
371 connection_id: ConnectionId,
372 peer: PeerId,
373 local_addr: &Multiaddr,
374 remote_addr: &Multiaddr,
375 ) -> Result<THandler<Self>, ConnectionDenied> {
376 debug!("inbound connection established from {}", peer);
377 self.gossipsub.handle_established_inbound_connection(
378 connection_id,
379 peer,
380 local_addr,
381 remote_addr,
382 )
383 }
384
385 fn handle_pending_outbound_connection(
386 &mut self,
387 connection_id: ConnectionId,
388 maybe_peer: Option<PeerId>,
389 addresses: &[Multiaddr],
390 effective_role: Endpoint,
391 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
392 NetworkBehaviour::handle_pending_outbound_connection(
393 &mut self.gossipsub,
394 connection_id,
395 maybe_peer,
396 addresses,
397 effective_role,
398 )
399 }
400
401 fn handle_established_outbound_connection(
402 &mut self,
403 connection_id: ConnectionId,
404 peer: PeerId,
405 addr: &Multiaddr,
406 role_override: Endpoint,
407 port_use: PortUse,
408 ) -> Result<THandler<Self>, ConnectionDenied> {
409 debug!("outbound connection established to {}", peer);
410 self.gossipsub.handle_established_outbound_connection(
411 connection_id,
412 peer,
413 addr,
414 role_override,
415 port_use,
416 )
417 }
418
419 fn on_swarm_event(&mut self, event: FromSwarm) {
420 match &event {
421 FromSwarm::ConnectionEstablished(ev) => {
422 info!("connection established with {}", ev.peer_id);
423 self.add_explicit_peer(ev.peer_id);
425 }
426 FromSwarm::ConnectionClosed(ev) => {
427 info!("connection closed with {}", ev.peer_id);
428 self.gossipsub.remove_explicit_peer(&ev.peer_id);
430
431 self.manager.on_node_disconnected(&ev.peer_id);
433
434 self.pending_events
436 .push_back(PartitionBehaviourEvent::NodeStatusChanged {
437 peer_id: ev.peer_id,
438 is_active: false,
439 owned_partitions: HashSet::new(), });
441
442 self.pending_events
443 .push_back(PartitionBehaviourEvent::ReplicaAssignmentUpdated {
444 partition_replicas: self.manager.partition_replicas.clone(),
445 active_nodes: self.manager.active_nodes.clone(),
446 });
447 }
448 _ => {}
449 }
450
451 self.gossipsub.on_swarm_event(event);
452 }
453
454 fn on_connection_handler_event(
455 &mut self,
456 peer_id: PeerId,
457 connection_id: ConnectionId,
458 event: THandlerOutEvent<Self>,
459 ) {
460 NetworkBehaviour::on_connection_handler_event(
461 &mut self.gossipsub,
462 peer_id,
463 connection_id,
464 event,
465 )
466 }
467
468 fn poll(
469 &mut self,
470 cx: &mut task::Context,
471 ) -> task::Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
472 if let Some(ev) = self.pending_events.pop_front() {
473 return task::Poll::Ready(ToSwarm::GenerateEvent(ev));
474 }
475
476 if let task::Poll::Ready(ev) = NetworkBehaviour::poll(&mut self.gossipsub, cx) {
477 if let ToSwarm::GenerateEvent(ev) = &ev {
478 self.handle_gossipsub_event(ev);
479 }
480
481 return task::Poll::Ready(ev.map_out(PartitionBehaviourEvent::Gossipsub));
482 }
483
484 if self.heartbeat_interval.poll_tick(cx).is_ready() {
486 self.send_heartbeat();
487 }
488
489 if self.timeout_check_interval.poll_tick(cx).is_ready() {
491 let status_changed = self.manager.check_heartbeat_timeouts();
492
493 if status_changed {
494 self.pending_events
496 .push_back(PartitionBehaviourEvent::ReplicaAssignmentUpdated {
497 partition_replicas: self.manager.partition_replicas.clone(),
498 active_nodes: self.manager.active_nodes.clone(),
499 });
500 }
501 }
502
503 task::Poll::Pending
504 }
505}