1pub(crate) mod clock;
84mod error;
85mod extension;
86mod node;
87mod nodes_selector;
88mod rpc;
89mod statistics;
90
91use std::borrow::Cow;
92use std::collections::{BTreeMap, BTreeSet};
93use std::fmt::Display;
94use std::net::SocketAddr;
95use std::sync::atomic::Ordering;
96use std::time::Duration;
97
98use chitchat::transport::Transport;
99use chitchat::FailureDetectorConfig;
100pub use clock::Clock;
101use datacake_rpc::{RpcService, Server};
102pub use error::NodeError;
103pub use extension::ClusterExtension;
104use futures::StreamExt;
105pub use node::{ChitchatNode, ClusterMember};
106pub use nodes_selector::{
107 Consistency,
108 ConsistencyError,
109 DCAwareSelector,
110 NodeSelector,
111 NodeSelectorHandle,
112 Nodes,
113};
114pub use rpc::network::RpcNetwork;
115pub use statistics::ClusterStatistics;
116use tokio::sync::watch;
117use tokio_stream::wrappers::WatchStream;
118use tracing::info;
119
120use crate::node::NodeMembership;
121use crate::rpc::chitchat_transport::ChitchatTransport;
122use crate::rpc::services::chitchat_impl::ChitchatService;
123
/// Numeric identifier of a node within the cluster.
pub type NodeId = u8;

/// Cluster identifier a [`DatacakeNodeBuilder`] starts with until
/// `with_cluster_id` overrides it.
pub static DEFAULT_CLUSTER_ID: &str = "datacake-cluster-unknown";

/// Data center label a [`DatacakeNodeBuilder`] starts with until
/// `with_data_center` overrides it.
pub static DEFAULT_DATA_CENTER: &str = "datacake-dc-unknown";
127
128pub struct DatacakeNodeBuilder<S = DCAwareSelector> {
130 node_id: NodeId,
131 connection_cfg: ConnectionConfig,
132 cluster_id: String,
133 data_center: Cow<'static, str>,
134 node_selector: S,
135}
136
137impl<S> DatacakeNodeBuilder<S>
138where
139 S: NodeSelector + Send + 'static,
140{
141 pub fn new(
143 node_id: NodeId,
144 connection_cfg: ConnectionConfig,
145 ) -> DatacakeNodeBuilder<DCAwareSelector> {
146 DatacakeNodeBuilder {
147 node_id,
148 connection_cfg,
149 cluster_id: DEFAULT_CLUSTER_ID.to_string(),
150 data_center: Cow::Borrowed(DEFAULT_DATA_CENTER),
151 node_selector: DCAwareSelector::default(),
152 }
153 }
154
155 pub fn with_node_selector<S2>(self, selector: S2) -> DatacakeNodeBuilder<S2> {
160 DatacakeNodeBuilder {
161 node_id: self.node_id,
162 connection_cfg: self.connection_cfg,
163 cluster_id: self.cluster_id,
164 data_center: self.data_center,
165 node_selector: selector,
166 }
167 }
168
169 pub fn with_cluster_id(mut self, cluster_id: impl Display) -> Self {
171 self.cluster_id = cluster_id.to_string();
172 self
173 }
174
175 pub fn with_data_center(mut self, dc: impl Display) -> Self {
177 self.data_center = Cow::Owned(dc.to_string());
178 self
179 }
180
181 pub async fn connect(self) -> Result<DatacakeNode, NodeError> {
195 let clock = Clock::new(self.node_id);
196
197 let statistics = ClusterStatistics::default();
198 let network = RpcNetwork::default();
199
200 let rpc_server = Server::listen(self.connection_cfg.listen_addr).await?;
201 let selector = nodes_selector::start_node_selector(
202 self.connection_cfg.public_addr,
203 self.data_center.clone(),
204 self.node_selector,
205 )
206 .await;
207
208 let cluster_info = ClusterInfo {
209 listen_addr: self.connection_cfg.listen_addr,
210 public_addr: self.connection_cfg.public_addr,
211 seed_nodes: self.connection_cfg.seed_nodes,
212 data_center: self.data_center.as_ref(),
213 };
214 let (node, transport) = connect_node(
215 self.node_id,
216 self.cluster_id.clone(),
217 clock.clone(),
218 network.clone(),
219 cluster_info,
220 &rpc_server,
221 statistics.clone(),
222 )
223 .await?;
224
225 let (tx, membership_changes) = watch::channel(MembershipChange::default());
226 tokio::spawn(watch_membership_changes(
227 self.node_id,
228 network.clone(),
229 selector.clone(),
230 statistics.clone(),
231 node.member_change_watcher(),
232 tx,
233 ));
234
235 info!(
236 node_id = %self.node_id,
237 cluster_id = %self.cluster_id,
238 listen_addr = %self.connection_cfg.listen_addr,
239 "Datacake cluster connected."
240 );
241
242 Ok(DatacakeNode {
243 rpc_server,
244 node,
245 network,
246 clock,
247 selector,
248 membership_changes,
249 _transport: transport,
251 })
252 }
253}
254
/// Network settings a node needs in order to join a cluster.
#[derive(Debug, Clone)]
pub struct ConnectionConfig {
    /// Address the node's RPC server listens on.
    pub listen_addr: SocketAddr,

