1use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, OnceLock, Weak};
19
20use parking_lot::Mutex;
21use tokio::net::TcpListener;
22use tokio::task::JoinHandle;
23use tokio::time::{Duration, MissedTickBehavior};
24use tokio_util::sync::CancellationToken;
25
26use crate::cluster::dispatch::{ClusterDispatcher, DispatchPlan};
27use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
28use crate::cluster::pool::{PoolConfig, ServerPool};
29use crate::conf::{ConfDynSeed, ConfPool, Config};
30use crate::embed::error::EmbedError;
31use crate::embed::events::{CloseReason, ConnRoleTag, EventBus, EventStream, ServerEvent};
32use crate::embed::hooks::{
33 CryptoProvider, Datastore, LoggingMetricsSink, MetricsSink, SeedsProvider,
34};
35use crate::embed::snapshots::{DatacenterSnapshot, PeerSnapshot, RingSnapshot};
36use crate::events::EventManager;
37use crate::hashkit::DynToken;
38use crate::msg::Msg;
39use crate::stats::{
40 describe_stats, MetricSpec, PoolField, PoolStats, ServerField, ServerStats, ServiceInfo,
41 Snapshot, Stats,
42};
43
44#[non_exhaustive]
52pub struct ServerHooks {
53 pub(crate) datastore: Option<Box<dyn Datastore>>,
54 pub(crate) seeds: Option<Box<dyn SeedsProvider>>,
55 pub(crate) crypto: Option<Box<dyn CryptoProvider>>,
56 pub(crate) metrics: Option<Box<dyn MetricsSink>>,
57}
58
59impl ServerHooks {
60 #[must_use]
62 pub fn datastore(&self) -> Option<&dyn Datastore> {
63 self.datastore.as_deref()
64 }
65
66 #[must_use]
68 pub fn seeds(&self) -> Option<&dyn SeedsProvider> {
69 self.seeds.as_deref()
70 }
71
72 #[must_use]
74 pub fn crypto(&self) -> Option<&dyn CryptoProvider> {
75 self.crypto.as_deref()
76 }
77
78 #[must_use]
80 pub fn metrics(&self) -> Option<&dyn MetricsSink> {
81 self.metrics.as_deref()
82 }
83}
84
85impl std::fmt::Debug for ServerHooks {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.debug_struct("ServerHooks")
88 .field("datastore", &self.datastore.is_some())
89 .field("seeds", &self.seeds.is_some())
90 .field("crypto", &self.crypto.is_some())
91 .field("metrics", &self.metrics.is_some())
92 .finish()
93 }
94}
95
96fn registry() -> &'static Mutex<HashMap<SocketAddr, Weak<ServerInner>>> {
108 static R: OnceLock<Mutex<HashMap<SocketAddr, Weak<ServerInner>>>> = OnceLock::new();
109 R.get_or_init(|| Mutex::new(HashMap::new()))
110}
111
112fn registry_register(addr: SocketAddr, server: &Arc<ServerInner>) {
113 registry().lock().insert(addr, Arc::downgrade(server));
114}
115
116fn registry_remove(addr: SocketAddr) {
117 registry().lock().remove(&addr);
118}
119
120fn registry_lookup(addr: SocketAddr) -> Option<Arc<ServerInner>> {
121 registry().lock().get(&addr).and_then(Weak::upgrade)
122}
123
124fn next_generation() -> u64 {
129 static G: AtomicU64 = AtomicU64::new(0);
130 G.fetch_add(1, Ordering::Relaxed)
131}
132
133fn next_conn_id() -> u64 {
135 static C: AtomicU64 = AtomicU64::new(0);
136 C.fetch_add(1, Ordering::Relaxed)
137}
138
139pub(crate) struct ServerInner {
141 pool: Arc<ServerPool>,
142 dispatcher: ClusterDispatcher,
143 stats: Arc<Stats>,
144 snapshot_cache: Arc<Mutex<Snapshot>>,
145 bus: EventBus,
146 events: Arc<EventManager>,
147 datastore: Box<dyn Datastore>,
148 seeds: Box<dyn SeedsProvider>,
149 metrics: Box<dyn MetricsSink>,
150 crypto: Option<Box<dyn CryptoProvider>>,
151 listen_addr: Option<SocketAddr>,
152 dyn_listen_addr: Option<SocketAddr>,
153 cancel: CancellationToken,
154 pool_name: String,
155 config: Mutex<ConfPool>,
156 generation: AtomicU64,
157 command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
158}
159
160impl std::fmt::Debug for ServerInner {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 f.debug_struct("ServerInner")
163 .field("pool_name", &self.pool_name)
164 .field("listen", &self.listen_addr)
165 .field("dyn_listen", &self.dyn_listen_addr)
166 .finish_non_exhaustive()
167 }
168}
169
170impl ServerInner {
171 fn dispatch_local(&self, req: Msg) -> super::hooks::BoxFuture<'_, Result<Msg, EmbedError>> {
172 let fut = self.datastore.dispatch(req);
173 Box::pin(async move {
174 let rsp = fut.await.map_err(|e| EmbedError::Inject(e.to_string()))?;
175 Ok(rsp)
176 })
177 }
178}
179
180#[derive(Debug)]
187pub struct Server {
188 pool_name: String,
189 pool: ConfPool,
190 cluster: Arc<ServerPool>,
191 hooks: ServerHooks,
192 stats: Arc<Stats>,
193 command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
194}
195
196impl Server {
197 pub(crate) fn from_pool(
199 pool_name: String,
200 pool: ConfPool,
201 hooks: ServerHooks,
202 command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
203 ) -> Self {
204 let pool_cfg = PoolConfig::from_conf(&pool_name, &pool);
205 let local_peer = build_local_peer(&pool, &pool_cfg);
206 let mut peers = vec![local_peer];
207 if let Some(seeds) = pool.dyn_seeds.as_ref() {
208 let start = u32::try_from(peers.len()).unwrap_or(0);
209 peers.extend(peers_from_seeds(&pool_cfg, seeds, start));
210 }
211 let server_pool_arc = Arc::new(ServerPool::new(pool_cfg.clone(), peers));
212 server_pool_arc.preselect_remote_racks();
213
214 let stats = Arc::new(Stats::new(
215 ServiceInfo {
216 source: pool_cfg.name.clone(),
217 version: env!("CARGO_PKG_VERSION").to_string(),
218 rack: pool_cfg.rack.clone(),
219 dc: pool_cfg.dc.clone(),
220 },
221 PoolStats::new(&pool_cfg.name),
222 ServerStats::new("backend"),
223 ));
224
225 Self {
226 pool_name,
227 pool,
228 cluster: server_pool_arc,
229 hooks,
230 stats,
231 command_extension,
232 }
233 }
234
235 #[must_use]
238 pub fn command_extension(&self) -> Option<&Arc<dyn crate::embed::CommandExtension>> {
239 self.command_extension.as_ref()
240 }
241
242 #[must_use]
244 pub fn pool_name(&self) -> &str {
245 &self.pool_name
246 }
247
248 pub async fn start(self) -> Result<ServerHandle, EmbedError> {
289 let Server {
290 pool_name,
291 pool,
292 cluster,
293 mut hooks,
294 stats,
295 command_extension,
296 } = self;
297
298 let mut dispatcher = ClusterDispatcher::new(cluster.clone());
299 if let Some(ext) = command_extension.as_ref() {
300 dispatcher = dispatcher.with_command_extension(ext.clone());
301 }
302 let bus = EventBus::new(64);
303 let events = Arc::new(EventManager::new(64));
304 let cancel = CancellationToken::new();
305 let snapshot_cache = Arc::new(Mutex::new(Snapshot::default()));
306
307 let datastore = hooks
308 .datastore
309 .take()
310 .expect("invariant: builder always populates datastore");
311 let seeds = hooks
312 .seeds
313 .take()
314 .expect("invariant: builder always populates seeds");
315 let metrics = hooks
316 .metrics
317 .take()
318 .unwrap_or_else(|| Box::new(LoggingMetricsSink::new(pool_name.clone())));
319
320 let (listen_listener, listen_addr) = bind_listener(pool.listen.as_ref()).await?;
321 let (dyn_listener, dyn_listen_addr) = bind_listener(pool.dyn_listen.as_ref()).await?;
322
323 let inner = Arc::new(ServerInner {
324 pool: cluster.clone(),
325 dispatcher,
326 stats: stats.clone(),
327 snapshot_cache: snapshot_cache.clone(),
328 bus: bus.clone(),
329 events: events.clone(),
330 datastore,
331 seeds,
332 metrics,
333 crypto: hooks.crypto,
334 listen_addr,
335 dyn_listen_addr,
336 cancel: cancel.clone(),
337 pool_name: pool_name.clone(),
338 config: Mutex::new(pool.clone()),
339 generation: AtomicU64::new(0),
340 command_extension,
341 });
342
343 if let Some(addr) = inner.dyn_listen_addr {
346 registry_register(addr, &inner);
347 }
348
349 let mut tasks: Vec<JoinHandle<()>> = Vec::new();
351 tasks.push(tokio::spawn(stats_loop(inner.clone(), pool.stats_interval)));
352 tasks.push(tokio::spawn(metrics_loop(inner.clone())));
353 if pool.enable_gossip.unwrap_or(false) {
354 tasks.push(tokio::spawn(gossip_loop(
355 inner.clone(),
356 Duration::from_millis(
357 u64::try_from(pool.gos_interval.unwrap_or(1_000)).unwrap_or(1_000),
358 ),
359 )));
360 }
361 if let (Some(listener), Some(addr)) = (listen_listener, listen_addr) {
362 tasks.push(tokio::spawn(accept_loop(
363 inner.clone(),
364 listener,
365 addr,
366 ConnRoleTag::Proxy,
367 )));
368 }
369 if let (Some(listener), Some(addr)) = (dyn_listener, dyn_listen_addr) {
370 tasks.push(tokio::spawn(accept_loop(
371 inner.clone(),
372 listener,
373 addr,
374 ConnRoleTag::DnodeProxy,
375 )));
376 }
377
378 Ok(ServerHandle {
379 inner,
380 tasks: Arc::new(Mutex::new(tasks)),
381 })
382 }
383}
384
385#[derive(Clone)]
392pub struct ServerHandle {
393 inner: Arc<ServerInner>,
394 tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
395}
396
397impl std::fmt::Debug for ServerHandle {
398 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399 f.debug_struct("ServerHandle")
400 .field("pool", &self.inner.pool_name)
401 .field("listen", &self.inner.listen_addr)
402 .field("dyn_listen", &self.inner.dyn_listen_addr)
403 .finish_non_exhaustive()
404 }
405}
406
407impl ServerHandle {
408 #[must_use]
411 pub fn crypto_provider(&self) -> Option<&dyn CryptoProvider> {
412 self.inner.crypto.as_deref()
413 }
414
415 #[must_use]
418 pub fn command_extension(&self) -> Option<Arc<dyn crate::embed::CommandExtension>> {
419 self.inner.command_extension.as_ref().map(Arc::clone)
420 }
421
422 #[must_use]
425 pub fn listen_addr(&self) -> Option<SocketAddr> {
426 self.inner.listen_addr
427 }
428
429 #[must_use]
432 pub fn dyn_listen_addr(&self) -> Option<SocketAddr> {
433 self.inner.dyn_listen_addr
434 }
435
436 #[must_use]
457 pub fn subscribe_events(&self) -> EventStream {
458 self.inner.bus.subscribe()
459 }
460
461 #[must_use]
469 pub fn events(&self) -> Arc<EventManager> {
470 Arc::clone(&self.inner.events)
471 }
472
473 #[must_use]
475 pub fn stats(&self) -> Snapshot {
476 self.inner.stats.snapshot()
477 }
478
479 #[must_use]
494 pub fn stats_handle(&self) -> Arc<Stats> {
495 Arc::clone(&self.inner.stats)
496 }
497
498 #[must_use]
500 pub fn describe_stats(&self) -> Vec<MetricSpec> {
501 let _ = describe_stats(); let mut out: Vec<MetricSpec> = Vec::new();
503 out.extend(crate::stats::POOL_CODEC.iter().copied());
504 out.extend(crate::stats::SERVER_CODEC.iter().copied());
505 out
506 }
507
508 #[must_use]
510 pub fn peers(&self) -> Vec<PeerSnapshot> {
511 self.inner
512 .pool
513 .peers()
514 .read()
515 .iter()
516 .map(PeerSnapshot::from)
517 .collect()
518 }
519
520 #[must_use]
522 pub fn datacenters(&self) -> Vec<DatacenterSnapshot> {
523 self.inner
524 .pool
525 .datacenters()
526 .read()
527 .iter()
528 .map(DatacenterSnapshot::from)
529 .collect()
530 }
531
532 #[must_use]
534 pub fn ring(&self) -> RingSnapshot {
535 let dcs = self.inner.pool.datacenters().read();
536 let mut entries: Vec<(DynToken, u32)> = Vec::new();
537 for dc in dcs.iter() {
538 for rack in dc.racks() {
539 for c in rack.continuums() {
540 entries.push((c.token.clone(), c.peer_idx));
541 }
542 }
543 }
544 RingSnapshot {
545 entries,
546 generation: self.inner.generation.load(Ordering::Relaxed),
547 }
548 }
549
550 pub async fn inject_request(&self, req: Msg) -> Result<Msg, EmbedError> {
562 let key: Vec<u8> = req
566 .keys()
567 .first()
568 .map(|kp| kp.tag_bytes().to_vec())
569 .unwrap_or_default();
570 let plan = self.inner.dispatcher.plan(&req, &key);
571 match plan {
572 DispatchPlan::Drop => {
573 let mut rsp = Msg::new(req.id(), crate::msg::MsgType::Unknown, false);
576 rsp.set_parent_id(req.id());
577 Ok(rsp)
578 }
579 DispatchPlan::NoTargets => Err(EmbedError::Inject(
580 "cluster has no quorum-eligible targets".into(),
581 )),
582 DispatchPlan::LocalDatastore => self.inner.dispatch_local(req).await,
583 DispatchPlan::Replicas { targets, .. } => {
584 self.inner.stats.pool_incr_by(PoolField::ForwardError, 0); self.inner.stats.server_incr(ServerField::ReadRequests);
586 let resolved: Vec<(bool, Option<SocketAddr>)> = {
590 let peers = self.inner.pool.peers().read();
591 targets
592 .iter()
593 .map(|t| {
594 let peer = peers.get(t.peer_idx as usize);
595 let is_local = peer.is_some_and(crate::cluster::peer::Peer::is_local);
596 let addr = peer.and_then(|p| p.endpoint().pname().parse().ok());
597 (is_local, addr)
598 })
599 .collect()
600 };
601 let mut last_err: Option<EmbedError> = None;
602 for (is_local, addr) in resolved {
603 if is_local {
604 return self.inner.dispatch_local(req).await;
605 }
606 if let Some(addr) = addr {
607 if let Some(remote) = registry_lookup(addr) {
608 let mut forwarded = Msg::new(req.id(), req.ty(), true);
609 forwarded.set_parent_id(req.parent_id());
610 match remote.datastore.dispatch(forwarded).await {
611 Ok(rsp) => return Ok(rsp),
612 Err(e) => last_err = Some(EmbedError::Inject(e.to_string())),
613 }
614 }
615 }
616 }
617 if let Some(e) = last_err {
618 return Err(e);
619 }
620 self.inner.dispatch_local(req).await
624 }
625 }
626 }
627
628 pub async fn reload(&self, mut cfg: Config) -> Result<(), EmbedError> {
637 cfg.finalize();
638 cfg.validate()?;
639 let new_pool = cfg.pool().clone();
640 *self.inner.config.lock() = new_pool;
645 let gen_id = next_generation();
646 self.inner.generation.store(gen_id, Ordering::Relaxed);
647 self.inner
648 .bus
649 .send(ServerEvent::ConfigReloaded { generation: gen_id });
650 tokio::task::yield_now().await;
653 Ok(())
654 }
655
656 pub async fn shutdown(&self) -> Result<(), EmbedError> {
664 self.inner.cancel.cancel();
665 if let Some(addr) = self.inner.dyn_listen_addr {
666 registry_remove(addr);
667 }
668 let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
669 for t in drained {
670 let _ = t.await;
672 }
673 Ok(())
674 }
675
676 pub async fn join(&self) {
712 let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
713 for t in drained {
714 let _ = t.await;
715 }
716 }
717}
718
719fn build_local_peer(pool: &ConfPool, cfg: &PoolConfig) -> Peer {
722 let dyn_listen = pool.dyn_listen.as_ref().map_or_else(
723 || ("127.0.0.1".to_string(), 0u16),
724 |l| (l.name().to_string(), l.port()),
725 );
726 let tokens = pool
727 .tokens
728 .as_ref()
729 .map(|tl| {
730 tl.components()
731 .iter()
732 .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
733 .collect::<Vec<_>>()
734 })
735 .unwrap_or_default();
736 let mut peer = Peer::new(
737 0,
738 PeerEndpoint::tcp(dyn_listen.0, dyn_listen.1),
739 cfg.rack.clone(),
740 cfg.dc.clone(),
741 if tokens.is_empty() {
742 vec![DynToken::from_u32(0)]
743 } else {
744 tokens
745 },
746 true,
747 true,
748 false,
749 );
750 let now_secs = std::time::SystemTime::now()
751 .duration_since(std::time::UNIX_EPOCH)
752 .map(|d| d.as_secs())
753 .unwrap_or(0);
754 peer.set_state(PeerState::Normal, now_secs);
755 peer
756}
757
758fn peers_from_seeds(cfg: &PoolConfig, seeds: &[ConfDynSeed], start_idx: u32) -> Vec<Peer> {
759 let now_secs = std::time::SystemTime::now()
760 .duration_since(std::time::UNIX_EPOCH)
761 .map(|d| d.as_secs())
762 .unwrap_or(0);
763 seeds
764 .iter()
765 .enumerate()
766 .map(|(i, s)| {
767 let tokens: Vec<DynToken> = s
768 .tokens()
769 .components()
770 .iter()
771 .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
772 .collect();
773 let idx_off = u32::try_from(i).unwrap_or(0);
774 let mut p = Peer::new(
775 start_idx + idx_off,
776 PeerEndpoint::tcp(s.host().to_string(), s.port()),
777 s.rack().to_string(),
778 s.dc().to_string(),
779 if tokens.is_empty() {
780 vec![DynToken::from_u32(0)]
781 } else {
782 tokens
783 },
784 false,
785 s.dc() == cfg.dc,
786 false,
787 );
788 p.set_state(PeerState::Normal, now_secs);
789 p
790 })
791 .collect()
792}
793
794async fn bind_listener(
797 listen: Option<&crate::conf::ConfListen>,
798) -> Result<(Option<TcpListener>, Option<SocketAddr>), EmbedError> {
799 let Some(l) = listen else {
800 return Ok((None, None));
801 };
802 let host = l.name();
803 let port = l.port();
804 if host.is_empty() {
805 return Ok((None, None));
806 }
807 let addr_str = format!("{host}:{port}");
808 let Ok(_addr) = addr_str.parse::<SocketAddr>() else {
809 return Ok((None, None));
810 };
811 let listener = TcpListener::bind(&addr_str).await?;
812 let local = listener.local_addr()?;
813 Ok((Some(listener), Some(local)))
814}
815
816async fn accept_loop(
819 inner: Arc<ServerInner>,
820 listener: TcpListener,
821 addr: SocketAddr,
822 role: ConnRoleTag,
823) {
824 loop {
825 tokio::select! {
826 biased;
827 () = inner.cancel.cancelled() => return,
828 res = listener.accept() => {
829 let Ok((sock, peer)) = res else { return };
830 let conn_id = next_conn_id();
831 inner.bus.send(ServerEvent::ConnectionAccepted {
832 conn_id,
833 role,
834 local_addr: Some(addr),
835 });
836 tracing::warn!(
845 listen = %addr,
846 peer = %peer,
847 role = ?role,
848 conn_id,
849 "embedded listen_addr accepted a connection; embedded mode does not yet \
850 forward to the dispatcher; use ServerHandle::inject_request instead. \
851 Closing connection."
852 );
853 let bus = inner.bus.clone();
854 let cancel = inner.cancel.clone();
855 tokio::spawn(async move {
856 let _ = sock; let close_reason = if cancel.is_cancelled() {
858 CloseReason::LocalClose
859 } else {
860 CloseReason::PeerEof
861 };
862 bus.send(ServerEvent::ConnectionClosed { conn_id, reason: close_reason });
863 });
864 }
865 }
866 }
867}
868
869async fn stats_loop(inner: Arc<ServerInner>, interval_ms: Option<i64>) {
870 let interval =
871 Duration::from_millis(u64::try_from(interval_ms.unwrap_or(1_000)).unwrap_or(1_000));
872 let mut ticker = tokio::time::interval(interval);
873 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
874 loop {
875 tokio::select! {
876 biased;
877 () = inner.cancel.cancelled() => return,
878 _ = ticker.tick() => {
879 let snap = inner.stats.snapshot();
880 *inner.snapshot_cache.lock() = snap;
881 }
882 }
883 }
884}
885
886async fn metrics_loop(inner: Arc<ServerInner>) {
887 let interval = inner.metrics.flush_interval();
888 let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(50)));
889 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
890 loop {
891 tokio::select! {
892 biased;
893 () = inner.cancel.cancelled() => return,
894 _ = ticker.tick() => {
895 let snap = inner.snapshot_cache.lock().clone();
896 let _ = inner.metrics.emit(&snap).await;
897 }
898 }
899 }
900}
901
902async fn gossip_loop(inner: Arc<ServerInner>, interval: Duration) {
903 let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(20)));
904 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
905 let mut round: u64 = 0;
906 let mut known: std::collections::HashSet<(String, u16)> = std::collections::HashSet::new();
907 {
908 let peers = inner.pool.peers().read();
909 for p in peers.iter() {
910 known.insert((p.endpoint().host().to_string(), p.endpoint().port()));
911 }
912 }
913 loop {
914 tokio::select! {
915 biased;
916 () = inner.cancel.cancelled() => return,
917 _ = ticker.tick() => {
918 let round_started = std::time::Instant::now();
919 let round_ts = std::time::SystemTime::now();
920 round += 1;
921 let seeds = inner.seeds.fetch().unwrap_or_default();
922 let mut added: u32 = 0;
923 {
924 let mut peers = inner.pool.peers().write();
925 let now_secs = std::time::SystemTime::now()
926 .duration_since(std::time::UNIX_EPOCH)
927 .map(|d| d.as_secs())
928 .unwrap_or(0);
929 let cfg = inner.pool.config().clone();
930 for seed in &seeds {
931 let key = (seed.host().to_string(), seed.port());
932 if known.contains(&key) {
933 continue;
934 }
935 let next_idx = u32::try_from(peers.len()).unwrap_or(u32::MAX);
936 let tokens: Vec<DynToken> = seed
937 .tokens()
938 .components()
939 .iter()
940 .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
941 .collect();
942 let mut p = Peer::new(
943 next_idx,
944 PeerEndpoint::tcp(seed.host().to_string(), seed.port()),
945 seed.rack().to_string(),
946 seed.dc().to_string(),
947 if tokens.is_empty() {
948 vec![DynToken::from_u32(0)]
949 } else {
950 tokens
951 },
952 false,
953 seed.dc() == cfg.dc,
954 false,
955 );
956 p.set_state(PeerState::Normal, now_secs);
957 peers.push(p);
958 known.insert(key);
959 added += 1;
960 }
961 }
962 if added > 0 {
963 rebuild_topology(&inner.pool);
967 let peers_now = inner.pool.peers().read();
968 for (idx, p) in peers_now.iter().enumerate().rev().take(added as usize) {
969 let _ = idx;
970 inner.bus.send(ServerEvent::PeerUp(p.idx()));
971 inner.events.publish(crate::events::ClusterEvent::PeerUp {
972 peer_id: p.idx(),
973 dc: p.dc().to_string(),
974 ts: round_ts,
975 });
976 }
977 inner.events.publish(crate::events::ClusterEvent::RingChanged {
978 tag: "seed-discovery".to_string(),
979 ts: round_ts,
980 });
981 }
982 let count = u32::try_from(inner.pool.peers().read().len()).unwrap_or(u32::MAX);
983 inner.bus.send(ServerEvent::GossipRound { round, peers: count });
984 inner.events.publish(crate::events::ClusterEvent::GossipRoundComplete {
985 duration: round_started.elapsed(),
986 peers_seen: count as usize,
987 ts: round_ts,
988 });
989 inner.stats.pool_incr(PoolField::StatsCount);
990 }
991 }
992 }
993}
994
995fn rebuild_topology(pool: &Arc<ServerPool>) {
996 use crate::cluster::Datacenter;
997 let peers = pool.peers().read();
998 let mut new_dcs: Vec<Datacenter> = Vec::new();
999 for p in peers.iter() {
1000 let idx = if let Some(i) = new_dcs.iter().position(|d| d.name() == p.dc()) {
1001 i
1002 } else {
1003 new_dcs.push(Datacenter::new(p.dc().to_string()));
1004 new_dcs.len() - 1
1005 };
1006 new_dcs[idx].upsert_rack(p.rack().to_string());
1007 }
1008 drop(peers);
1009 {
1010 let mut dcs = pool.datacenters().write();
1011 *dcs = new_dcs;
1012 }
1013 pool.rebuild_ring();
1014 pool.preselect_remote_racks();
1015}
1016
1017