//! sierradb_cluster/lib.rs

1#![feature(btree_extract_if)]
2
3use std::{
4    collections::{HashMap, HashSet},
5    io,
6    sync::Arc,
7    time::Duration,
8};
9
10use arrayvec::ArrayVec;
11use circuit_breaker::WriteCircuitBreaker;
12use confirmation::{AtomicWatermark, actor::ConfirmationActor};
13use futures::StreamExt;
14use kameo::{mailbox::Signal, prelude::*};
15use libp2p::{
16    BehaviourBuilderError, Multiaddr, PeerId, Swarm, TransportError, gossipsub,
17    identity::Keypair,
18    mdns, noise,
19    swarm::{NetworkBehaviour, behaviour::toggle::Toggle},
20    tcp, yamux,
21};
22use serde::{Deserialize, Serialize};
23use sierradb::{
24    MAX_REPLICATION_FACTOR, bucket::PartitionId, database::Database, error::WriteError,
25};
26use sierradb_topology::TopologyManager;
27use thiserror::Error;
28use tracing::{error, trace};
29
30use crate::{
31    subscription::SubscriptionManager,
32    write::replicate::{PartitionReplicatorActor, PartitionReplicatorActorArgs},
33};
34
35pub mod circuit_breaker;
36pub mod confirmation;
37pub mod read;
38pub mod subscription;
39pub mod write;
40
/// Remote refs to the cluster actors replicating a partition, each paired
/// with a `u64` (the meaning of the counter isn't visible in this file —
/// NOTE(review): confirm at call sites, likely a sequence/offset).
/// Bounded by `MAX_REPLICATION_FACTOR`, so no heap allocation is needed.
pub type ReplicaRefs = ArrayVec<(RemoteActorRef<ClusterActor>, u64), MAX_REPLICATION_FACTOR>;

/// Maximum number of request forwards allowed to prevent loops
const MAX_FORWARDS: u8 = 3;
45
/// Combined libp2p network behaviour for a cluster node.
#[derive(NetworkBehaviour)]
pub struct Behaviour {
    /// kameo remote-actor messaging between nodes.
    pub kameo: remote::Behaviour,
    /// Partition-topology behaviour; built on gossipsub (see `on_start`).
    pub topology: sierradb_topology::Behaviour<RemoteActorRef<ClusterActor>>,
    /// mDNS peer discovery; disabled via `Toggle` when `ClusterArgs::mdns` is false.
    pub mdns: Toggle<mdns::tokio::Behaviour>,
}
52
/// Actor owning this node's libp2p swarm and the per-node machinery for
/// writes, replication, confirmations, and subscriptions.
#[derive(RemoteActor)]
pub struct ClusterActor {
    // Peer id derived from this node's keypair.
    local_peer_id: PeerId,
    // Local storage engine; replaced wholesale by `ResetCluster` in tests.
    database: Database,
    // libp2p swarm combining kameo messaging, topology, and optional mDNS.
    swarm: Swarm<Behaviour>,
    // Number of replicas maintained per partition.
    replication_factor: u8,
    // Tracks write confirmations (spawned in `on_start`).
    confirmation_ref: ActorRef<ConfirmationActor>,
    // Per-partition watermarks, shared (Arc) with the subscription manager.
    watermarks: HashMap<PartitionId, Arc<AtomicWatermark>>,
    // Circuit breaker for write operations.
    circuit_breaker: Arc<WriteCircuitBreaker>,
    // One replicator actor per locally-assigned partition.
    replicator_refs: HashMap<PartitionId, ActorRef<PartitionReplicatorActor>>,
    // Serves subscriptions against the local database and watermarks.
    subscription_manager: SubscriptionManager,
}
65
66impl ClusterActor {
67    fn topology_manager(&self) -> &TopologyManager<RemoteActorRef<Self>> {
68        &self.swarm.behaviour().topology.manager
69    }
70}
71
/// Configuration for starting a [`ClusterActor`] (the swarm actor).
///
/// Passed as `Actor::Args` to `on_start`, which consumes it by destructuring.
pub struct ClusterArgs {
    /// Identity key for this node; its public key determines the peer id
    /// and local listen identity.
    pub keypair: Keypair,
    /// Reference to the local database instance
    pub database: Database,
    /// List of addresses to listen on
    pub listen_addrs: Vec<Multiaddr>,
    /// Total number of nodes in the cluster
    pub node_count: usize,
    /// Zero-based index of this node in the cluster
    pub node_index: usize,
    /// Total number of buckets in the system
    pub bucket_count: u16,
    /// Total number of partitions in the system
    pub partition_count: u16,
    /// Number of replicas to maintain for each partition
    pub replication_factor: u8,
    /// Partitions assigned to this node
    pub assigned_partitions: HashSet<PartitionId>,
    /// Maximum time to wait for a heartbeat before considering a peer down
    pub heartbeat_timeout: Duration,
    /// Interval between heartbeat messages
    pub heartbeat_interval: Duration,
    /// Maximum number of out-of-order writes to buffer per partition
    pub replication_buffer_size: usize,
    /// Maximum time to keep buffered writes before timing out
    pub replication_buffer_timeout: Duration,
    /// Maximum time before requesting a catchup
    pub replication_catchup_timeout: Duration,
    /// Whether mdns peer discovery is enabled
    pub mdns: bool,
}
105
impl Actor for ClusterActor {
    type Args = ClusterArgs;
    type Error = ClusterError;

