datacake_node/
lib.rs

1//! # Datacake Node
2//!
3//! The core membership system used within Datacake.
4//!
5//! This system allows you to build cluster extensions on top of this core functionality giving you access to
6//! the live membership watchers, node selectors, cluster clock, etc...
7//!
//! A good example of this is the `datacake-eventual-consistency` crate: it simply implements the
//! `ClusterExtension` trait, which lets it be added at runtime without issue.
10//!
11//! ## Features
12//! - Zero-copy RPC framework which allows for runtime adding and removing of services.
13//! - Changeable node selector used for picking nodes out of a live membership to handle tasks.
14//! - Pre-built data-center aware node selector for prioritisation of nodes in other availability zones.
15//! - Distributed clock used for keeping an effective wall clock which respects causality.
16//!
17//! ## Getting Started
18//!
19//! To get started we'll begin by creating our cluster:
20//!
21//! ```rust
22//! use std::net::SocketAddr;
23//!
24//! use datacake_node::{ConnectionConfig, DCAwareSelector, DatacakeNodeBuilder};
25//!
26//! #[tokio::main]
27//! async fn main() -> anyhow::Result<()> {
28//!     let bind_addr = "127.0.0.1:8000".parse::<SocketAddr>().unwrap();
29//!
30//!     // We setup our connection config for the node passing in the bind address, public address and seed nodes.
31//!     // Here we're just using the bind address as our public address with no seed, but in the real world
32//!     // this will be a different value when deployed across several servers with seeds to contact.
33//!     let connection_cfg = ConnectionConfig::new(bind_addr, bind_addr, Vec::<String>::new());
34//!
35//!     // Our builder lets us configure the node.
36//!     //
37//!     // We can configure the node selector, data center of the node, cluster ID, etc...
38//!     let _my_node = DatacakeNodeBuilder::<DCAwareSelector>::new(1, connection_cfg).connect().await?;
39//!
40//!     // Now we're connected we can add any extensions at runtime, our RPC server will already be
41//!     // running and setup.
42//!     //
43//!     // Check out the `datacake-eventual-consistency` implementation for a demo.
44//!
45//!     Ok(())
46//! }
47//! ```
48//!
//! #### Creating An Extension
50//!
51//! Creating a cluster extension is really simple, it's one trait and it can do just about anything:
52//!
53//! ```rust
54//! use datacake_node::{ClusterExtension, DatacakeNode};
55//! use async_trait::async_trait;
56//!
57//! pub struct MyExtension;
58//!
59//! #[async_trait]
60//! impl ClusterExtension for MyExtension {
61//!     type Output = ();
62//!     type Error = MyError;
63//!
64//!     async fn init_extension(
65//!         self,
66//!         node: &DatacakeNode,
67//!     ) -> Result<Self::Output, Self::Error> {
68//!         // In here we can setup our system using the live node.
69//!         // This gives us things like the cluster clock and RPC server:
70//!
71//!         println!("Creating my extension!");
72//!
73//!         let timestamp = node.clock().get_time().await;
74//!         println!("My timestamp: {timestamp}");
75//!
76//!         Ok(())
77//!     }
78//! }
79//!
80//! pub struct MyError;
81//! ```
82
83pub(crate) mod clock;
84mod error;
85mod extension;
86mod node;
87mod nodes_selector;
88mod rpc;
89mod statistics;
90
91use std::borrow::Cow;
92use std::collections::{BTreeMap, BTreeSet};
93use std::fmt::Display;
94use std::net::SocketAddr;
95use std::sync::atomic::Ordering;
96use std::time::Duration;
97
98use chitchat::transport::Transport;
99use chitchat::FailureDetectorConfig;
100pub use clock::Clock;
101use datacake_rpc::{RpcService, Server};
102pub use error::NodeError;
103pub use extension::ClusterExtension;
104use futures::StreamExt;
105pub use node::{ChitchatNode, ClusterMember};
106pub use nodes_selector::{
107    Consistency,
108    ConsistencyError,
109    DCAwareSelector,
110    NodeSelector,
111    NodeSelectorHandle,
112    Nodes,
113};
114pub use rpc::network::RpcNetwork;
115pub use statistics::ClusterStatistics;
116use tokio::sync::watch;
117use tokio_stream::wrappers::WatchStream;
118use tracing::info;
119
120use crate::node::NodeMembership;
121use crate::rpc::chitchat_transport::ChitchatTransport;
122use crate::rpc::services::chitchat_impl::ChitchatService;
123
/// Cluster ID a [`DatacakeNodeBuilder`] starts with until `with_cluster_id` overrides it.
pub static DEFAULT_CLUSTER_ID: &str = "datacake-cluster-unknown";
/// Data center name a [`DatacakeNodeBuilder`] starts with until `with_data_center` overrides it.
pub static DEFAULT_DATA_CENTER: &str = "datacake-dc-unknown";
/// Identifier of a single node; it must be unique among the nodes of a cluster.
pub type NodeId = u8;
127
128/// Build a Datacake node using provided settings.
129pub struct DatacakeNodeBuilder<S = DCAwareSelector> {
130    node_id: NodeId,
131    connection_cfg: ConnectionConfig,
132    cluster_id: String,
133    data_center: Cow<'static, str>,
134    node_selector: S,
135}
136
137impl<S> DatacakeNodeBuilder<S>
138where
139    S: NodeSelector + Send + 'static,
140{
141    /// Create a new node builder.
142    pub fn new(
143        node_id: NodeId,
144        connection_cfg: ConnectionConfig,
145    ) -> DatacakeNodeBuilder<DCAwareSelector> {
146        DatacakeNodeBuilder {
147            node_id,
148            connection_cfg,
149            cluster_id: DEFAULT_CLUSTER_ID.to_string(),
150            data_center: Cow::Borrowed(DEFAULT_DATA_CENTER),
151            node_selector: DCAwareSelector::default(),
152        }
153    }
154
155    /// Set a node selector.
156    ///
157    /// This is used by systems to select a specific set of nodes from
158    /// the live membership set with a given consistency level.
159    pub fn with_node_selector<S2>(self, selector: S2) -> DatacakeNodeBuilder<S2> {
160        DatacakeNodeBuilder {
161            node_id: self.node_id,
162            connection_cfg: self.connection_cfg,
163            cluster_id: self.cluster_id,
164            data_center: self.data_center,
165            node_selector: selector,
166        }
167    }
168
169    /// Set the cluster id for the given node.
170    pub fn with_cluster_id(mut self, cluster_id: impl Display) -> Self {
171        self.cluster_id = cluster_id.to_string();
172        self
173    }
174
175    /// Set the data center the node belongs to.
176    pub fn with_data_center(mut self, dc: impl Display) -> Self {
177        self.data_center = Cow::Owned(dc.to_string());
178        self
179    }
180
181    /// Starts the Datacake cluster, connecting to the targeted seed nodes.
182    ///
183    /// When connecting to the cluster, the `node_id` **must be unique** otherwise
184    /// the cluster will incorrectly propagate state and not become consistent.
185    ///
186    /// Typically you will only have one cluster and therefore only have one `cluster_id`
187    /// which should be the same for each node in the cluster.
188    /// Currently the `cluster_id` is not handled by anything other than
189    /// [chitchat](https://docs.rs/chitchat/0.4.1/chitchat/)
190    ///
191    /// No seed nodes need to be live at the time of connecting for the cluster to start correctly,
192    /// but they are required in order for nodes to discover one-another and share
193    /// their basic state.
194    pub async fn connect(self) -> Result<DatacakeNode, NodeError> {
195        let clock = Clock::new(self.node_id);
196
197        let statistics = ClusterStatistics::default();
198        let network = RpcNetwork::default();
199
200        let rpc_server = Server::listen(self.connection_cfg.listen_addr).await?;
201        let selector = nodes_selector::start_node_selector(
202            self.connection_cfg.public_addr,
203            self.data_center.clone(),
204            self.node_selector,
205        )
206        .await;
207
208        let cluster_info = ClusterInfo {
209            listen_addr: self.connection_cfg.listen_addr,
210            public_addr: self.connection_cfg.public_addr,
211            seed_nodes: self.connection_cfg.seed_nodes,
212            data_center: self.data_center.as_ref(),
213        };
214        let (node, transport) = connect_node(
215            self.node_id,
216            self.cluster_id.clone(),
217            clock.clone(),
218            network.clone(),
219            cluster_info,
220            &rpc_server,
221            statistics.clone(),
222        )
223        .await?;
224
225        let (tx, membership_changes) = watch::channel(MembershipChange::default());
226        tokio::spawn(watch_membership_changes(
227            self.node_id,
228            network.clone(),
229            selector.clone(),
230            statistics.clone(),
231            node.member_change_watcher(),
232            tx,
233        ));
234
235        info!(
236            node_id = %self.node_id,
237            cluster_id = %self.cluster_id,
238            listen_addr = %self.connection_cfg.listen_addr,
239            "Datacake cluster connected."
240        );
241
242        Ok(DatacakeNode {
243            rpc_server,
244            node,
245            network,
246            clock,
247            selector,
248            membership_changes,
249            // Needs to live to run the network.
250            _transport: transport,
251        })
252    }
253}
254
/// Network settings used when connecting a node to the cluster.
#[derive(Debug, Clone)]
pub struct ConnectionConfig {
    /// The address the RPC server binds to and listens on.
    ///
    /// This is often `0.0.0.0` + your chosen port.
    pub listen_addr: SocketAddr,

