1use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, OnceLock, Weak};
19
20use parking_lot::Mutex;
21use tokio::net::TcpListener;
22use tokio::task::JoinHandle;
23use tokio::time::{Duration, MissedTickBehavior};
24use tokio_util::sync::CancellationToken;
25
26use crate::cluster::dispatch::{ClusterDispatcher, DispatchPlan};
27use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
28use crate::cluster::pool::{PoolConfig, ServerPool};
29use crate::conf::{ConfDynSeed, ConfPool, Config};
30use crate::embed::error::EmbedError;
31use crate::embed::events::{CloseReason, ConnRoleTag, EventBus, EventStream, ServerEvent};
32use crate::embed::hooks::{
33 CryptoProvider, Datastore, LoggingMetricsSink, MetricsSink, SeedsProvider,
34};
35use crate::embed::snapshots::{DatacenterSnapshot, PeerSnapshot, RingSnapshot};
36use crate::events::EventManager;
37use crate::hashkit::DynToken;
38use crate::msg::Msg;
39use crate::stats::{
40 describe_stats, MetricSpec, PoolField, PoolStats, ServerField, ServerStats, ServiceInfo,
41 Snapshot, Stats,
42};
43
/// Bundle of optional, boxed hook implementations attached to a [`Server`].
///
/// Every slot is an `Option`; `None` means no implementation is installed
/// for that hook. The fields are crate-private and are exposed read-only
/// through the accessor methods on this type.
#[non_exhaustive]
pub struct ServerHooks {
    /// Installed [`Datastore`] implementation, if any.
    pub(crate) datastore: Option<Box<dyn Datastore>>,
    /// Installed [`SeedsProvider`] implementation, if any.
    pub(crate) seeds: Option<Box<dyn SeedsProvider>>,
    /// Installed [`CryptoProvider`] implementation, if any.
    pub(crate) crypto: Option<Box<dyn CryptoProvider>>,
    /// Installed [`MetricsSink`] implementation, if any.
    pub(crate) metrics: Option<Box<dyn MetricsSink>>,
}
58
impl ServerHooks {
    /// Borrows the installed [`Datastore`] hook, or returns `None` when the
    /// slot is empty.
    #[must_use]
    pub fn datastore(&self) -> Option<&dyn Datastore> {
        self.datastore.as_deref()
    }

    /// Borrows the installed [`SeedsProvider`] hook, or returns `None` when
    /// the slot is empty.
    #[must_use]
    pub fn seeds(&self) -> Option<&dyn SeedsProvider> {
        self.seeds.as_deref()
    }

    /// Borrows the installed [`CryptoProvider`] hook, or returns `None` when
    /// the slot is empty.
    #[must_use]
    pub fn crypto(&self) -> Option<&dyn CryptoProvider> {
        self.crypto.as_deref()
    }