    /// Builds the libp2p swarm, starts listening, and spawns the
    /// confirmation actor plus one replicator actor per assigned partition,
    /// then assembles the fully-initialized `ClusterActor` state.
    async fn on_start(
        ClusterArgs {
            keypair,
            database,
            listen_addrs,
            node_count,
            node_index,
            bucket_count,
            partition_count,
            replication_factor,
            assigned_partitions,
            heartbeat_timeout,
            heartbeat_interval,
            replication_buffer_size,
            replication_buffer_timeout,
            replication_catchup_timeout,
            mdns: mdns_enabled,
        }: Self::Args,
        actor_ref: ActorRef<Self>,
    ) -> Result<Self, Self::Error> {
        let local_peer_id = keypair.public().to_peer_id();
        trace!(
            %local_peer_id,
            partitions = %assigned_partitions.len(),
            "starting swarm actor"
        );

        // kameo remote messaging behaviour keyed by this node's peer id.
        let kameo = remote::Behaviour::new(
            keypair.public().to_peer_id(),
            remote::messaging::Config::default(),
        );

        // NOTE(review): presumably registers this behaviour as the
        // process-global kameo remote (needed before remote refs work) —
        // confirm against kameo docs.
        kameo.init_global();

        // Remote handle to this actor, captured by the behaviour closure
        // below so the topology manager can reach us.
        let cluster_ref = actor_ref.into_remote_ref().await;

        // Build the libp2p swarm with all required behaviors
        let mut swarm = libp2p::SwarmBuilder::with_existing_identity(keypair)
            .with_tokio()
            .with_tcp(
                tcp::Config::default(),
                noise::Config::new,
                yamux::Config::default,
            )?
            .with_behaviour(|key| {
                // Configure gossipsub for partition ownership messages
                let gossipsub_config = gossipsub::ConfigBuilder::default()
                    .heartbeat_interval(Duration::from_secs(1))
                    .validation_mode(gossipsub::ValidationMode::Strict)
                    .build()?;
                let gossipsub = gossipsub::Behaviour::new(
                    gossipsub::MessageAuthenticity::Signed(key.clone()),
                    gossipsub_config,
                )?;

                // Configure mDNS for peer discovery (None when disabled;
                // Option<Result<_>> -> Result<Option<_>> via transpose).
                let mdns = mdns_enabled
                    .then(|| mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id))
                    .transpose()?;

                // Create partition manager to track partition ownership
                let manager = TopologyManager::new(
                    cluster_ref,
                    node_index,
                    node_count,
                    partition_count,
                    bucket_count,
                    replication_factor,
                    heartbeat_timeout,
                );

                // Create partition ownership behavior
                let topology =
                    sierradb_topology::Behaviour::new(gossipsub, manager, heartbeat_interval);

                Ok(Behaviour {
                    kameo,
                    topology,
                    mdns: Toggle::from(mdns),
                })
            })?
            .build();

        // Bind every configured listen address; any bind failure aborts startup.
        for addr in listen_addrs {
            swarm.listen_on(addr)?;
        }

        // Confirmation actor owns the per-partition watermarks and the
        // broadcaster both shared with the subscription manager below.
        let confirmation_actor = ConfirmationActor::new(
            database.clone(),
            replication_factor,
            assigned_partitions.clone(),
        )
        .await
        .map_err(|err| ClusterError::ConfirmationFailure(err.to_string()))?;
        let watermarks = confirmation_actor.manager.get_watermarks();
        let broadcaster = confirmation_actor.broadcaster();
        let confirmation_ref = Actor::spawn(confirmation_actor);

        // One bounded-mailbox replicator actor per locally-assigned partition.
        let replicator_refs = assigned_partitions
            .iter()
            .map(|partition_id| {
                (
                    *partition_id,
                    PartitionReplicatorActor::spawn_with_mailbox(
                        PartitionReplicatorActorArgs {
                            partition_id: *partition_id,
                            database: database.clone(),
                            confirmation_ref: confirmation_ref.clone(),
                            buffer_size: replication_buffer_size,
                            buffer_timeout: replication_buffer_timeout,
                            catchup_timeout: replication_catchup_timeout,
                        },
                        mailbox::bounded(1_000),
                    ),
                )
            })
            .collect();

