1use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, OnceLock, Weak};
19
20use parking_lot::Mutex;
21use tokio::net::TcpListener;
22use tokio::task::JoinHandle;
23use tokio::time::{Duration, MissedTickBehavior};
24use tokio_util::sync::CancellationToken;
25
26use crate::cluster::dispatch::{ClusterDispatcher, DispatchPlan};
27use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
28use crate::cluster::pool::{PoolConfig, ServerPool};
29use crate::conf::{ConfDynSeed, ConfPool, Config};
30use crate::embed::error::EmbedError;
31use crate::embed::events::{CloseReason, ConnRoleTag, EventBus, EventStream, ServerEvent};
32use crate::embed::hooks::{
33 CryptoProvider, Datastore, LoggingMetricsSink, MetricsSink, SeedsProvider,
34};
35use crate::embed::snapshots::{DatacenterSnapshot, PeerSnapshot, RingSnapshot};
36use crate::events::EventManager;
37use crate::hashkit::DynToken;
38use crate::msg::Msg;
39use crate::stats::{
40 describe_stats, MetricSpec, PoolField, PoolStats, ServerField, ServerStats, ServiceInfo,
41 Snapshot, Stats,
42};
43
/// Bundle of optional embedder-supplied hook implementations.
///
/// Every slot is an `Option`; `Server::start` takes the datastore, seeds and
/// metrics hooks out of this bundle and moves the crypto hook into the
/// running server state.
#[non_exhaustive]
pub struct ServerHooks {
    /// Datastore hook. `Server::start` panics if this slot is empty.
    pub(crate) datastore: Option<Box<dyn Datastore>>,
    /// Seeds-provider hook. `Server::start` panics if this slot is empty.
    pub(crate) seeds: Option<Box<dyn SeedsProvider>>,
    /// Crypto hook; stays optional after start-up.
    pub(crate) crypto: Option<Box<dyn CryptoProvider>>,
    /// Metrics sink; `Server::start` substitutes a `LoggingMetricsSink` when absent.
    pub(crate) metrics: Option<Box<dyn MetricsSink>>,
}
58
59impl ServerHooks {
60 #[must_use]
62 pub fn datastore(&self) -> Option<&dyn Datastore> {
63 self.datastore.as_deref()
64 }
65
66 #[must_use]
68 pub fn seeds(&self) -> Option<&dyn SeedsProvider> {
69 self.seeds.as_deref()
70 }
71
72 #[must_use]
74 pub fn crypto(&self) -> Option<&dyn CryptoProvider> {
75 self.crypto.as_deref()
76 }
77
78 #[must_use]
80 pub fn metrics(&self) -> Option<&dyn MetricsSink> {
81 self.metrics.as_deref()
82 }
83}
84
85impl std::fmt::Debug for ServerHooks {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.debug_struct("ServerHooks")
88 .field("datastore", &self.datastore.is_some())
89 .field("seeds", &self.seeds.is_some())
90 .field("crypto", &self.crypto.is_some())
91 .field("metrics", &self.metrics.is_some())
92 .finish()
93 }
94}
95
96fn registry() -> &'static Mutex<HashMap<SocketAddr, Weak<ServerInner>>> {
108 static R: OnceLock<Mutex<HashMap<SocketAddr, Weak<ServerInner>>>> = OnceLock::new();
109 R.get_or_init(|| Mutex::new(HashMap::new()))
110}
111
112fn registry_register(addr: SocketAddr, server: &Arc<ServerInner>) {
113 registry().lock().insert(addr, Arc::downgrade(server));
114}
115
116fn registry_remove(addr: SocketAddr) {
117 registry().lock().remove(&addr);
118}
119
120fn registry_lookup(addr: SocketAddr) -> Option<Arc<ServerInner>> {
121 registry().lock().get(&addr).and_then(Weak::upgrade)
122}
123
/// Hands out the next value of the process-wide generation counter.
///
/// Values start at 1 and increase by one per call. A freshly started
/// `ServerInner` stores `0` as its generation, so the counter must never
/// return `0`: otherwise the first reload in the process would be
/// indistinguishable from "never reloaded".
fn next_generation() -> u64 {
    static LAST_ISSUED: AtomicU64 = AtomicU64::new(0);
    // `fetch_add` yields the previous value; add one to report the value just issued.
    LAST_ISSUED.fetch_add(1, Ordering::Relaxed).wrapping_add(1)
}
132
/// Allocates a connection identifier from a process-wide counter.
///
/// The first call returns `0`; each later call returns the previous value plus one.
fn next_conn_id() -> u64 {
    static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(0);
    let issued = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);
    issued
}
138
/// Shared state of a running server, held behind an `Arc` by the
/// `ServerHandle` and by every background task.
pub(crate) struct ServerInner {
    /// Cluster pool: peers, datacenters and ring.
    pool: Arc<ServerPool>,
    /// Produces the `DispatchPlan` used by `ServerHandle::inject_request`.
    dispatcher: ClusterDispatcher,
    /// Live counters.
    stats: Arc<Stats>,
    /// Most recent snapshot written by `stats_loop` and read by `metrics_loop`.
    snapshot_cache: Arc<Mutex<Snapshot>>,
    /// Bus on which `ServerEvent`s are sent.
    bus: EventBus,
    /// Manager on which `ClusterEvent`s are published.
    events: Arc<EventManager>,
    /// Datastore hook used for local dispatch.
    datastore: Box<dyn Datastore>,
    /// Seeds provider polled by `gossip_loop`.
    seeds: Box<dyn SeedsProvider>,
    /// Sink that `metrics_loop` emits snapshots to.
    metrics: Box<dyn MetricsSink>,
    /// Optional crypto hook, exposed via `ServerHandle::crypto_provider`.
    crypto: Option<Box<dyn CryptoProvider>>,
    /// Address reported by `bind_listener` for the `listen` setting, if any.
    listen_addr: Option<SocketAddr>,
    /// Address reported by `bind_listener` for the `dyn_listen` setting, if any;
    /// also the key under which this server is placed in the registry.
    dyn_listen_addr: Option<SocketAddr>,
    /// Cancelled by `ServerHandle::shutdown`; every background loop exits on it.
    cancel: CancellationToken,
    /// Name of the pool this server was built from.
    pool_name: String,
    /// Current pool configuration; replaced by `ServerHandle::reload`.
    config: Mutex<ConfPool>,
    /// Generation stored by the last `reload`; starts at 0.
    generation: AtomicU64,
    /// Optional command extension, also handed to the dispatcher at start-up.
    command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
159
160impl std::fmt::Debug for ServerInner {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 f.debug_struct("ServerInner")
163 .field("pool_name", &self.pool_name)
164 .field("listen", &self.listen_addr)
165 .field("dyn_listen", &self.dyn_listen_addr)
166 .finish_non_exhaustive()
167 }
168}
169
170impl ServerInner {
171 fn dispatch_local(&self, req: Msg) -> super::hooks::BoxFuture<'_, Result<Msg, EmbedError>> {
172 let fut = self.datastore.dispatch(req);
173 Box::pin(async move {
174 let rsp = fut.await.map_err(|e| EmbedError::Inject(e.to_string()))?;
175 Ok(rsp)
176 })
177 }
178}
179
/// A configured but not yet started server.
///
/// Built by `Server::from_pool` and consumed by `Server::start`, which binds
/// the listeners and spawns the background tasks.
#[derive(Debug)]
pub struct Server {
    /// Name of the pool this server was built from.
    pool_name: String,
    /// Pool configuration the server was built from.
    pool: ConfPool,
    /// Cluster pool seeded with the local peer and any configured seeds.
    cluster: Arc<ServerPool>,
    /// Embedder-supplied hooks.
    hooks: ServerHooks,
    /// Counters created for this pool.
    stats: Arc<Stats>,
    /// Optional command extension passed on to the dispatcher at start-up.
    command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
195
196impl Server {
197 pub(crate) fn from_pool(
199 pool_name: String,
200 pool: ConfPool,
201 hooks: ServerHooks,
202 command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
203 ) -> Self {
204 let pool_cfg = PoolConfig::from_conf(&pool_name, &pool);
205 let local_peer = build_local_peer(&pool, &pool_cfg);
206 let mut peers = vec![local_peer];
207 if let Some(seeds) = pool.dyn_seeds.as_ref() {
208 let start = u32::try_from(peers.len()).unwrap_or(0);
209 peers.extend(peers_from_seeds(&pool_cfg, seeds, start));
210 }
211 let server_pool_arc = Arc::new(ServerPool::new(pool_cfg.clone(), peers));
212 server_pool_arc.preselect_remote_racks();
213
214 let stats = Arc::new(Stats::new(
215 ServiceInfo {
216 source: pool_cfg.name.clone(),
217 version: env!("CARGO_PKG_VERSION").to_string(),
218 rack: pool_cfg.rack.clone(),
219 dc: pool_cfg.dc.clone(),
220 },
221 PoolStats::new(&pool_cfg.name),
222 ServerStats::new("backend"),
223 ));
224
225 Self {
226 pool_name,
227 pool,
228 cluster: server_pool_arc,
229 hooks,
230 stats,
231 command_extension,
232 }
233 }
234
235 #[must_use]
238 pub fn command_extension(&self) -> Option<&Arc<dyn crate::embed::CommandExtension>> {
239 self.command_extension.as_ref()
240 }
241
242 #[must_use]
244 pub fn pool_name(&self) -> &str {
245 &self.pool_name
246 }
247
248 pub async fn start(self) -> Result<ServerHandle, EmbedError> {
289 let Server {
290 pool_name,
291 pool,
292 cluster,
293 mut hooks,
294 stats,
295 command_extension,
296 } = self;
297
298 let mut dispatcher = ClusterDispatcher::new(cluster.clone());
299 if let Some(ext) = command_extension.as_ref() {
300 dispatcher = dispatcher.with_command_extension(ext.clone());
301 }
302 let bus = EventBus::new(64);
303 let events = Arc::new(EventManager::new(64));
304 let cancel = CancellationToken::new();
305 let snapshot_cache = Arc::new(Mutex::new(Snapshot::default()));
306
307 let datastore = hooks
308 .datastore
309 .take()
310 .expect("invariant: builder always populates datastore");
311 let seeds = hooks
312 .seeds
313 .take()
314 .expect("invariant: builder always populates seeds");
315 let metrics = hooks
316 .metrics
317 .take()
318 .unwrap_or_else(|| Box::new(LoggingMetricsSink::new(pool_name.clone())));
319
320 let transport = pool.transport.unwrap_or_default();
321 let (listen_listener, listen_addr) =
322 bind_listener(pool.listen.as_ref(), transport, &pool).await?;
323 let (dyn_listener, dyn_listen_addr) =
324 bind_listener(pool.dyn_listen.as_ref(), transport, &pool).await?;
325
326 let inner = Arc::new(ServerInner {
327 pool: cluster.clone(),
328 dispatcher,
329 stats: stats.clone(),
330 snapshot_cache: snapshot_cache.clone(),
331 bus: bus.clone(),
332 events: events.clone(),
333 datastore,
334 seeds,
335 metrics,
336 crypto: hooks.crypto,
337 listen_addr,
338 dyn_listen_addr,
339 cancel: cancel.clone(),
340 pool_name: pool_name.clone(),
341 config: Mutex::new(pool.clone()),
342 generation: AtomicU64::new(0),
343 command_extension,
344 });
345
346 if let Some(addr) = inner.dyn_listen_addr {
349 registry_register(addr, &inner);
350 }
351
352 let mut tasks: Vec<JoinHandle<()>> = Vec::new();
354 tasks.push(tokio::spawn(stats_loop(inner.clone(), pool.stats_interval)));
355 tasks.push(tokio::spawn(metrics_loop(inner.clone())));
356 if pool.enable_gossip.unwrap_or(false) {
357 tasks.push(tokio::spawn(gossip_loop(
358 inner.clone(),
359 Duration::from_millis(
360 u64::try_from(pool.gos_interval.unwrap_or(1_000)).unwrap_or(1_000),
361 ),
362 )));
363 }
364 if let (Some(listener), Some(addr)) = (listen_listener, listen_addr) {
365 tasks.push(tokio::spawn(accept_loop(
366 inner.clone(),
367 listener,
368 addr,
369 ConnRoleTag::Proxy,
370 )));
371 }
372 if let (Some(listener), Some(addr)) = (dyn_listener, dyn_listen_addr) {
373 tasks.push(tokio::spawn(accept_loop(
374 inner.clone(),
375 listener,
376 addr,
377 ConnRoleTag::DnodeProxy,
378 )));
379 }
380
381 Ok(ServerHandle {
382 inner,
383 tasks: Arc::new(Mutex::new(tasks)),
384 })
385 }
386}
387
/// Cloneable handle to a running server.
///
/// Clones share the same server state and the same list of background task
/// handles.
#[derive(Clone)]
pub struct ServerHandle {
    /// Shared state of the running server.
    inner: Arc<ServerInner>,
    /// Join handles of the background tasks; emptied by `shutdown` and `join`.
    tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
399
400impl std::fmt::Debug for ServerHandle {
401 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
402 f.debug_struct("ServerHandle")
403 .field("pool", &self.inner.pool_name)
404 .field("listen", &self.inner.listen_addr)
405 .field("dyn_listen", &self.inner.dyn_listen_addr)
406 .finish_non_exhaustive()
407 }
408}
409
410impl ServerHandle {
411 #[must_use]
414 pub fn crypto_provider(&self) -> Option<&dyn CryptoProvider> {
415 self.inner.crypto.as_deref()
416 }
417
418 #[must_use]
421 pub fn command_extension(&self) -> Option<Arc<dyn crate::embed::CommandExtension>> {
422 self.inner.command_extension.as_ref().map(Arc::clone)
423 }
424
425 #[must_use]
428 pub fn listen_addr(&self) -> Option<SocketAddr> {
429 self.inner.listen_addr
430 }
431
432 #[must_use]
435 pub fn dyn_listen_addr(&self) -> Option<SocketAddr> {
436 self.inner.dyn_listen_addr
437 }
438
439 #[must_use]
460 pub fn subscribe_events(&self) -> EventStream {
461 self.inner.bus.subscribe()
462 }
463
464 #[must_use]
472 pub fn events(&self) -> Arc<EventManager> {
473 Arc::clone(&self.inner.events)
474 }
475
476 #[must_use]
478 pub fn stats(&self) -> Snapshot {
479 self.inner.stats.snapshot()
480 }
481
482 #[must_use]
497 pub fn stats_handle(&self) -> Arc<Stats> {
498 Arc::clone(&self.inner.stats)
499 }
500
501 #[must_use]
503 pub fn describe_stats(&self) -> Vec<MetricSpec> {
504 let _ = describe_stats(); let mut out: Vec<MetricSpec> = Vec::new();
506 out.extend(crate::stats::POOL_CODEC.iter().copied());
507 out.extend(crate::stats::SERVER_CODEC.iter().copied());
508 out
509 }
510
511 #[must_use]
513 pub fn peers(&self) -> Vec<PeerSnapshot> {
514 self.inner
515 .pool
516 .peers()
517 .read()
518 .iter()
519 .map(PeerSnapshot::from)
520 .collect()
521 }
522
523 #[must_use]
525 pub fn datacenters(&self) -> Vec<DatacenterSnapshot> {
526 self.inner
527 .pool
528 .datacenters()
529 .read()
530 .iter()
531 .map(DatacenterSnapshot::from)
532 .collect()
533 }
534
535 #[must_use]
537 pub fn ring(&self) -> RingSnapshot {
538 let dcs = self.inner.pool.datacenters().read();
539 let mut entries: Vec<(DynToken, u32)> = Vec::new();
540 for dc in dcs.iter() {
541 for rack in dc.racks() {
542 for c in rack.continuums() {
543 entries.push((c.token.clone(), c.peer_idx));
544 }
545 }
546 }
547 RingSnapshot {
548 entries,
549 generation: self.inner.generation.load(Ordering::Relaxed),
550 }
551 }
552
553 pub async fn inject_request(&self, req: Msg) -> Result<Msg, EmbedError> {
565 let key: Vec<u8> = req
569 .keys()
570 .first()
571 .map(|kp| kp.tag_bytes().to_vec())
572 .unwrap_or_default();
573 let plan = self.inner.dispatcher.plan(&req, &key);
574 match plan {
575 DispatchPlan::Drop => {
576 let mut rsp = Msg::new(req.id(), crate::msg::MsgType::Unknown, false);
579 rsp.set_parent_id(req.id());
580 Ok(rsp)
581 }
582 DispatchPlan::NoTargets => Err(EmbedError::Inject(
583 "cluster has no quorum-eligible targets".into(),
584 )),
585 DispatchPlan::LocalDatastore => self.inner.dispatch_local(req).await,
586 DispatchPlan::Replicas { targets, .. } => {
587 self.inner.stats.pool_incr_by(PoolField::ForwardError, 0); self.inner.stats.server_incr(ServerField::ReadRequests);
589 let resolved: Vec<(bool, Option<SocketAddr>)> = {
593 let peers = self.inner.pool.peers().read();
594 targets
595 .iter()
596 .map(|t| {
597 let peer = peers.get(t.peer_idx as usize);
598 let is_local = peer.is_some_and(crate::cluster::peer::Peer::is_local);
599 let addr = peer.and_then(|p| p.endpoint().pname().parse().ok());
600 (is_local, addr)
601 })
602 .collect()
603 };
604 let mut last_err: Option<EmbedError> = None;
605 for (is_local, addr) in resolved {
606 if is_local {
607 return self.inner.dispatch_local(req).await;
608 }
609 if let Some(addr) = addr {
610 if let Some(remote) = registry_lookup(addr) {
611 let mut forwarded = Msg::new(req.id(), req.ty(), true);
612 forwarded.set_parent_id(req.parent_id());
613 match remote.datastore.dispatch(forwarded).await {
614 Ok(rsp) => return Ok(rsp),
615 Err(e) => last_err = Some(EmbedError::Inject(e.to_string())),
616 }
617 }
618 }
619 }
620 if let Some(e) = last_err {
621 return Err(e);
622 }
623 self.inner.dispatch_local(req).await
627 }
628 }
629 }
630
631 pub async fn reload(&self, mut cfg: Config) -> Result<(), EmbedError> {
640 cfg.finalize();
641 cfg.validate()?;
642 let new_pool = cfg.pool().clone();
643 *self.inner.config.lock() = new_pool;
648 let gen_id = next_generation();
649 self.inner.generation.store(gen_id, Ordering::Relaxed);
650 self.inner
651 .bus
652 .send(ServerEvent::ConfigReloaded { generation: gen_id });
653 tokio::task::yield_now().await;
656 Ok(())
657 }
658
659 pub async fn shutdown(&self) -> Result<(), EmbedError> {
667 self.inner.cancel.cancel();
668 if let Some(addr) = self.inner.dyn_listen_addr {
669 registry_remove(addr);
670 }
671 let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
672 for t in drained {
673 let _ = t.await;
675 }
676 Ok(())
677 }
678
679 pub async fn join(&self) {
715 let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
716 for t in drained {
717 let _ = t.await;
718 }
719 }
720}
721
722fn build_local_peer(pool: &ConfPool, cfg: &PoolConfig) -> Peer {
725 let dyn_listen = pool.dyn_listen.as_ref().map_or_else(
726 || ("127.0.0.1".to_string(), 0u16),
727 |l| (l.name().to_string(), l.port()),
728 );
729 let tokens = pool
730 .tokens
731 .as_ref()
732 .map(|tl| {
733 tl.components()
734 .iter()
735 .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
736 .collect::<Vec<_>>()
737 })
738 .unwrap_or_default();
739 let mut peer = Peer::new(
740 0,
741 PeerEndpoint::tcp(dyn_listen.0, dyn_listen.1),
742 cfg.rack.clone(),
743 cfg.dc.clone(),
744 if tokens.is_empty() {
745 vec![DynToken::from_u32(0)]
746 } else {
747 tokens
748 },
749 true,
750 true,
751 false,
752 );
753 let now_secs = std::time::SystemTime::now()
754 .duration_since(std::time::UNIX_EPOCH)
755 .map(|d| d.as_secs())
756 .unwrap_or(0);
757 peer.set_state(PeerState::Normal, now_secs);
758 peer
759}
760
761fn peers_from_seeds(cfg: &PoolConfig, seeds: &[ConfDynSeed], start_idx: u32) -> Vec<Peer> {
762 let now_secs = std::time::SystemTime::now()
763 .duration_since(std::time::UNIX_EPOCH)
764 .map(|d| d.as_secs())
765 .unwrap_or(0);
766 seeds
767 .iter()
768 .enumerate()
769 .map(|(i, s)| {
770 let tokens: Vec<DynToken> = s
771 .tokens()
772 .components()
773 .iter()
774 .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
775 .collect();
776 let idx_off = u32::try_from(i).unwrap_or(0);
777 let mut p = Peer::new(
778 start_idx + idx_off,
779 PeerEndpoint::tcp(s.host().to_string(), s.port()),
780 s.rack().to_string(),
781 s.dc().to_string(),
782 if tokens.is_empty() {
783 vec![DynToken::from_u32(0)]
784 } else {
785 tokens
786 },
787 false,
788 s.dc() == cfg.dc,
789 false,
790 );
791 p.set_state(PeerState::Normal, now_secs);
792 p
793 })
794 .collect()
795}
796
797async fn bind_listener(
800 listen: Option<&crate::conf::ConfListen>,
801 transport: crate::conf::Transport,
802 #[cfg_attr(not(feature = "quic"), allow(unused_variables))] pool: &crate::conf::ConfPool,
803) -> Result<(Option<TcpListener>, Option<SocketAddr>), EmbedError> {
804 let Some(l) = listen else {
805 return Ok((None, None));
806 };
807 let host = l.name();
808 let port = l.port();
809 if host.is_empty() {
810 return Ok((None, None));
811 }
812 let addr_str = format!("{host}:{port}");
813 let Ok(_addr) = addr_str.parse::<SocketAddr>() else {
814 return Ok((None, None));
815 };
816 match transport {
817 crate::conf::Transport::Tcp => {
818 let listener = TcpListener::bind(&addr_str).await?;
819 let local = listener.local_addr()?;
820 Ok((Some(listener), Some(local)))
821 }
822 #[cfg(feature = "quic")]
823 crate::conf::Transport::Quic => {
824 let cert = pool
834 .quic_cert_file
835 .as_ref()
836 .and_then(|p| p.to_str().map(str::to_owned))
837 .ok_or_else(|| {
838 EmbedError::Build(
839 "transport: quic requires quic_cert_file (UTF-8 path)".to_string(),
840 )
841 })?;
842 let key = pool
843 .quic_key_file
844 .as_ref()
845 .and_then(|p| p.to_str().map(str::to_owned))
846 .ok_or_else(|| {
847 EmbedError::Build(
848 "transport: quic requires quic_key_file (UTF-8 path)".to_string(),
849 )
850 })?;
851 let cfg = crate::net::QuicConfig::server_with_cert_paths(cert, key);
852 let parsed: SocketAddr = addr_str.parse().map_err(|e: std::net::AddrParseError| {
853 EmbedError::Build(format!("invalid quic listen address {addr_str}: {e}"))
854 })?;
855 let listener = crate::net::QuicListener::bind(parsed, cfg).await?;
856 let local = listener.local_addr();
857 drop(listener);
864 Ok((None, Some(local)))
865 }
866 #[cfg(not(feature = "quic"))]
867 crate::conf::Transport::Quic => Err(EmbedError::Build(
868 "transport: quic requires the engine's `quic` Cargo feature".to_string(),
869 )),
870 }
871}
872
873async fn accept_loop(
876 inner: Arc<ServerInner>,
877 listener: TcpListener,
878 addr: SocketAddr,
879 role: ConnRoleTag,
880) {
881 loop {
882 tokio::select! {
883 biased;
884 () = inner.cancel.cancelled() => return,
885 res = listener.accept() => {
886 let Ok((sock, peer)) = res else { return };
887 let conn_id = next_conn_id();
888 inner.bus.send(ServerEvent::ConnectionAccepted {
889 conn_id,
890 role,
891 local_addr: Some(addr),
892 });
893 tracing::warn!(
902 listen = %addr,
903 peer = %peer,
904 role = ?role,
905 conn_id,
906 "embedded listen_addr accepted a connection; embedded mode does not yet \
907 forward to the dispatcher; use ServerHandle::inject_request instead. \
908 Closing connection."
909 );
910 let bus = inner.bus.clone();
911 let cancel = inner.cancel.clone();
912 tokio::spawn(async move {
913 let _ = sock; let close_reason = if cancel.is_cancelled() {
915 CloseReason::LocalClose
916 } else {
917 CloseReason::PeerEof
918 };
919 bus.send(ServerEvent::ConnectionClosed { conn_id, reason: close_reason });
920 });
921 }
922 }
923 }
924}
925
926async fn stats_loop(inner: Arc<ServerInner>, interval_ms: Option<i64>) {
927 let interval =
928 Duration::from_millis(u64::try_from(interval_ms.unwrap_or(1_000)).unwrap_or(1_000));
929 let mut ticker = tokio::time::interval(interval);
930 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
931 loop {
932 tokio::select! {
933 biased;
934 () = inner.cancel.cancelled() => return,
935 _ = ticker.tick() => {
936 let snap = inner.stats.snapshot();
937 *inner.snapshot_cache.lock() = snap;
938 }
939 }
940 }
941}
942
943async fn metrics_loop(inner: Arc<ServerInner>) {
944 let interval = inner.metrics.flush_interval();
945 let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(50)));
946 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
947 loop {
948 tokio::select! {
949 biased;
950 () = inner.cancel.cancelled() => return,
951 _ = ticker.tick() => {
952 let snap = inner.snapshot_cache.lock().clone();
953 let _ = inner.metrics.emit(&snap).await;
954 }
955 }
956 }
957}
958
/// Periodic seed-discovery loop; runs until the server is cancelled.
///
/// On every tick (period `interval`, but at least 20 ms; missed ticks are
/// delayed) it fetches the seed list from the seeds provider and appends a
/// peer for every seed whose `(host, port)` has not been seen before. When
/// at least one peer was added, the topology is rebuilt and `PeerUp` /
/// `RingChanged` events are emitted. Every round ends with a `GossipRound`
/// bus event, a `GossipRoundComplete` cluster event and an increment of the
/// `StatsCount` pool counter.
async fn gossip_loop(inner: Arc<ServerInner>, interval: Duration) {
    let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(20)));
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
    let mut round: u64 = 0;
    // Endpoints already present in the pool, so a seed is only ever added once.
    let mut known: std::collections::HashSet<(String, u16)> = std::collections::HashSet::new();
    {
        let peers = inner.pool.peers().read();
        for p in peers.iter() {
            known.insert((p.endpoint().host().to_string(), p.endpoint().port()));
        }
    }
    loop {
        tokio::select! {
            biased;
            () = inner.cancel.cancelled() => return,
            _ = ticker.tick() => {
                let round_started = std::time::Instant::now();
                let round_ts = std::time::SystemTime::now();
                round += 1;
                // A failed fetch is treated as an empty seed list.
                let seeds = inner.seeds.fetch().unwrap_or_default();
                let mut added: u32 = 0;
                {
                    // Write lock is scoped to this block and released before
                    // the topology rebuild below.
                    let mut peers = inner.pool.peers().write();
                    let now_secs = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .map(|d| d.as_secs())
                        .unwrap_or(0);
                    let cfg = inner.pool.config().clone();
                    for seed in &seeds {
                        let key = (seed.host().to_string(), seed.port());
                        if known.contains(&key) {
                            continue;
                        }
                        // New peers are appended, so the next index is the current length.
                        let next_idx = u32::try_from(peers.len()).unwrap_or(u32::MAX);
                        // Unparsable token components become token 0.
                        let tokens: Vec<DynToken> = seed
                            .tokens()
                            .components()
                            .iter()
                            .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
                            .collect();
                        let mut p = Peer::new(
                            next_idx,
                            PeerEndpoint::tcp(seed.host().to_string(), seed.port()),
                            seed.rack().to_string(),
                            seed.dc().to_string(),
                            if tokens.is_empty() {
                                vec![DynToken::from_u32(0)]
                            } else {
                                tokens
                            },
                            false,
                            seed.dc() == cfg.dc,
                            false,
                        );
                        p.set_state(PeerState::Normal, now_secs);
                        peers.push(p);
                        known.insert(key);
                        added += 1;
                    }
                }
                if added > 0 {
                    rebuild_topology(&inner.pool);
                    let peers_now = inner.pool.peers().read();
                    // The peers added this round are the last `added` entries;
                    // they are announced newest first.
                    for (idx, p) in peers_now.iter().enumerate().rev().take(added as usize) {
                        let _ = idx;
                        inner.bus.send(ServerEvent::PeerUp(p.idx()));
                        inner.events.publish(crate::events::ClusterEvent::PeerUp {
                            peer_id: p.idx(),
                            dc: p.dc().to_string(),
                            ts: round_ts,
                        });
                    }
                    inner.events.publish(crate::events::ClusterEvent::RingChanged {
                        tag: "seed-discovery".to_string(),
                        ts: round_ts,
                    });
                }
                let count = u32::try_from(inner.pool.peers().read().len()).unwrap_or(u32::MAX);
                inner.bus.send(ServerEvent::GossipRound { round, peers: count });
                inner.events.publish(crate::events::ClusterEvent::GossipRoundComplete {
                    duration: round_started.elapsed(),
                    peers_seen: count as usize,
                    ts: round_ts,
                });
                inner.stats.pool_incr(PoolField::StatsCount);
            }
        }
    }
}
1051
1052fn rebuild_topology(pool: &Arc<ServerPool>) {
1053 use crate::cluster::Datacenter;
1054 let peers = pool.peers().read();
1055 let mut new_dcs: Vec<Datacenter> = Vec::new();
1056 for p in peers.iter() {
1057 let idx = if let Some(i) = new_dcs.iter().position(|d| d.name() == p.dc()) {
1058 i
1059 } else {
1060 new_dcs.push(Datacenter::new(p.dc().to_string()));
1061 new_dcs.len() - 1
1062 };
1063 new_dcs[idx].upsert_rack(p.rack().to_string());
1064 }
1065 drop(peers);
1066 {
1067 let mut dcs = pool.datacenters().write();
1068 *dcs = new_dcs;
1069 }
1070 pool.rebuild_ring();
1071 pool.preselect_remote_racks();
1072}
1073
1074