1#![feature(btree_extract_if)]
2
3use std::{
4 collections::{HashMap, HashSet},
5 io,
6 sync::Arc,
7 time::Duration,
8};
9
10use arrayvec::ArrayVec;
11use circuit_breaker::WriteCircuitBreaker;
12use confirmation::{AtomicWatermark, actor::ConfirmationActor};
13use futures::StreamExt;
14use kameo::{mailbox::Signal, prelude::*};
15use libp2p::{
16 BehaviourBuilderError, Multiaddr, PeerId, Swarm, TransportError, gossipsub,
17 identity::Keypair,
18 mdns, noise,
19 swarm::{NetworkBehaviour, behaviour::toggle::Toggle},
20 tcp, yamux,
21};
22use serde::{Deserialize, Serialize};
23use sierradb::{
24 MAX_REPLICATION_FACTOR, bucket::PartitionId, database::Database, error::WriteError,
25};
26use sierradb_topology::TopologyManager;
27use thiserror::Error;
28use tracing::{error, trace};
29
30use crate::{
31 subscription::SubscriptionManager,
32 write::replicate::{PartitionReplicatorActor, PartitionReplicatorActorArgs},
33};
34
pub mod circuit_breaker;
pub mod confirmation;
pub mod read;
pub mod subscription;
pub mod write;

/// Fixed-capacity list of remote [`ClusterActor`] references, each paired with a `u64`,
/// holding at most [`MAX_REPLICATION_FACTOR`] entries (no heap allocation).
// NOTE(review): the meaning of the `u64` is not visible in this file — confirm against the
// `read`/`write` modules that build these lists.
pub type ReplicaRefs = ArrayVec<(RemoteActorRef<ClusterActor>, u64), MAX_REPLICATION_FACTOR>;