    /// Borrows the installed [`MetricsSink`] hook, or returns `None` when the
    /// slot is empty.
    #[must_use]
    pub fn metrics(&self) -> Option<&dyn MetricsSink> {
        self.metrics.as_deref()
    }
}
84
85impl std::fmt::Debug for ServerHooks {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.debug_struct("ServerHooks")
88 .field("datastore", &self.datastore.is_some())
89 .field("seeds", &self.seeds.is_some())
90 .field("crypto", &self.crypto.is_some())
91 .field("metrics", &self.metrics.is_some())
92 .finish()
93 }
94}
95
96fn registry() -> &'static Mutex<HashMap<SocketAddr, Weak<ServerInner>>> {
108 static R: OnceLock<Mutex<HashMap<SocketAddr, Weak<ServerInner>>>> = OnceLock::new();
109 R.get_or_init(|| Mutex::new(HashMap::new()))
110}
111
112fn registry_register(addr: SocketAddr, server: &Arc<ServerInner>) {
113 registry().lock().insert(addr, Arc::downgrade(server));
114}
115
116fn registry_remove(addr: SocketAddr) {
117 registry().lock().remove(&addr);
118}
119
120fn registry_lookup(addr: SocketAddr) -> Option<Arc<ServerInner>> {
121 registry().lock().get(&addr).and_then(Weak::upgrade)
122}
123
/// Hands out the next value of a process-wide generation counter.
///
/// Each call returns the counter's value from *before* it is incremented by
/// one, so the first call in a process yields `0`.
fn next_generation() -> u64 {
    static GENERATION_COUNTER: AtomicU64 = AtomicU64::new(0);
    let previous = GENERATION_COUNTER.fetch_add(1, Ordering::Relaxed);
    previous
}
132
/// Hands out the next value of a process-wide connection-id counter.
///
/// Each call returns the counter's value from *before* it is incremented by
/// one, so the first call in a process yields `0`.
fn next_conn_id() -> u64 {
    static CONN_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
    let previous = CONN_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
    previous
}
138
/// Shared state of a started server.
///
/// One instance is created by `Server::start`, wrapped in an `Arc`, and
/// shared between the `ServerHandle` and every background task.
pub(crate) struct ServerInner {
    /// Cluster pool holding the peer list and datacenter topology.
    pool: Arc<ServerPool>,
    /// Dispatcher used to plan where an injected request should go.
    dispatcher: ClusterDispatcher,
    /// Live statistics counters.
    stats: Arc<Stats>,
    /// Most recent statistics snapshot; written by `stats_loop` and read by
    /// `metrics_loop`.
    snapshot_cache: Arc<Mutex<Snapshot>>,
    /// Bus on which `ServerEvent`s are sent.
    bus: EventBus,
    /// Manager on which `ClusterEvent`s are published.
    events: Arc<EventManager>,
    /// Datastore hook that serves locally dispatched requests.
    datastore: Box<dyn Datastore>,
    /// Seeds hook queried by the gossip loop.
    seeds: Box<dyn SeedsProvider>,
    /// Metrics sink that receives periodic snapshots.
    metrics: Box<dyn MetricsSink>,
    /// Optional crypto hook.
    crypto: Option<Box<dyn CryptoProvider>>,
    /// Local address of the bound `listen` socket, if one was bound.
    listen_addr: Option<SocketAddr>,
    /// Local address of the bound `dyn_listen` socket, if one was bound.
    /// This is also the key under which the server is registered in the
    /// process-wide registry.
    dyn_listen_addr: Option<SocketAddr>,
    /// Token whose cancellation stops the background loops.
    cancel: CancellationToken,
    /// Name of the pool this server runs.
    pool_name: String,
    /// Pool configuration; replaced by `ServerHandle::reload`.
    config: Mutex<ConfPool>,
    /// Generation value last stored by `ServerHandle::reload` (starts at 0).
    generation: AtomicU64,
    /// Vector registry shared with the dispatcher.
    vector_registry: Arc<crate::vector::registry::VectorRegistry>,
}
159
160impl std::fmt::Debug for ServerInner {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 f.debug_struct("ServerInner")
163 .field("pool_name", &self.pool_name)
164 .field("listen", &self.listen_addr)
165 .field("dyn_listen", &self.dyn_listen_addr)
166 .finish_non_exhaustive()
167 }
168}
169
170impl ServerInner {
171 fn dispatch_local(&self, req: Msg) -> super::hooks::BoxFuture<'_, Result<Msg, EmbedError>> {
172 let fut = self.datastore.dispatch(req);
173 Box::pin(async move {
174 let rsp = fut.await.map_err(|e| EmbedError::Inject(e.to_string()))?;
175 Ok(rsp)
176 })
177 }
178}
179
/// A configured server that has not been started yet.
///
/// Built by `Server::from_pool` and consumed by `Server::start`.
#[derive(Debug)]
pub struct Server {
    /// Name of the pool this server was built for.
    pool_name: String,
    /// Pool configuration the server was built from.
    pool: ConfPool,
    /// Cluster pool constructed from the local peer and the configured seeds.
    cluster: Arc<ServerPool>,
    /// Hook implementations handed over to the running server on start.
    hooks: ServerHooks,
    /// Statistics collector created together with the cluster pool.
    stats: Arc<Stats>,
    /// Vector registry supplied by the caller of `from_pool`.
    vector_registry: Arc<crate::vector::registry::VectorRegistry>,
}
195
196impl Server {
197 pub(crate) fn from_pool(
199 pool_name: String,
200 pool: ConfPool,
201 hooks: ServerHooks,
202 vector_registry: Arc<crate::vector::registry::VectorRegistry>,
203 ) -> Self {
204 let pool_cfg = PoolConfig::from_conf(&pool_name, &pool);
205 let local_peer = build_local_peer(&pool, &pool_cfg);
206 let mut peers = vec![local_peer];
207 if let Some(seeds) = pool.dyn_seeds.as_ref() {
208 let start = u32::try_from(peers.len()).unwrap_or(0);
209 peers.extend(peers_from_seeds(&pool_cfg, seeds, start));
210 }
211 let server_pool_arc = Arc::new(ServerPool::new(pool_cfg.clone(), peers));
212 server_pool_arc.preselect_remote_racks();
213
214 let stats = Arc::new(Stats::new(
215 ServiceInfo {
216 source: pool_cfg.name.clone(),
217 version: env!("CARGO_PKG_VERSION").to_string(),
218 rack: pool_cfg.rack.clone(),
219 dc: pool_cfg.dc.clone(),
220 },
221 PoolStats::new(&pool_cfg.name),
222 ServerStats::new("backend"),
223 ));
224
225 Self {
226 pool_name,
227 pool,
228 cluster: server_pool_arc,
229 hooks,
230 stats,
231 vector_registry,
232 }
233 }
234
235 #[must_use]
241 pub fn vector_registry(&self) -> &Arc<crate::vector::registry::VectorRegistry> {
242 &self.vector_registry
243 }
244
245 #[must_use]
247 pub fn pool_name(&self) -> &str {
248 &self.pool_name
249 }
250
251 pub async fn start(self) -> Result<ServerHandle, EmbedError> {
292 let Server {
293 pool_name,
294 pool,
295 cluster,
296 mut hooks,
297 stats,
298 vector_registry,
299 } = self;
300
301 let dispatcher =
302 ClusterDispatcher::new(cluster.clone()).with_vector_registry(vector_registry.clone());
303 let bus = EventBus::new(64);
304 let events = Arc::new(EventManager::new(64));
305 let cancel = CancellationToken::new();
306 let snapshot_cache = Arc::new(Mutex::new(Snapshot::default()));
307
308 let datastore = hooks
309 .datastore
310 .take()
311 .expect("invariant: builder always populates datastore");
312 let seeds = hooks
313 .seeds
314 .take()
315 .expect("invariant: builder always populates seeds");
316 let metrics = hooks
317 .metrics
318 .take()
319 .unwrap_or_else(|| Box::new(LoggingMetricsSink::new(pool_name.clone())));
320
321 let (listen_listener, listen_addr) = bind_listener(pool.listen.as_ref()).await?;
322 let (dyn_listener, dyn_listen_addr) = bind_listener(pool.dyn_listen.as_ref()).await?;
323
324 let inner = Arc::new(ServerInner {
325 pool: cluster.clone(),
326 dispatcher,
327 stats: stats.clone(),
328 snapshot_cache: snapshot_cache.clone(),
329 bus: bus.clone(),
330 events: events.clone(),
331 datastore,
332 seeds,
333 metrics,
334 crypto: hooks.crypto,
335 listen_addr,
336 dyn_listen_addr,
337 cancel: cancel.clone(),
338 pool_name: pool_name.clone(),
339 config: Mutex::new(pool.clone()),
340 generation: AtomicU64::new(0),
341 vector_registry,
342 });
343
344 if let Some(addr) = inner.dyn_listen_addr {
347 registry_register(addr, &inner);
348 }
349
350 let mut tasks: Vec<JoinHandle<()>> = Vec::new();
352 tasks.push(tokio::spawn(stats_loop(inner.clone(), pool.stats_interval)));
353 tasks.push(tokio::spawn(metrics_loop(inner.clone())));
354 if pool.enable_gossip.unwrap_or(false) {
355 tasks.push(tokio::spawn(gossip_loop(
356 inner.clone(),
357 Duration::from_millis(
358 u64::try_from(pool.gos_interval.unwrap_or(1_000)).unwrap_or(1_000),
359 ),
360 )));
361 }
362 if let (Some(listener), Some(addr)) = (listen_listener, listen_addr) {
363 tasks.push(tokio::spawn(accept_loop(
364 inner.clone(),
365 listener,
366 addr,
367 ConnRoleTag::Proxy,
368 )));
369 }
370 if let (Some(listener), Some(addr)) = (dyn_listener, dyn_listen_addr) {
371 tasks.push(tokio::spawn(accept_loop(
372 inner.clone(),
373 listener,
374 addr,
375 ConnRoleTag::DnodeProxy,
376 )));
377 }
378
379 Ok(ServerHandle {
380 inner,
381 tasks: Arc::new(Mutex::new(tasks)),
382 })
383 }
384}
385
/// Cloneable handle to a started server.
///
/// Clones share the same server state and the same list of background task
/// handles.
#[derive(Clone)]
pub struct ServerHandle {
    /// State shared with the server's background tasks.
    inner: Arc<ServerInner>,
    /// Join handles of the spawned background tasks; emptied by whichever of
    /// `shutdown` or `join` runs first.
    tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
397
398impl std::fmt::Debug for ServerHandle {
399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 f.debug_struct("ServerHandle")
401 .field("pool", &self.inner.pool_name)
402 .field("listen", &self.inner.listen_addr)
403 .field("dyn_listen", &self.inner.dyn_listen_addr)
404 .finish_non_exhaustive()
405 }
406}
407
408impl ServerHandle {
409 #[must_use]
412 pub fn crypto_provider(&self) -> Option<&dyn CryptoProvider> {
413 self.inner.crypto.as_deref()
414 }
415
416 #[must_use]
421 pub fn vector_registry(&self) -> Arc<crate::vector::registry::VectorRegistry> {
422 Arc::clone(&self.inner.vector_registry)
423 }
424
425 #[must_use]
428 pub fn listen_addr(&self) -> Option<SocketAddr> {
429 self.inner.listen_addr
430 }
431
432 #[must_use]
435 pub fn dyn_listen_addr(&self) -> Option<SocketAddr> {
436 self.inner.dyn_listen_addr
437 }
438
439 #[must_use]
460 pub fn subscribe_events(&self) -> EventStream {
461 self.inner.bus.subscribe()
462 }
463
464 #[must_use]
472 pub fn events(&self) -> Arc<EventManager> {
473 Arc::clone(&self.inner.events)
474 }
475
476 #[must_use]
478 pub fn stats(&self) -> Snapshot {
479 self.inner.stats.snapshot()
480 }
481
482 #[must_use]
497 pub fn stats_handle(&self) -> Arc<Stats> {
498 Arc::clone(&self.inner.stats)
499 }
500
501 #[must_use]
503 pub fn describe_stats(&self) -> Vec<MetricSpec> {
504 let _ = describe_stats(); let mut out: Vec<MetricSpec> = Vec::new();
506 out.extend(crate::stats::POOL_CODEC.iter().copied());
507 out.extend(crate::stats::SERVER_CODEC.iter().copied());
508 out
509 }
510
511 #[must_use]
513 pub fn peers(&self) -> Vec<PeerSnapshot> {
514 self.inner
515 .pool
516 .peers()
517 .read()
518 .iter()
519 .map(PeerSnapshot::from)
520 .collect()
521 }
522
523 #[must_use]
525 pub fn datacenters(&self) -> Vec<DatacenterSnapshot> {
526 self.inner
527 .pool
528 .datacenters()
529 .read()
530 .iter()
531 .map(DatacenterSnapshot::from)
532 .collect()
533 }
534
535 #[must_use]
537 pub fn ring(&self) -> RingSnapshot {
538 let dcs = self.inner.pool.datacenters().read();
539 let mut entries: Vec<(DynToken, u32)> = Vec::new();
540 for dc in dcs.iter() {
541 for rack in dc.racks() {
542 for c in rack.continuums() {
543 entries.push((c.token.clone(), c.peer_idx));
544 }
545 }
546 }
547 RingSnapshot {
548 entries,
549 generation: self.inner.generation.load(Ordering::Relaxed),
550 }
551 }
552
553 pub async fn inject_request(&self, req: Msg) -> Result<Msg, EmbedError> {
565 let key: Vec<u8> = req
569 .keys()
570 .first()
571 .map(|kp| kp.tag_bytes().to_vec())
572 .unwrap_or_default();
573 let plan = self.inner.dispatcher.plan(&req, &key);
574 match plan {
575 DispatchPlan::Drop => {
576 let mut rsp = Msg::new(req.id(), crate::msg::MsgType::Unknown, false);
579 rsp.set_parent_id(req.id());
580 Ok(rsp)
581 }
582 DispatchPlan::NoTargets => Err(EmbedError::Inject(
583 "cluster has no quorum-eligible targets".into(),
584 )),
585 DispatchPlan::LocalDatastore => self.inner.dispatch_local(req).await,
586 DispatchPlan::Replicas { targets, .. } => {
587 self.inner.stats.pool_incr_by(PoolField::ForwardError, 0); self.inner.stats.server_incr(ServerField::ReadRequests);
589 let resolved: Vec<(bool, Option<SocketAddr>)> = {
593 let peers = self.inner.pool.peers().read();
594 targets
595 .iter()
596 .map(|t| {
597 let peer = peers.get(t.peer_idx as usize);
598 let is_local = peer.is_some_and(crate::cluster::peer::Peer::is_local);
599 let addr = peer.and_then(|p| p.endpoint().pname().parse().ok());
600 (is_local, addr)
601 })
602 .collect()
603 };
604 let mut last_err: Option<EmbedError> = None;
605 for (is_local, addr) in resolved {
606 if is_local {
607 return self.inner.dispatch_local(req).await;
608 }
609 if let Some(addr) = addr {
610 if let Some(remote) = registry_lookup(addr) {
611 let mut forwarded = Msg::new(req.id(), req.ty(), true);
612 forwarded.set_parent_id(req.parent_id());
613 match remote.datastore.dispatch(forwarded).await {
614 Ok(rsp) => return Ok(rsp),
615 Err(e) => last_err = Some(EmbedError::Inject(e.to_string())),
616 }
617 }
618 }
619 }
620 if let Some(e) = last_err {
621 return Err(e);
622 }
623 self.inner.dispatch_local(req).await
627 }
628 }
629 }
630
631 pub async fn reload(&self, mut cfg: Config) -> Result<(), EmbedError> {
640 cfg.finalize();
641 cfg.validate()?;
642 let new_pool = cfg.pool().clone();
643 *self.inner.config.lock() = new_pool;
648 let gen_id = next_generation();
649 self.inner.generation.store(gen_id, Ordering::Relaxed);
650 self.inner
651 .bus
652 .send(ServerEvent::ConfigReloaded { generation: gen_id });
653 tokio::task::yield_now().await;
656 Ok(())
657 }
658
659 pub async fn shutdown(&self) -> Result<(), EmbedError> {
667 self.inner.cancel.cancel();
668 if let Some(addr) = self.inner.dyn_listen_addr {
669 registry_remove(addr);
670 }
671 let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
672 for t in drained {
673 let _ = t.await;
675 }
676 Ok(())
677 }
678
679 pub async fn join(&self) {
715 let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
716 for t in drained {
717 let _ = t.await;
718 }
719 }
720}
721
722fn build_local_peer(pool: &ConfPool, cfg: &PoolConfig) -> Peer {
725 let dyn_listen = pool.dyn_listen.as_ref().map_or_else(
726 || ("127.0.0.1".to_string(), 0u16),
727 |l| (l.name().to_string(), l.port()),
728 );
729 let tokens = pool
730 .tokens
731 .as_ref()
732 .map(|tl| {
733 tl.components()
734 .iter()
735 .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
736 .collect::<Vec<_>>()
737 })
738 .unwrap_or_default();
739 let mut peer = Peer::new(
740 0,
741 PeerEndpoint::tcp(dyn_listen.0, dyn_listen.1),
742 cfg.rack.clone(),
743 cfg.dc.clone(),
744 if tokens.is_empty() {
745 vec![DynToken::from_u32(0)]
746 } else {
747 tokens
748 },
749 true,
750 true,
751 false,
752 );
753 let now_secs = std::time::SystemTime::now()
754 .duration_since(std::time::UNIX_EPOCH)
755 .map(|d| d.as_secs())
756 .unwrap_or(0);
757 peer.set_state(PeerState::Normal, now_secs);
758 peer
759}
760
761fn peers_from_seeds(cfg: &PoolConfig, seeds: &[ConfDynSeed], start_idx: u32) -> Vec<Peer> {
762 let now_secs = std::time::SystemTime::now()
763 .duration_since(std::time::UNIX_EPOCH)
764 .map(|d| d.as_secs())
765 .unwrap_or(0);
766 seeds
767 .iter()
768 .enumerate()
769 .map(|(i, s)| {
770 let tokens: Vec<DynToken> = s
771 .tokens()
772 .components()
773 .iter()
774 .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
775 .collect();
776 let idx_off = u32::try_from(i).unwrap_or(0);
777 let mut p = Peer::new(
778 start_idx + idx_off,
779 PeerEndpoint::tcp(s.host().to_string(), s.port()),
780 s.rack().to_string(),
781 s.dc().to_string(),
782 if tokens.is_empty() {
783 vec![DynToken::from_u32(0)]
784 } else {
785 tokens
786 },
787 false,
788 s.dc() == cfg.dc,
789 false,
790 );
791 p.set_state(PeerState::Normal, now_secs);
792 p
793 })
794 .collect()
795}
796
797async fn bind_listener(
800 listen: Option<&crate::conf::ConfListen>,
801) -> Result<(Option<TcpListener>, Option<SocketAddr>), EmbedError> {
802 let Some(l) = listen else {
803 return Ok((None, None));
804 };
805 let host = l.name();
806 let port = l.port();
807 if host.is_empty() {
808 return Ok((None, None));
809 }
810 let addr_str = format!("{host}:{port}");
811 let Ok(_addr) = addr_str.parse::<SocketAddr>() else {
812 return Ok((None, None));
813 };
814 let listener = TcpListener::bind(&addr_str).await?;
815 let local = listener.local_addr()?;
816 Ok((Some(listener), Some(local)))
817}
818
819async fn accept_loop(
822 inner: Arc<ServerInner>,
823 listener: TcpListener,
824 addr: SocketAddr,
825 role: ConnRoleTag,
826) {
827 loop {
828 tokio::select! {
829 biased;
830 () = inner.cancel.cancelled() => return,
831 res = listener.accept() => {
832 let Ok((sock, peer)) = res else { return };
833 let conn_id = next_conn_id();
834 inner.bus.send(ServerEvent::ConnectionAccepted {
835 conn_id,
836 role,
837 local_addr: Some(addr),
838 });
839 tracing::warn!(
848 listen = %addr,
849 peer = %peer,
850 role = ?role,
851 conn_id,
852 "embedded listen_addr accepted a connection; embedded mode does not yet \
853 forward to the dispatcher; use ServerHandle::inject_request instead. \
854 Closing connection."
855 );
856 let bus = inner.bus.clone();
857 let cancel = inner.cancel.clone();
858 tokio::spawn(async move {
859 let _ = sock; let close_reason = if cancel.is_cancelled() {
861 CloseReason::LocalClose
862 } else {
863 CloseReason::PeerEof
864 };
865 bus.send(ServerEvent::ConnectionClosed { conn_id, reason: close_reason });
866 });
867 }
868 }
869 }
870}
871
872async fn stats_loop(inner: Arc<ServerInner>, interval_ms: Option<i64>) {
873 let interval =
874 Duration::from_millis(u64::try_from(interval_ms.unwrap_or(1_000)).unwrap_or(1_000));
875 let mut ticker = tokio::time::interval(interval);
876 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
877 loop {
878 tokio::select! {
879 biased;
880 () = inner.cancel.cancelled() => return,
881 _ = ticker.tick() => {
882 let snap = inner.stats.snapshot();
883 *inner.snapshot_cache.lock() = snap;
884 }
885 }
886 }
887}
888
889async fn metrics_loop(inner: Arc<ServerInner>) {
890 let interval = inner.metrics.flush_interval();
891 let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(50)));
892 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
893 loop {
894 tokio::select! {
895 biased;
896 () = inner.cancel.cancelled() => return,
897 _ = ticker.tick() => {
898 let snap = inner.snapshot_cache.lock().clone();
899 let _ = inner.metrics.emit(&snap).await;
900 }
901 }
902 }
903}
904
905async fn gossip_loop(inner: Arc<ServerInner>, interval: Duration) {
906 let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(20)));
907 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
908 let mut round: u64 = 0;
909 let mut known: std::collections::HashSet<(String, u16)> = std::collections::HashSet::new();
910 {
911 let peers = inner.pool.peers().read();
912 for p in peers.iter() {
913 known.insert((p.endpoint().host().to_string(), p.endpoint().port()));
914 }
915 }
916 loop {
917 tokio::select! {
918 biased;
919 () = inner.cancel.cancelled() => return,
920 _ = ticker.tick() => {
921 let round_started = std::time::Instant::now();
922 let round_ts = std::time::SystemTime::now();
923 round += 1;
924 let seeds = inner.seeds.fetch().unwrap_or_default();
925 let mut added: u32 = 0;
926 {
927 let mut peers = inner.pool.peers().write();
928 let now_secs = std::time::SystemTime::now()
929 .duration_since(std::time::UNIX_EPOCH)
930 .map(|d| d.as_secs())
931 .unwrap_or(0);
932 let cfg = inner.pool.config().clone();
933 for seed in &seeds {
934 let key = (seed.host().to_string(), seed.port());
935 if known.contains(&key) {
936 continue;
937 }
938 let next_idx = u32::try_from(peers.len()).unwrap_or(u32::MAX);
939 let tokens: Vec<DynToken> = seed
940 .tokens()
941 .components()
942 .iter()
943 .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
944 .collect();
945 let mut p = Peer::new(
946 next_idx,
947 PeerEndpoint::tcp(seed.host().to_string(), seed.port()),
948 seed.rack().to_string(),
949 seed.dc().to_string(),
950 if tokens.is_empty() {
951 vec![DynToken::from_u32(0)]
952 } else {
953 tokens
954 },
955 false,
956 seed.dc() == cfg.dc,
957 false,
958 );
959 p.set_state(PeerState::Normal, now_secs);
960 peers.push(p);
961 known.insert(key);
962 added += 1;
963 }
964 }
965 if added > 0 {
966 rebuild_topology(&inner.pool);
970 let peers_now = inner.pool.peers().read();
971 for (idx, p) in peers_now.iter().enumerate().rev().take(added as usize) {
972 let _ = idx;
973 inner.bus.send(ServerEvent::PeerUp(p.idx()));
974 inner.events.publish(crate::events::ClusterEvent::PeerUp {
975 peer_id: p.idx(),
976 dc: p.dc().to_string(),
977 ts: round_ts,
978 });
979 }
980 inner.events.publish(crate::events::ClusterEvent::RingChanged {
981 tag: "seed-discovery".to_string(),
982 ts: round_ts,
983 });
984 }
985 let count = u32::try_from(inner.pool.peers().read().len()).unwrap_or(u32::MAX);
986 inner.bus.send(ServerEvent::GossipRound { round, peers: count });
987 inner.events.publish(crate::events::ClusterEvent::GossipRoundComplete {
988 duration: round_started.elapsed(),
989 peers_seen: count as usize,
990 ts: round_ts,
991 });
992 inner.stats.pool_incr(PoolField::StatsCount);
993 }
994 }
995 }
996}
997
998fn rebuild_topology(pool: &Arc<ServerPool>) {
999 use crate::cluster::Datacenter;
1000 let peers = pool.peers().read();
1001 let mut new_dcs: Vec<Datacenter> = Vec::new();
1002 for p in peers.iter() {
1003 let idx = if let Some(i) = new_dcs.iter().position(|d| d.name() == p.dc()) {
1004 i
1005 } else {
1006 new_dcs.push(Datacenter::new(p.dc().to_string()));
1007 new_dcs.len() - 1
1008 };
1009 new_dcs[idx].upsert_rack(p.rack().to_string());
1010 }
1011 drop(peers);
1012 {
1013 let mut dcs = pool.datacenters().write();
1014 *dcs = new_dcs;
1015 }
1016 pool.rebuild_ring();
1017 pool.preselect_remote_racks();
1018}
1019
1020