    /// Address recorded for this node's cluster member entry and passed to
    /// the node selector.
    pub public_addr: SocketAddr,

    /// Seed node addresses handed to the chitchat node when it connects.
    pub seed_nodes: Vec<String>,
}

impl ConnectionConfig {
    /// Builds a configuration from the two addresses and any iterable of
    /// string-like seed addresses; each seed is copied into an owned `String`.
    pub fn new(
        listen_addr: SocketAddr,
        public_addr: SocketAddr,
        seeds: impl IntoIterator<Item = impl AsRef<str>>,
    ) -> Self {
        let seed_nodes: Vec<String> = seeds
            .into_iter()
            .map(|seed| seed.as_ref().to_owned())
            .collect();

        Self {
            listen_addr,
            public_addr,
            seed_nodes,
        }
    }
}
294
295pub struct DatacakeNode {
296 node: ChitchatNode,
297 rpc_server: Server,
298 clock: Clock,
299 network: RpcNetwork,
300 selector: NodeSelectorHandle,
301 membership_changes: watch::Receiver<MembershipChange>,
302 _transport: Box<dyn Transport>,
303}
304
305impl DatacakeNode {
306 pub async fn shutdown(self) {
308 self.node.shutdown().await;
309 }
310
311 pub fn add_rpc_service<Svc>(&self, service: Svc)
313 where
314 Svc: RpcService + Send + Sync + 'static,
315 {
316 self.rpc_server.add_service(service);
317 }
318
319 pub async fn add_extension<Ext>(&self, ext: Ext) -> Result<Ext::Output, Ext::Error>
324 where
325 Ext: ClusterExtension,
326 {
327 ext.init_extension(self).await
328 }
329
330 #[inline]
331 pub fn statistics(&self) -> ClusterStatistics {
333 self.node.statistics()
334 }
335
336 #[inline]
337 pub fn clock(&self) -> &Clock {
339 &self.clock
340 }
341
342 #[inline]
343 pub fn network(&self) -> &RpcNetwork {
345 &self.network
346 }
347
348 #[inline]
349 pub fn me(&self) -> &ClusterMember {
351 self.node.me.as_ref()
352 }
353
354 #[inline]
355 pub fn membership_changes(&self) -> WatchStream<MembershipChange> {
357 WatchStream::new(self.membership_changes.clone())
358 }
359
360 #[inline]
361 pub async fn select_nodes(
363 &self,
364 consistency: Consistency,
365 ) -> Result<Nodes, ConsistencyError> {
366 self.selector.get_nodes(consistency).await
367 }
368
369 #[inline]
370 pub async fn wait_for_nodes(
372 &self,
373 node_ids: impl AsRef<[NodeId]>,
374 timeout: Duration,
375 ) -> Result<(), anyhow::Error> {
376 let nodes = node_ids.as_ref();
377 self.node
378 .wait_for_members(
379 |members| {
380 for id in nodes {
381 if !members.contains_key(id) {
382 return false;
383 }
384 }
385 true
386 },
387 timeout,
388 )
389 .await
390 }
391
392 #[inline]
393 pub fn handle(&self) -> DatacakeHandle {
395 DatacakeHandle {
396 me: self.node.me.clone(),
397 clock: self.clock.clone(),
398 network: self.network.clone(),
399 selector: self.selector.clone(),
400 statistics: self.statistics(),
401 membership_changes: self.membership_changes.clone(),
402 }
403 }
404}
405
406#[derive(Clone)]
407pub struct DatacakeHandle {
408 me: Cow<'static, ClusterMember>,
409 clock: Clock,
410 network: RpcNetwork,
411 selector: NodeSelectorHandle,
412 statistics: ClusterStatistics,
413 membership_changes: watch::Receiver<MembershipChange>,
414}
415
416impl DatacakeHandle {
417 #[inline]
418 pub fn statistics(&self) -> ClusterStatistics {
420 self.statistics.clone()
421 }
422
423 #[inline]
424 pub fn clock(&self) -> &Clock {
426 &self.clock
427 }
428
429 #[inline]
430 pub fn network(&self) -> &RpcNetwork {
432 &self.network
433 }
434
435 #[inline]
436 pub fn membership_changes(&self) -> WatchStream<MembershipChange> {
438 WatchStream::new(self.membership_changes.clone())
439 }
440
441 #[inline]
442 pub fn me(&self) -> &ClusterMember {
444 self.me.as_ref()
445 }
446
447 #[inline]
448 pub async fn select_nodes(
450 &self,
451 consistency: Consistency,
452 ) -> Result<Nodes, ConsistencyError> {
453 self.selector.get_nodes(consistency).await
454 }
455}
456
457#[derive(Clone, Default)]
458pub struct MembershipChange {
459 pub joined: Vec<ClusterMember>,
460 pub left: Vec<ClusterMember>,
461}
462
/// Connection details bundled together for `connect_node`.
struct ClusterInfo<'a> {
    /// Address used for the chitchat transport and the chitchat node.
    listen_addr: SocketAddr,
    /// Address recorded on this node's cluster member entry.
    public_addr: SocketAddr,
    /// Seed node addresses passed to the chitchat node on connect.
    seed_nodes: Vec<String>,
    /// Data center label, borrowed from the builder for the call's duration.
    data_center: &'a str,
}
469
470async fn connect_node(
475 node_id: NodeId,
476 cluster_id: String,
477 clock: Clock,
478 network: RpcNetwork,
479 cluster_info: ClusterInfo<'_>,
480 server: &Server,
481 statistics: ClusterStatistics,
482) -> Result<(ChitchatNode, Box<dyn Transport>), NodeError> {
483 let (chitchat_tx, chitchat_rx) = flume::bounded(1000);
484
485 let service = ChitchatService::new(clock.clone(), chitchat_tx);
486 server.add_service(service);
487
488 let transport =
489 ChitchatTransport::new(cluster_info.listen_addr, clock, network, chitchat_rx);
490
491 let me = ClusterMember::new(
492 node_id,
493 cluster_info.public_addr,
494 cluster_info.data_center.to_string(),
495 );
496 let node = ChitchatNode::connect(
497 me,
498 cluster_info.listen_addr,
499 cluster_id,
500 cluster_info.seed_nodes,
501 FailureDetectorConfig::default(),
502 &transport,
503 statistics,
504 )
505 .await?;
506
507 Ok((node, Box::new(transport)))
508}
509
510async fn watch_membership_changes(
514 self_node_id: NodeId,
515 network: RpcNetwork,
516 node_selector: NodeSelectorHandle,
517 statistics: ClusterStatistics,
518 mut changes: WatchStream<NodeMembership>,
519 membership_changes_tx: watch::Sender<MembershipChange>,
520) {
521 let mut last_network_set = BTreeSet::new();
522 while let Some(members) = changes.next().await {
523 info!(
524 self_node_id = %self_node_id,
525 num_members = members.len(),
526 "Cluster membership has changed."
527 );
528
529 let mut membership_changes = MembershipChange::default();
530 let new_network_set = members
531 .iter()
532 .filter(|(node_id, _)| *node_id != &self_node_id)
533 .map(|(_, member)| (member.node_id, member.public_addr))
534 .collect::<BTreeSet<_>>();
535
536 {
537 let mut data_centers = BTreeMap::<Cow<'static, str>, Nodes>::new();
538 for member in members.values() {
539 let dc = Cow::Owned(member.data_center.clone());
540 data_centers.entry(dc).or_default().push(member.public_addr);
541 }
542
543 statistics
544 .num_data_centers
545 .store(data_centers.len() as u64, Ordering::Relaxed);
546 node_selector.set_nodes(data_centers).await;
547 }
548
549 for (node_id, addr) in last_network_set.difference(&new_network_set) {
551 info!(
552 self_node_id = %self_node_id,
553 target_node_id = %node_id,
554 target_addr = %addr,
555 "Node is no longer part of cluster."
556 );
557
558 network.disconnect(*addr);
559
560 if let Some(member) = members.get(node_id) {
561 membership_changes.left.push(member.clone());
562 }
563 }
564
565 for (node_id, addr) in new_network_set.difference(&last_network_set) {
567 info!(
568 self_node_id = %self_node_id,
569 target_node_id = %node_id,
570 target_addr = %addr,
571 "Node has connected to the cluster."
572 );
573
574 if let Some(member) = members.get(node_id) {
575 membership_changes.joined.push(member.clone());
576 }
577 }
578
579 let _ = membership_changes_tx.send(membership_changes);
580 last_network_set = new_network_set;
581 }
582}