        let subscription_manager = SubscriptionManager::new(
            database.clone(),
            Arc::new(assigned_partitions),
            Arc::new(watermarks.clone()),
            partition_count,
            broadcaster,
        );

        let circuit_breaker = Arc::new(WriteCircuitBreaker::with_defaults());

        Ok(ClusterActor {
            local_peer_id,
            database,
            swarm,
            replication_factor,
            confirmation_ref,
            watermarks,
            circuit_breaker,
            replicator_refs,
            subscription_manager,
        })
    }

    /// Waits for the next mailbox signal while continuously driving the
    /// libp2p swarm, so networking makes progress even when the mailbox
    /// is idle.
    async fn next(
        &mut self,
        _actor_ref: WeakActorRef<Self>,
        mailbox_rx: &mut MailboxReceiver<Self>,
    ) -> Option<Signal<Self>> {
        loop {
            tokio::select! {
                // A mailbox signal ends the wait and is handed to the runtime.
                signal = mailbox_rx.recv() => return signal,
                // Polling advances the swarm; the event value itself is ignored here.
                _event = self.swarm.select_next_some() => {},
            }
        }
    }
}
264
/// Message that replaces the actor's database and rebuilds all state derived
/// from it (confirmation actor, replicators, subscriptions, circuit breaker).
/// For testing purposes.
pub struct ResetCluster {
    /// Replacement database instance.
    pub database: Database,
}
269
impl Message<ResetCluster> for ClusterActor {
    type Reply = Result<(), ClusterError>;