    /// The public address broadcast to the other cluster members.
    ///
    /// This is normally the machine's public IP address and the port the server is listening on.
    pub public_addr: SocketAddr,

    /// The initial seed nodes this node will attempt to contact in order to learn about
    /// the other members of the cluster.
    ///
    /// Normally `2` or `3` seeds is fine when running a multi-node cluster.
    /// Having only `1` seed can be dangerous: if both nodes happen to go down and the seed
    /// does not restart before this node, this node will be unable to re-join the cluster.
    pub seed_nodes: Vec<String>,
}

impl ConnectionConfig {
    /// Builds a connection config from the listen address, the public address and
    /// any iterable of string-like seed node addresses.
    ///
    /// Each seed is copied into an owned `String`, in iteration order.
    pub fn new(
        listen_addr: SocketAddr,
        public_addr: SocketAddr,
        seeds: impl IntoIterator<Item = impl AsRef<str>>,
    ) -> Self {
        let seed_nodes: Vec<String> = seeds
            .into_iter()
            .map(|address| address.as_ref().to_owned())
            .collect();

        Self {
            listen_addr,
            public_addr,
            seed_nodes,
        }
    }
}
294
295pub struct DatacakeNode {
296    node: ChitchatNode,
297    rpc_server: Server,
298    clock: Clock,
299    network: RpcNetwork,
300    selector: NodeSelectorHandle,
301    membership_changes: watch::Receiver<MembershipChange>,
302    _transport: Box<dyn Transport>,
303}
304
305impl DatacakeNode {
306    /// Shuts down the cluster and cleans up any connections.
307    pub async fn shutdown(self) {
308        self.node.shutdown().await;
309    }
310
311    /// Add a RPC service to the existing RPC system.
312    pub fn add_rpc_service<Svc>(&self, service: Svc)
313    where
314        Svc: RpcService + Send + Sync + 'static,
315    {
316        self.rpc_server.add_service(service);
317    }
318
319    /// Adds a new cluster extension to the existing node.
320    ///
321    /// Cluster extensions can be used to extend the cluster to provide
322    /// additional functionality like storage, messaging, etc...
323    pub async fn add_extension<Ext>(&self, ext: Ext) -> Result<Ext::Output, Ext::Error>
324    where
325        Ext: ClusterExtension,
326    {
327        ext.init_extension(self).await
328    }
329
330    #[inline]
331    /// Gets the live cluster statistics.
332    pub fn statistics(&self) -> ClusterStatistics {
333        self.node.statistics()
334    }
335
336    #[inline]
337    /// Get access to the cluster clock.
338    pub fn clock(&self) -> &Clock {
339        &self.clock
340    }
341
342    #[inline]
343    /// Get access to the current RPC network.
344    pub fn network(&self) -> &RpcNetwork {
345        &self.network
346    }
347
348    #[inline]
349    /// Return the cluster member of the node itself.
350    pub fn me(&self) -> &ClusterMember {
351        self.node.me.as_ref()
352    }
353
354    #[inline]
355    /// Get a stream of membership changes.
356    pub fn membership_changes(&self) -> WatchStream<MembershipChange> {
357        WatchStream::new(self.membership_changes.clone())
358    }
359
360    #[inline]
361    /// Selects a set of nodes using a provided consistency level.
362    pub async fn select_nodes(
363        &self,
364        consistency: Consistency,
365    ) -> Result<Nodes, ConsistencyError> {
366        self.selector.get_nodes(consistency).await
367    }
368
369    #[inline]
370    /// Waits for the given node IDs to join the cluster or timeout to elapse.
371    pub async fn wait_for_nodes(
372        &self,
373        node_ids: impl AsRef<[NodeId]>,
374        timeout: Duration,
375    ) -> Result<(), anyhow::Error> {
376        let nodes = node_ids.as_ref();
377        self.node
378            .wait_for_members(
379                |members| {
380                    for id in nodes {
381                        if !members.contains_key(id) {
382                            return false;
383                        }
384                    }
385                    true
386                },
387                timeout,
388            )
389            .await
390    }
391
392    #[inline]
393    /// Creates a handle to the cluster providing the core functionality of the node.
394    pub fn handle(&self) -> DatacakeHandle {
395        DatacakeHandle {
396            me: self.node.me.clone(),
397            clock: self.clock.clone(),
398            network: self.network.clone(),
399            selector: self.selector.clone(),
400            statistics: self.statistics(),
401            membership_changes: self.membership_changes.clone(),
402        }
403    }
404}
405
406#[derive(Clone)]
407pub struct DatacakeHandle {
408    me: Cow<'static, ClusterMember>,
409    clock: Clock,
410    network: RpcNetwork,
411    selector: NodeSelectorHandle,
412    statistics: ClusterStatistics,
413    membership_changes: watch::Receiver<MembershipChange>,
414}
415
416impl DatacakeHandle {
417    #[inline]
418    /// Gets the live cluster statistics.
419    pub fn statistics(&self) -> ClusterStatistics {
420        self.statistics.clone()
421    }
422
423    #[inline]
424    /// Get access to the cluster clock.
425    pub fn clock(&self) -> &Clock {
426        &self.clock
427    }
428
429    #[inline]
430    /// Get access to the current RPC network.
431    pub fn network(&self) -> &RpcNetwork {
432        &self.network
433    }
434
435    #[inline]
436    /// Get a stream of membership changes.
437    pub fn membership_changes(&self) -> WatchStream<MembershipChange> {
438        WatchStream::new(self.membership_changes.clone())
439    }
440
441    #[inline]
442    /// Return the cluster member of the node itself.
443    pub fn me(&self) -> &ClusterMember {
444        self.me.as_ref()
445    }
446
447    #[inline]
448    /// Selects a set of nodes using a provided consistency level.
449    pub async fn select_nodes(
450        &self,
451        consistency: Consistency,
452    ) -> Result<Nodes, ConsistencyError> {
453        self.selector.get_nodes(consistency).await
454    }
455}
456
/// A single membership update published on the node's membership-change channel.
///
/// The default value has both lists empty; it is the channel's initial value.
#[derive(Clone, Default)]
pub struct MembershipChange {
    /// Members reported as having joined the cluster since the previous update.
    pub joined: Vec<ClusterMember>,
    /// Members reported as having left the cluster since the previous update.
    pub left: Vec<ClusterMember>,
}
462
/// Connection details handed to `connect_node` when joining the chitchat cluster.
struct ClusterInfo<'a> {
    /// Address the node listens on.
    listen_addr: SocketAddr,
    /// Address advertised to other members.
    public_addr: SocketAddr,
    /// Seed nodes to contact when joining.
    seed_nodes: Vec<String>,
    /// Name of the data center this node belongs to.
    data_center: &'a str,
}
469
470/// Connects to the chitchat cluster.
471///
472/// The node will attempt to establish connections to the seed nodes and
473/// will broadcast the node's public address to communicate.
474async fn connect_node(
475    node_id: NodeId,
476    cluster_id: String,
477    clock: Clock,
478    network: RpcNetwork,
479    cluster_info: ClusterInfo<'_>,
480    server: &Server,
481    statistics: ClusterStatistics,
482) -> Result<(ChitchatNode, Box<dyn Transport>), NodeError> {
483    let (chitchat_tx, chitchat_rx) = flume::bounded(1000);
484
485    let service = ChitchatService::new(clock.clone(), chitchat_tx);
486    server.add_service(service);
487
488    let transport =
489        ChitchatTransport::new(cluster_info.listen_addr, clock, network, chitchat_rx);
490
491    let me = ClusterMember::new(
492        node_id,
493        cluster_info.public_addr,
494        cluster_info.data_center.to_string(),
495    );
496    let node = ChitchatNode::connect(
497        me,
498        cluster_info.listen_addr,
499        cluster_id,
500        cluster_info.seed_nodes,
501        FailureDetectorConfig::default(),
502        &transport,
503        statistics,
504    )
505    .await?;
506
507    Ok((node, Box::new(transport)))
508}
509
510/// Watches for changes in the cluster membership.
511///
512/// When nodes leave and join, pollers are stopped and started as required.
513async fn watch_membership_changes(
514    self_node_id: NodeId,
515    network: RpcNetwork,
516    node_selector: NodeSelectorHandle,
517    statistics: ClusterStatistics,
518    mut changes: WatchStream<NodeMembership>,
519    membership_changes_tx: watch::Sender<MembershipChange>,
520) {
521    let mut last_network_set = BTreeSet::new();
522    while let Some(members) = changes.next().await {
523        info!(
524            self_node_id = %self_node_id,
525            num_members = members.len(),
526            "Cluster membership has changed."
527        );
528
529        let mut membership_changes = MembershipChange::default();
530        let new_network_set = members
531            .iter()
532            .filter(|(node_id, _)| *node_id != &self_node_id)
533            .map(|(_, member)| (member.node_id, member.public_addr))
534            .collect::<BTreeSet<_>>();
535
536        {
537            let mut data_centers = BTreeMap::<Cow<'static, str>, Nodes>::new();
538            for member in members.values() {
539                let dc = Cow::Owned(member.data_center.clone());
540                data_centers.entry(dc).or_default().push(member.public_addr);
541            }
542
543            statistics
544                .num_data_centers
545                .store(data_centers.len() as u64, Ordering::Relaxed);
546            node_selector.set_nodes(data_centers).await;
547        }
548
549        // Remove client no longer apart of the network.
550        for (node_id, addr) in last_network_set.difference(&new_network_set) {
551            info!(
552                self_node_id = %self_node_id,
553                target_node_id = %node_id,
554                target_addr = %addr,
555                "Node is no longer part of cluster."
556            );
557
558            network.disconnect(*addr);
559
560            if let Some(member) = members.get(node_id) {
561                membership_changes.left.push(member.clone());
562            }
563        }
564
565        // Add new clients for each new node.
566        for (node_id, addr) in new_network_set.difference(&last_network_set) {
567            info!(
568                self_node_id = %self_node_id,
569                target_node_id = %node_id,
570                target_addr = %addr,
571                "Node has connected to the cluster."
572            );
573
574            if let Some(member) = members.get(node_id) {
575                membership_changes.joined.push(member.clone());
576            }
577        }
578
579        let _ = membership_changes_tx.send(membership_changes);
580        last_network_set = new_network_set;
581    }
582}