/// Forwarding limit for requests routed between nodes.
// NOTE(review): presumably exceeding this yields `ClusterError::TooManyForwards` — confirm
// at the use site (not visible here).
const MAX_FORWARDS: u8 = 3;
45
/// Combined libp2p behaviour polled by the swarm that [`ClusterActor`] owns.
#[derive(NetworkBehaviour)]
pub struct Behaviour {
    /// kameo remote-actor messaging behaviour.
    pub kameo: remote::Behaviour,
    /// Cluster topology behaviour, built from a gossipsub behaviour and a [`TopologyManager`]
    /// whose local handle is this node's remote [`ClusterActor`] ref.
    pub topology: sierradb_topology::Behaviour<RemoteActorRef<ClusterActor>>,
    /// mDNS peer discovery; the toggle is empty unless [`ClusterArgs::mdns`] is `true`.
    pub mdns: Toggle<mdns::tokio::Behaviour>,
}
52
53#[derive(RemoteActor)]
54pub struct ClusterActor {
55 local_peer_id: PeerId,
56 database: Database,
57 swarm: Swarm<Behaviour>,
58 replication_factor: u8,
59 confirmation_ref: ActorRef<ConfirmationActor>,
60 watermarks: HashMap<PartitionId, Arc<AtomicWatermark>>,
61 circuit_breaker: Arc<WriteCircuitBreaker>,
62 replicator_refs: HashMap<PartitionId, ActorRef<PartitionReplicatorActor>>,
63 subscription_manager: SubscriptionManager,
64}
65
66impl ClusterActor {
67 fn topology_manager(&self) -> &TopologyManager<RemoteActorRef<Self>> {
68 &self.swarm.behaviour().topology.manager
69 }
70}
71
72pub struct ClusterArgs {
74 pub keypair: Keypair,
76 pub database: Database,
78 pub listen_addrs: Vec<Multiaddr>,
80 pub node_count: usize,
82 pub node_index: usize,
84 pub bucket_count: u16,
86 pub partition_count: u16,
88 pub replication_factor: u8,
90 pub assigned_partitions: HashSet<PartitionId>,
92 pub heartbeat_timeout: Duration,
94 pub heartbeat_interval: Duration,
96 pub replication_buffer_size: usize,
98 pub replication_buffer_timeout: Duration,
100 pub replication_catchup_timeout: Duration,
102 pub mdns: bool,
104}
105
106impl Actor for ClusterActor {
107 type Args = ClusterArgs;
108 type Error = ClusterError;
109
110 async fn on_start(
111 ClusterArgs {
112 keypair,
113 database,
114 listen_addrs,
115 node_count,
116 node_index,
117 bucket_count,
118 partition_count,
119 replication_factor,
120 assigned_partitions,
121 heartbeat_timeout,
122 heartbeat_interval,
123 replication_buffer_size,
124 replication_buffer_timeout,
125 replication_catchup_timeout,
126 mdns: mdns_enabled,
127 }: Self::Args,
128 actor_ref: ActorRef<Self>,
129 ) -> Result<Self, Self::Error> {
130 let local_peer_id = keypair.public().to_peer_id();
131 trace!(
132 %local_peer_id,
133 partitions = %assigned_partitions.len(),
134 "starting swarm actor"
135 );
136
137 let kameo = remote::Behaviour::new(
138 keypair.public().to_peer_id(),
139 remote::messaging::Config::default(),
140 );
141
142 kameo.init_global();
143
144 let cluster_ref = actor_ref.into_remote_ref().await;
145
146 let mut swarm = libp2p::SwarmBuilder::with_existing_identity(keypair)
148 .with_tokio()
149 .with_tcp(
150 tcp::Config::default(),
151 noise::Config::new,
152 yamux::Config::default,
153 )?
154 .with_behaviour(|key| {
155 let gossipsub_config = gossipsub::ConfigBuilder::default()
157 .heartbeat_interval(Duration::from_secs(1))
158 .validation_mode(gossipsub::ValidationMode::Strict)
159 .build()?;
160 let gossipsub = gossipsub::Behaviour::new(
161 gossipsub::MessageAuthenticity::Signed(key.clone()),
162 gossipsub_config,
163 )?;
164
165 let mdns = mdns_enabled
167 .then(|| mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id))
168 .transpose()?;
169
170 let manager = TopologyManager::new(
172 cluster_ref,
173 node_index,
174 node_count,
175 partition_count,
176 bucket_count,
177 replication_factor,
178 heartbeat_timeout,
179 );
180
181 let topology =
183 sierradb_topology::Behaviour::new(gossipsub, manager, heartbeat_interval);
184
185 Ok(Behaviour {
186 kameo,
187 topology,
188 mdns: Toggle::from(mdns),
189 })
190 })?
191 .build();
192
193 for addr in listen_addrs {
194 swarm.listen_on(addr)?;
195 }
196
197 let confirmation_actor = ConfirmationActor::new(
198 database.clone(),
199 replication_factor,
200 assigned_partitions.clone(),
201 )
202 .await
203 .map_err(|err| ClusterError::ConfirmationFailure(err.to_string()))?;
204 let watermarks = confirmation_actor.manager.get_watermarks();
205 let broadcaster = confirmation_actor.broadcaster();
206 let confirmation_ref = Actor::spawn(confirmation_actor);
207
208 let replicator_refs = assigned_partitions
209 .iter()
210 .map(|partition_id| {
211 (
212 *partition_id,
213 PartitionReplicatorActor::spawn_with_mailbox(
214 PartitionReplicatorActorArgs {
215 partition_id: *partition_id,
216 database: database.clone(),
217 confirmation_ref: confirmation_ref.clone(),
218 buffer_size: replication_buffer_size,
219 buffer_timeout: replication_buffer_timeout,
220 catchup_timeout: replication_catchup_timeout,
221 },
222 mailbox::bounded(1_000),
223 ),
224 )
225 })
226 .collect();
227
228 let subscription_manager = SubscriptionManager::new(
229 database.clone(),
230 Arc::new(assigned_partitions),
231 Arc::new(watermarks.clone()),
232 partition_count,
233 broadcaster,
234 );
235
236 let circuit_breaker = Arc::new(WriteCircuitBreaker::with_defaults());
237
238 Ok(ClusterActor {
239 local_peer_id,
240 database,
241 swarm,
242 replication_factor,
243 confirmation_ref,
244 watermarks,
245 circuit_breaker,
246 replicator_refs,
247 subscription_manager,
248 })
249 }
250
251 async fn next(
252 &mut self,
253 _actor_ref: WeakActorRef<Self>,
254 mailbox_rx: &mut MailboxReceiver<Self>,
255 ) -> Option<Signal<Self>> {
256 loop {
257 tokio::select! {
258 signal = mailbox_rx.recv() => return signal,
259 _event = self.swarm.select_next_some() => {},
260 }
261 }
262 }
263}
264
265pub struct ResetCluster {
267 pub database: Database,
268}
269
270impl Message<ResetCluster> for ClusterActor {
271 type Reply = Result<(), ClusterError>;
272
273 async fn handle(
274 &mut self,
275 ResetCluster { database }: ResetCluster,
276 _ctx: &mut Context<Self, Self::Reply>,
277 ) -> Self::Reply {
278 let assigned_partitions = self.topology_manager().assigned_partitions.clone();
279 let partition_count = self.topology_manager().num_partitions;
280
281 let confirmation_actor = ConfirmationActor::new(
282 database.clone(),
283 self.replication_factor,
284 assigned_partitions.clone(),
285 )
286 .await
287 .map_err(|err| ClusterError::ConfirmationFailure(err.to_string()))?;
288 let watermarks = confirmation_actor.manager.get_watermarks();
289 let broadcaster = confirmation_actor.broadcaster();
290 let confirmation_ref = Actor::spawn(confirmation_actor);
291
292 let replicator_refs = assigned_partitions
293 .iter()
294 .map(|partition_id| {
295 (
296 *partition_id,
297 PartitionReplicatorActor::spawn_with_mailbox(
298 PartitionReplicatorActorArgs {
299 partition_id: *partition_id,
300 database: database.clone(),
301 confirmation_ref: confirmation_ref.clone(),
302 buffer_size: 1_000,
303 buffer_timeout: Duration::from_millis(8_000),
304 catchup_timeout: Duration::from_millis(1_000),
305 },
306 mailbox::bounded(1_000),
307 ),
308 )
309 })
310 .collect();
311
312 let subscription_manager = SubscriptionManager::new(
313 database.clone(),
314 Arc::new(assigned_partitions),
315 Arc::new(watermarks.clone()),
316 partition_count,
317 broadcaster,
318 );
319
320 let circuit_breaker = Arc::new(WriteCircuitBreaker::with_defaults());
321
322 self.replicator_refs = replicator_refs;
323 self.confirmation_ref = confirmation_ref;
324 self.watermarks = watermarks;
325 self.subscription_manager = subscription_manager;
326 self.circuit_breaker = circuit_breaker;
327 self.database = database;
328
329 Ok(())
330 }
331}
332
/// Errors returned by cluster operations.
///
/// The type is `Serialize`/`Deserialize`, presumably because it also travels as the
/// handler error of remote sends: it is what `RemoteSendError::HandlerError` carries in
/// the `From<RemoteSendError<ClusterError>>` conversion. Variants marked
/// `#[serde(skip)]` wrap local libp2p setup errors and cannot be serialized.
#[derive(Debug, Error, Serialize, Deserialize)]
pub enum ClusterError {
    /// Fewer partitions are alive than quorum requires (`alive` of `required`).
    #[error("insufficient partitions alive for quorum ({alive}/{required})")]
    InsufficientPartitionsForQuorum { alive: u8, required: u8 },
    #[error("insufficient partitions alive")]
    NoAvailablePartitions,
    #[error("partition unavailable")]
    PartitionUnavailable,
    #[error("no available leaders")]
    NoAvailableLeaders,
    /// Quorum was not reached; carries the `confirmed` and `required` counts.
    // NOTE(review): unlike `InsufficientPartitionsForQuorum`, the Display text omits the counts.
    #[error("quorum not achieved")]
    QuorumNotAchieved { confirmed: u8, required: u8 },
    #[error("write timed out")]
    WriteTimeout,
    #[error("too many forwards")]
    TooManyForwards,
    /// The write circuit breaker is open; optionally carries an estimated recovery time.
    #[error("circuit breaker open: estimated recovery time: {estimated_recovery_time:?}")]
    CircuitBreakerOpen {
        estimated_recovery_time: Option<Duration>,
    },
    /// Confirmation subsystem failure carrying the stringified cause, e.g. when a
    /// `ConfirmationActor` cannot be created.
    #[error("failed to write confirmation count: {0}")]
    ConfirmationFailure(String),
    /// Stringified read failure.
    #[error("read error: {0}")]
    Read(String),
    /// Stringified write failure (built from `WriteError` via `From`).
    #[error("write error: {0}")]
    Write(String),
    /// Transport/messaging failure of a remote send; remote handler errors are unwrapped
    /// instead of being stored here.
    #[error(transparent)]
    RemoteSend(#[from] RemoteSendError),
    /// Noise handshake configuration error from swarm construction.
    #[error(transparent)]
    #[serde(skip)]
    Noise(#[from] noise::Error),
    /// Transport error, e.g. from `Swarm::listen_on`.
    #[error(transparent)]
    #[serde(skip)]
    Transport(#[from] TransportError<io::Error>),
    /// Error raised while building the swarm's behaviour.
    #[error(transparent)]
    #[serde(skip)]
    SwarmBuilder(#[from] BehaviourBuilderError),
}
378
379impl From<WriteError> for ClusterError {
380 fn from(err: WriteError) -> Self {
381 ClusterError::Write(err.to_string())
382 }
383}
384
385impl From<RemoteSendError<ClusterError>> for ClusterError {
386 fn from(err: RemoteSendError<ClusterError>) -> Self {
387 match err {
388 RemoteSendError::ActorNotRunning => {
389 ClusterError::RemoteSend(RemoteSendError::ActorNotRunning)
390 }
391 RemoteSendError::ActorStopped => {
392 ClusterError::RemoteSend(RemoteSendError::ActorStopped)
393 }
394 RemoteSendError::UnknownActor { actor_remote_id } => {
395 ClusterError::RemoteSend(RemoteSendError::UnknownActor { actor_remote_id })
396 }
397 RemoteSendError::UnknownMessage {
398 actor_remote_id,
399 message_remote_id,
400 } => ClusterError::RemoteSend(RemoteSendError::UnknownMessage {
401 actor_remote_id,
402 message_remote_id,
403 }),
404 RemoteSendError::BadActorType => {
405 ClusterError::RemoteSend(RemoteSendError::BadActorType)
406 }
407 RemoteSendError::MailboxFull => ClusterError::RemoteSend(RemoteSendError::MailboxFull),
408 RemoteSendError::ReplyTimeout => {
409 ClusterError::RemoteSend(RemoteSendError::ReplyTimeout)
410 }
411 RemoteSendError::HandlerError(err) => err,
412 RemoteSendError::SerializeMessage(err) => {
413 ClusterError::RemoteSend(RemoteSendError::SerializeMessage(err))
414 }
415 RemoteSendError::DeserializeMessage(err) => {
416 ClusterError::RemoteSend(RemoteSendError::DeserializeMessage(err))
417 }
418 RemoteSendError::SerializeReply(err) => {
419 ClusterError::RemoteSend(RemoteSendError::SerializeReply(err))
420 }
421 RemoteSendError::SerializeHandlerError(err) => {
422 ClusterError::RemoteSend(RemoteSendError::SerializeHandlerError(err))
423 }
424 RemoteSendError::DeserializeHandlerError(err) => {
425 ClusterError::RemoteSend(RemoteSendError::DeserializeHandlerError(err))
426 }
427 RemoteSendError::SwarmNotBootstrapped => {
428 ClusterError::RemoteSend(RemoteSendError::SwarmNotBootstrapped)
429 }
430 RemoteSendError::DialFailure => ClusterError::RemoteSend(RemoteSendError::DialFailure),
431 RemoteSendError::NetworkTimeout => {
432 ClusterError::RemoteSend(RemoteSendError::NetworkTimeout)
433 }
434 RemoteSendError::ConnectionClosed => {
435 ClusterError::RemoteSend(RemoteSendError::ConnectionClosed)
436 }
437 RemoteSendError::UnsupportedProtocols => {
438 ClusterError::RemoteSend(RemoteSendError::UnsupportedProtocols)
439 }
440 RemoteSendError::Io(err) => ClusterError::RemoteSend(RemoteSendError::Io(err)),
441 }
442 }
443}