    /// Rebuilds the confirmation actor, watermarks, replicators,
    /// subscription manager, and circuit breaker around the replacement
    /// database, then swaps everything into `self`.
    ///
    /// NOTE(review): unlike `on_start`, the replicator buffer parameters are
    /// hardcoded here (size 1_000, buffer timeout 8s, catchup timeout 1s)
    /// because the original `ClusterArgs` values are not retained on the
    /// actor — confirm these defaults are acceptable for the tests using this.
    async fn handle(
        &mut self,
        ResetCluster { database }: ResetCluster,
        _ctx: &mut Context<Self, Self::Reply>,
    ) -> Self::Reply {
        // Partition assignment and count come from the live topology manager.
        let assigned_partitions = self.topology_manager().assigned_partitions.clone();
        let partition_count = self.topology_manager().num_partitions;

        let confirmation_actor = ConfirmationActor::new(
            database.clone(),
            self.replication_factor,
            assigned_partitions.clone(),
        )
        .await
        .map_err(|err| ClusterError::ConfirmationFailure(err.to_string()))?;
        let watermarks = confirmation_actor.manager.get_watermarks();
        let broadcaster = confirmation_actor.broadcaster();
        let confirmation_ref = Actor::spawn(confirmation_actor);

        // Fresh replicator actor per assigned partition, bound to the new
        // database and confirmation actor.
        let replicator_refs = assigned_partitions
            .iter()
            .map(|partition_id| {
                (
                    *partition_id,
                    PartitionReplicatorActor::spawn_with_mailbox(
                        PartitionReplicatorActorArgs {
                            partition_id: *partition_id,
                            database: database.clone(),
                            confirmation_ref: confirmation_ref.clone(),
                            buffer_size: 1_000,
                            buffer_timeout: Duration::from_millis(8_000),
                            catchup_timeout: Duration::from_millis(1_000),
                        },
                        mailbox::bounded(1_000),
                    ),
                )
            })
            .collect();

        let subscription_manager = SubscriptionManager::new(
            database.clone(),
            Arc::new(assigned_partitions),
            Arc::new(watermarks.clone()),
            partition_count,
            broadcaster,
        );

        let circuit_breaker = Arc::new(WriteCircuitBreaker::with_defaults());

        // All new components were built successfully; only now replace the
        // old ones, so a failure above leaves previous state untouched.
        self.replicator_refs = replicator_refs;
        self.confirmation_ref = confirmation_ref;
        self.watermarks = watermarks;
        self.subscription_manager = subscription_manager;
        self.circuit_breaker = circuit_breaker;
        self.database = database;

        Ok(())
    }
}
332
/// Errors produced by cluster operations.
///
/// Serialize/Deserialize so errors can cross the wire from remote handlers;
/// transport-level variants that wrap non-serializable types are marked
/// `#[serde(skip)]` and therefore only occur locally.
#[derive(Debug, Error, Serialize, Deserialize)]
pub enum ClusterError {
    // -- Quorum / availability failures --
    #[error("insufficient partitions alive for quorum ({alive}/{required})")]
    InsufficientPartitionsForQuorum { alive: u8, required: u8 },
    #[error("insufficient partitions alive")]
    NoAvailablePartitions,
    // #[error(transparent)]
    // #[serde(skip)]
    // OutboundFailure(#[from] OutboundFailure),
    // #[error("partition actor not found")]
    // PartitionActorNotFound { partition_id: PartitionId },
    #[error("partition unavailable")]
    PartitionUnavailable,
    #[error("no available leaders")]
    NoAvailableLeaders,
    #[error("quorum not achieved")]
    QuorumNotAchieved { confirmed: u8, required: u8 },
    #[error("write timed out")]
    WriteTimeout,
    // Forward count exceeded MAX_FORWARDS (loop prevention).
    #[error("too many forwards")]
    TooManyForwards,
    #[error("circuit breaker open: estimated recovery time: {estimated_recovery_time:?}")]
    CircuitBreakerOpen {
        estimated_recovery_time: Option<Duration>,
    },
    // -- Stringified wrappers (kept serializable by flattening to String) --
    #[error("failed to write confirmation count: {0}")]
    ConfirmationFailure(String),
    #[error("read error: {0}")]
    Read(String),
    #[error("write error: {0}")]
    Write(String),
    // #[error("wrong leader node for append")]
    // WrongLeaderNode { correct_leader: PeerId },
    #[error(transparent)]
    RemoteSend(#[from] RemoteSendError),
    // -- Local-only transport/build failures (not serializable) --
    #[error(transparent)]
    #[serde(skip)]
    Noise(#[from] noise::Error),
    #[error(transparent)]
    #[serde(skip)]
    Transport(#[from] TransportError<io::Error>),
    #[error(transparent)]
    #[serde(skip)]
    SwarmBuilder(#[from] BehaviourBuilderError),
}
378
379impl From<WriteError> for ClusterError {
380    fn from(err: WriteError) -> Self {
381        ClusterError::Write(err.to_string())
382    }
383}
384
385impl From<RemoteSendError<ClusterError>> for ClusterError {
386    fn from(err: RemoteSendError<ClusterError>) -> Self {
387        match err {
388            RemoteSendError::ActorNotRunning => {
389                ClusterError::RemoteSend(RemoteSendError::ActorNotRunning)
390            }
391            RemoteSendError::ActorStopped => {
392                ClusterError::RemoteSend(RemoteSendError::ActorStopped)
393            }
394            RemoteSendError::UnknownActor { actor_remote_id } => {
395                ClusterError::RemoteSend(RemoteSendError::UnknownActor { actor_remote_id })
396            }
397            RemoteSendError::UnknownMessage {
398                actor_remote_id,
399                message_remote_id,
400            } => ClusterError::RemoteSend(RemoteSendError::UnknownMessage {
401                actor_remote_id,
402                message_remote_id,
403            }),
404            RemoteSendError::BadActorType => {
405                ClusterError::RemoteSend(RemoteSendError::BadActorType)
406            }
407            RemoteSendError::MailboxFull => ClusterError::RemoteSend(RemoteSendError::MailboxFull),
408            RemoteSendError::ReplyTimeout => {
409                ClusterError::RemoteSend(RemoteSendError::ReplyTimeout)
410            }
411            RemoteSendError::HandlerError(err) => err,
412            RemoteSendError::SerializeMessage(err) => {
413                ClusterError::RemoteSend(RemoteSendError::SerializeMessage(err))
414            }
415            RemoteSendError::DeserializeMessage(err) => {
416                ClusterError::RemoteSend(RemoteSendError::DeserializeMessage(err))
417            }
418            RemoteSendError::SerializeReply(err) => {
419                ClusterError::RemoteSend(RemoteSendError::SerializeReply(err))
420            }
421            RemoteSendError::SerializeHandlerError(err) => {
422                ClusterError::RemoteSend(RemoteSendError::SerializeHandlerError(err))
423            }
424            RemoteSendError::DeserializeHandlerError(err) => {
425                ClusterError::RemoteSend(RemoteSendError::DeserializeHandlerError(err))
426            }
427            RemoteSendError::SwarmNotBootstrapped => {
428                ClusterError::RemoteSend(RemoteSendError::SwarmNotBootstrapped)
429            }
430            RemoteSendError::DialFailure => ClusterError::RemoteSend(RemoteSendError::DialFailure),
431            RemoteSendError::NetworkTimeout => {
432                ClusterError::RemoteSend(RemoteSendError::NetworkTimeout)
433            }
434            RemoteSendError::ConnectionClosed => {
435                ClusterError::RemoteSend(RemoteSendError::ConnectionClosed)
436            }
437            RemoteSendError::UnsupportedProtocols => {
438                ClusterError::RemoteSend(RemoteSendError::UnsupportedProtocols)
439            }
440            RemoteSendError::Io(err) => ClusterError::RemoteSend(RemoteSendError::Io(err)),
441        }
442    }
443}