sierradb_topology/behaviour.rs

use std::collections::{HashMap, HashSet, VecDeque};
use std::task;
use std::time::Duration;

use arrayvec::ArrayVec;
use libp2p::core::Endpoint;
use libp2p::core::transport::PortUse;
use libp2p::gossipsub::{self, IdentTopic, PublishError};
use libp2p::swarm::{
    ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
    THandlerOutEvent, ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use sierradb::MAX_REPLICATION_FACTOR;
use sierradb::bucket::PartitionId;
use tokio::time::Interval;
use tracing::{debug, error, info, trace, warn};

use crate::ClusterKey;

use super::manager::TopologyManager;
use super::messages::{Heartbeat, OwnershipMessage};

// Topic names
const HEARTBEAT_TOPIC: &str = "sierra/heartbeat";
const OWNERSHIP_TOPIC: &str = "sierra/ownership";

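/// Gossipsub-backed topology behaviour: periodically publishes heartbeats,
/// exchanges ownership messages with peers, and surfaces node liveness and
/// replica-assignment changes to the application as [`PartitionBehaviourEvent`]s.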
pub struct Behaviour<T: ClusterKey> {
    pub gossipsub: gossipsub::Behaviour,
    pub manager: TopologyManager<T>,
    pub heartbeat_interval: Interval,
    pub timeout_check_interval: Interval,
    pub heartbeat_topic: gossipsub::IdentTopic,
    pub ownership_topic: gossipsub::IdentTopic,
    pub pending_events: VecDeque<PartitionBehaviourEvent<T>>,
    pub heartbeat_bytes: Box<[u8]>,
}

impl<T: ClusterKey> Behaviour<T> {
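    /// Creates the behaviour, subscribes to the heartbeat and ownership topics,
    /// and pre-encodes the local heartbeat so it can be republished on every
    /// tick without re-serializing.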
    pub fn new(
        mut gossipsub: gossipsub::Behaviour,
        manager: TopologyManager<T>,
        heartbeat_interval_duration: Duration,
    ) -> Self {
        // Set up intervals for periodic tasks
        let heartbeat_interval = tokio::time::interval(heartbeat_interval_duration);
        let timeout_check_interval = tokio::time::interval(heartbeat_interval_duration / 2);

        // Create topics
        let heartbeat_topic = IdentTopic::new(HEARTBEAT_TOPIC);
        let ownership_topic = IdentTopic::new(OWNERSHIP_TOPIC);

        // Subscribe to both topics
        let _ = gossipsub.subscribe(&heartbeat_topic);
        let _ = gossipsub.subscribe(&ownership_topic);

        // Pre-encode heartbeat message
        let heartbeat_bytes = bincode::encode_to_vec(
            Heartbeat {
                cluster_ref: bincode::serde::Compat(manager.local_cluster_ref.clone()),
                owned_partitions: manager.assigned_partitions.clone(),
                alive_since: manager.alive_since,
                node_index: manager.local_node_index,
                total_node_count: manager.total_node_count,
            },
            bincode::config::standard(),
        )
        .unwrap()
        .into();

        info!(
            "node {} owns {}/{} partitions as replicas",
            manager.local_node_index,
            manager.assigned_partitions.len(),
            manager.num_partitions,
        );
        trace!(
            "node {} is replica for partitions: {:?}",
            manager.local_node_index, manager.assigned_partitions,
        );

        Self {
            gossipsub,
            manager,
            heartbeat_interval,
            timeout_check_interval,
            heartbeat_topic,
            ownership_topic,
            pending_events: VecDeque::new(),
            heartbeat_bytes,
        }
    }

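    /// Registers a peer with gossipsub and sends it an ownership request
    /// describing the partitions this node replicates.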
    pub fn add_explicit_peer(&mut self, peer_id: PeerId) {
        // Add peer to gossipsub
        self.gossipsub.add_explicit_peer(&peer_id);

        // Send an ownership request to the new peer
        let request = OwnershipMessage::OwnershipRequest {
            cluster_ref: self.manager.local_cluster_ref.clone(),
            owned_partitions: self.manager.assigned_partitions.clone(),
            alive_since: self.manager.alive_since,
            node_index: self.manager.local_node_index,
            total_node_count: self.manager.total_node_count,
        };

        self.send_ownership_message(request);
    }

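    /// Routes an incoming gossipsub message to the heartbeat or ownership
    /// handler based on its topic.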
    fn handle_gossipsub_event(&mut self, event: &gossipsub::Event) {
        if let gossipsub::Event::Message { message, .. } = event {
            match message.topic.as_str() {
                HEARTBEAT_TOPIC => {
                    self.handle_heartbeat_message(&message.data);
                }
                OWNERSHIP_TOPIC => {
                    self.handle_ownership_message(&message.data);
                }
                topic => {
                    warn!("received message on unknown topic: {topic}")
                }
            }
        }
    }

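    /// Decodes a remote heartbeat, updates the topology manager, and queues
    /// node-status and replica-assignment events when the sender's status
    /// changes or it was previously inactive.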
    fn handle_heartbeat_message(&mut self, data: &[u8]) {
        match bincode::decode_from_slice::<Heartbeat<bincode::serde::Compat<T>>, _>(
            data,
            bincode::config::standard(),
        ) {
            Ok((
                Heartbeat {
                    cluster_ref: bincode::serde::Compat(cluster_ref),
                    owned_partitions,
                    alive_since,
                    node_index,
                    total_node_count,
                },
                _,
            )) => {
                let cluster_id = cluster_ref.id();
                trace!("received heartbeat from {cluster_id} (index: {node_index})");

                // Skip our own heartbeats
                if cluster_ref.id() == self.manager.local_cluster_ref.id() {
                    return;
                }

                // Check if the node was previously inactive
                let was_inactive = !self
                    .manager
                    .active_nodes
                    .contains_key(cluster_ref.id().peer_id().unwrap());

                // Update heartbeat and ownership
                let status_changed = self.manager.on_heartbeat(
                    cluster_ref.clone(),
                    &owned_partitions,
                    alive_since,
                    node_index,
                    total_node_count,
                );

                // Make sure our own partitions are still registered
                self.manager.ensure_local_partitions();

                // If this is a new node, or it was inactive
                if status_changed || was_inactive {
                    info!(
                        "node {cluster_id} (index: {node_index}) is now active with {} replica partitions",
                        owned_partitions.len()
                    );

                    self.pending_events
                        .push_back(PartitionBehaviourEvent::NodeStatusChanged {
                            peer_id: *cluster_ref.id().peer_id().unwrap(),
                            is_active: true,
                            owned_partitions: owned_partitions.clone(),
                        });

                    self.pending_events.push_back(
                        PartitionBehaviourEvent::ReplicaAssignmentUpdated {
                            partition_replicas: self.manager.partition_replicas.clone(),
                            active_nodes: self.manager.active_nodes.clone(),
                        },
                    );
                }
            }
            Err(err) => {
                error!("failed to decode heartbeat message: {}", err);
            }
        }
    }

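    /// Decodes an ownership message and hands it to `handle_partition_message`
    /// with the serde compatibility wrapper removed.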
    fn handle_ownership_message(&mut self, data: &[u8]) {
        match bincode::decode_from_slice::<OwnershipMessage<bincode::serde::Compat<T>>, _>(
            data,
            bincode::config::standard(),
        ) {
            Ok((msg, _)) => {
                self.handle_partition_message(msg.map(|bincode::serde::Compat(key)| key));
            }
            Err(err) => {
                error!("failed to decode ownership message: {err}");
            }
        }
    }

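    /// Processes a decoded ownership message: requests from other nodes are
    /// answered with our replica assignment information, while responses merge
    /// the remote view into the local topology manager.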
    fn handle_partition_message(&mut self, message: OwnershipMessage<T>) {
        match message {
            OwnershipMessage::OwnershipRequest {
                cluster_ref,
                owned_partitions,
                alive_since,
                node_index,
                total_node_count,
            } => {
                // Skip our own requests
                if cluster_ref.id() == self.manager.local_cluster_ref.id() {
                    return;
                }

                info!(
                    "received ownership request from {} (index: {}) with {} replica partitions",
                    cluster_ref.id(),
                    node_index,
                    owned_partitions.len()
                );

                // Process the node connection
                if let Some(response) = self.manager.on_node_connected(
                    cluster_ref.clone(),
                    &owned_partitions,
                    alive_since,
                    node_index,
                    total_node_count,
                ) {
                    // Send a response with our full replica assignment information
                    self.send_ownership_message(response);

                    // Notify application
                    self.pending_events
                        .push_back(PartitionBehaviourEvent::NodeStatusChanged {
                            peer_id: *cluster_ref.id().peer_id().unwrap(),
                            is_active: true,
                            owned_partitions: owned_partitions.clone(),
                        });

                    self.pending_events.push_back(
                        PartitionBehaviourEvent::ReplicaAssignmentUpdated {
                            partition_replicas: self.manager.partition_replicas.clone(),
                            active_nodes: self.manager.active_nodes.clone(),
                        },
                    );
                }
            }
            OwnershipMessage::OwnershipResponse {
                partition_replicas,
                active_nodes,
            } => {
                info!(
                    "received ownership response with {} partition replica assignments and {} active nodes",
                    partition_replicas.len(),
                    active_nodes.len()
                );

                // Convert the response format back to our internal format
                let partition_replicas = partition_replicas
                    .into_iter()
                    .flat_map(|(cluster_ref, partition_ids)| {
                        partition_ids
                            .into_iter()
                            .map(move |partition_id| (partition_id, cluster_ref.clone()))
                    })
                    .fold(
                        HashMap::<PartitionId, ArrayVec<T, MAX_REPLICATION_FACTOR>>::new(),
                        |mut acc, (partition_id, cluster_ref)| {
                            acc.entry(partition_id).or_default().push(cluster_ref);
                            acc
                        },
                    );

                // Update our replica assignments from the response
                self.manager
                    .handle_ownership_response(&partition_replicas, active_nodes.clone());

                // Make sure our partitions are still registered correctly
                self.manager.ensure_local_partitions();

                // Notify application
                self.pending_events
                    .push_back(PartitionBehaviourEvent::ReplicaAssignmentUpdated {
                        partition_replicas: self.manager.partition_replicas.clone(),
                        active_nodes: self.manager.active_nodes.clone(),
                    });
            }
        }
    }

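    /// Encodes and publishes an ownership message on the ownership topic,
    /// ignoring the "no peers subscribed" error.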
    fn send_ownership_message(&mut self, message: OwnershipMessage<T>) {
        match bincode::encode_to_vec(
            message.map(bincode::serde::Compat),
            bincode::config::standard(),
        ) {
            Ok(encoded) => {
                if let Err(err) = self.gossipsub.publish(self.ownership_topic.hash(), encoded)
                    && !matches!(err, PublishError::NoPeersSubscribedToTopic)
                {
                    error!("error publishing ownership message: {err}");
                }
            }
            Err(err) => {
                error!("failed to encode ownership message: {err}");
            }
        }
    }

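    /// Publishes the pre-encoded heartbeat after re-registering the local
    /// partitions with the topology manager.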
    fn send_heartbeat(&mut self) {
        // Always make sure our partitions are still registered correctly
        self.manager.ensure_local_partitions();

        trace!(
            "sending heartbeat with {} replica partitions",
            self.manager.assigned_partitions.len()
        );

        if let Err(err) = self
            .gossipsub
            .publish(self.heartbeat_topic.hash(), self.heartbeat_bytes.clone())
            && !matches!(err, PublishError::NoPeersSubscribedToTopic)
        {
            error!("error publishing heartbeat: {err}");
        }
    }
}

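/// Events emitted to the swarm: raw gossipsub output plus topology
/// notifications about node liveness and replica assignments.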
#[derive(Debug)]
pub enum PartitionBehaviourEvent<T> {
    Gossipsub(<gossipsub::Behaviour as NetworkBehaviour>::ToSwarm),
    NodeStatusChanged {
        peer_id: PeerId,
        is_active: bool,
        owned_partitions: HashSet<PartitionId>, // Partitions this node is a replica for
    },
    ReplicaAssignmentUpdated {
        partition_replicas: HashMap<PartitionId, ArrayVec<T, MAX_REPLICATION_FACTOR>>,
        active_nodes: HashMap<PeerId, (u64, usize)>, // PeerId -> (alive_since, node_index)
    },
}

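// Connection handling is delegated to the inner gossipsub behaviour; topology
// bookkeeping is layered on top in `on_swarm_event` and `poll`.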
impl<T: ClusterKey> NetworkBehaviour for Behaviour<T> {
    type ConnectionHandler = THandler<gossipsub::Behaviour>;
    type ToSwarm = PartitionBehaviourEvent<T>;

    fn handle_pending_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<(), ConnectionDenied> {
        NetworkBehaviour::handle_pending_inbound_connection(
            &mut self.gossipsub,
            connection_id,
            local_addr,
            remote_addr,
        )
    }

    fn handle_established_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        debug!("inbound connection established from {}", peer);
        self.gossipsub.handle_established_inbound_connection(
            connection_id,
            peer,
            local_addr,
            remote_addr,
        )
    }

    fn handle_pending_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        maybe_peer: Option<PeerId>,
        addresses: &[Multiaddr],
        effective_role: Endpoint,
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
        NetworkBehaviour::handle_pending_outbound_connection(
            &mut self.gossipsub,
            connection_id,
            maybe_peer,
            addresses,
            effective_role,
        )
    }

    fn handle_established_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        addr: &Multiaddr,
        role_override: Endpoint,
        port_use: PortUse,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        debug!("outbound connection established to {}", peer);
        self.gossipsub.handle_established_outbound_connection(
            connection_id,
            peer,
            addr,
            role_override,
            port_use,
        )
    }

    fn on_swarm_event(&mut self, event: FromSwarm) {
        match &event {
            FromSwarm::ConnectionEstablished(ev) => {
                info!("connection established with {}", ev.peer_id);
                // Add the peer to our system
                self.add_explicit_peer(ev.peer_id);
            }
            FromSwarm::ConnectionClosed(ev) => {
                info!("connection closed with {}", ev.peer_id);
                // Remove from gossipsub
                self.gossipsub.remove_explicit_peer(&ev.peer_id);

                // Process node disconnection
                self.manager.on_node_disconnected(&ev.peer_id);

                // Notify application
                self.pending_events
                    .push_back(PartitionBehaviourEvent::NodeStatusChanged {
                        peer_id: ev.peer_id,
                        is_active: false,
                        owned_partitions: HashSet::new(), // No partitions when inactive
                    });

                self.pending_events
                    .push_back(PartitionBehaviourEvent::ReplicaAssignmentUpdated {
                        partition_replicas: self.manager.partition_replicas.clone(),
                        active_nodes: self.manager.active_nodes.clone(),
                    });
            }
            _ => {}
        }

        self.gossipsub.on_swarm_event(event);
    }

    fn on_connection_handler_event(
        &mut self,
        peer_id: PeerId,
        connection_id: ConnectionId,
        event: THandlerOutEvent<Self>,
    ) {
        NetworkBehaviour::on_connection_handler_event(
            &mut self.gossipsub,
            peer_id,
            connection_id,
            event,
        )
    }

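    /// Drains locally queued topology events first, then polls the inner
    /// gossipsub behaviour (inspecting its messages for topology updates), and
    /// finally drives the heartbeat and timeout-check timers.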
    fn poll(
        &mut self,
        cx: &mut task::Context,
    ) -> task::Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        if let Some(ev) = self.pending_events.pop_front() {
            return task::Poll::Ready(ToSwarm::GenerateEvent(ev));
        }

        if let task::Poll::Ready(ev) = NetworkBehaviour::poll(&mut self.gossipsub, cx) {
            if let ToSwarm::GenerateEvent(ev) = &ev {
                self.handle_gossipsub_event(ev);
            }

            return task::Poll::Ready(ev.map_out(PartitionBehaviourEvent::Gossipsub));
        }

        // Send heartbeats periodically
        if self.heartbeat_interval.poll_tick(cx).is_ready() {
            self.send_heartbeat();
        }

        // Check for node timeouts
        if self.timeout_check_interval.poll_tick(cx).is_ready() {
            let status_changed = self.manager.check_heartbeat_timeouts();

            if status_changed {
                // Notify application
                self.pending_events
                    .push_back(PartitionBehaviourEvent::ReplicaAssignmentUpdated {
                        partition_replicas: self.manager.partition_replicas.clone(),
                        active_nodes: self.manager.active_nodes.clone(),
                    });
            }
        }

        task::Poll::Pending
    }
}