Skip to main content

dynomite/embed/
server.rs

1//! Embedding-API server runtime.
2//!
3//! [`Server`] owns the runtime data structures (cluster pool,
4//! dispatcher, stats, event bus, hooks). [`Server::start`] spawns
5//! the background tasks (gossip, stats aggregator, optional TCP
6//! listeners) on the current tokio runtime and returns a
7//! [`ServerHandle`].
8//!
9//! The handle exposes the public control surface documented in
10//! `docs/book/src/embedding/server.md`. It is `Clone + Send +
11//! Sync`; multiple consumers may hold one. Dropping the last
12//! handle does not shut the server down - only
13//! [`ServerHandle::shutdown`] does.
14
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, OnceLock, Weak};
19
20use parking_lot::Mutex;
21use tokio::net::TcpListener;
22use tokio::task::JoinHandle;
23use tokio::time::{Duration, MissedTickBehavior};
24use tokio_util::sync::CancellationToken;
25
26use crate::cluster::dispatch::{ClusterDispatcher, DispatchPlan};
27use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
28use crate::cluster::pool::{PoolConfig, ServerPool};
29use crate::conf::{ConfDynSeed, ConfPool, Config};
30use crate::embed::error::EmbedError;
31use crate::embed::events::{CloseReason, ConnRoleTag, EventBus, EventStream, ServerEvent};
32use crate::embed::hooks::{
33    CryptoProvider, Datastore, LoggingMetricsSink, MetricsSink, SeedsProvider,
34};
35use crate::embed::snapshots::{DatacenterSnapshot, PeerSnapshot, RingSnapshot};
36use crate::events::EventManager;
37use crate::hashkit::DynToken;
38use crate::msg::Msg;
39use crate::stats::{
40    describe_stats, MetricSpec, PoolField, PoolStats, ServerField, ServerStats, ServiceInfo,
41    Snapshot, Stats,
42};
43
/// Bag of optional hook overrides supplied to a builder.
///
/// All fields are crate-private (`pub(crate)`) and the struct is
/// `#[non_exhaustive]`, so adding new hooks (for example, a
/// `Box<dyn TransportListener>` slot) is not a SemVer-breaking
/// change. Construct a `ServerHooks` only through
/// [`crate::embed::ServerBuilder`] setters; read it back through
/// the accessor methods below.
///
/// [`Server::start`] consumes these hooks: `datastore` and `seeds`
/// must be populated (it `expect`s both), `metrics` falls back to a
/// [`LoggingMetricsSink`] when unset, and `crypto` is passed through
/// unchanged.
#[non_exhaustive]
pub struct ServerHooks {
    /// Local datastore hook. Required by [`Server::start`].
    pub(crate) datastore: Option<Box<dyn Datastore>>,
    /// Seeds provider hook. Required by [`Server::start`].
    pub(crate) seeds: Option<Box<dyn SeedsProvider>>,
    /// Optional crypto provider, exposed later through
    /// `ServerHandle::crypto_provider`.
    pub(crate) crypto: Option<Box<dyn CryptoProvider>>,
    /// Metrics sink; when `None`, [`Server::start`] installs a
    /// [`LoggingMetricsSink`] named after the pool.
    pub(crate) metrics: Option<Box<dyn MetricsSink>>,
}
58
59impl ServerHooks {
60    /// Borrow the configured [`Datastore`], if any.
61    #[must_use]
62    pub fn datastore(&self) -> Option<&dyn Datastore> {
63        self.datastore.as_deref()
64    }
65
66    /// Borrow the configured [`SeedsProvider`], if any.
67    #[must_use]
68    pub fn seeds(&self) -> Option<&dyn SeedsProvider> {
69        self.seeds.as_deref()
70    }
71
72    /// Borrow the configured [`CryptoProvider`], if any.
73    #[must_use]
74    pub fn crypto(&self) -> Option<&dyn CryptoProvider> {
75        self.crypto.as_deref()
76    }
77
78    /// Borrow the configured [`MetricsSink`], if any.
79    #[must_use]
80    pub fn metrics(&self) -> Option<&dyn MetricsSink> {
81        self.metrics.as_deref()
82    }
83}
84
85impl std::fmt::Debug for ServerHooks {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        f.debug_struct("ServerHooks")
88            .field("datastore", &self.datastore.is_some())
89            .field("seeds", &self.seeds.is_some())
90            .field("crypto", &self.crypto.is_some())
91            .field("metrics", &self.metrics.is_some())
92            .finish()
93    }
94}
95
96// -------- in-process registry -------------------------------------------
97
98/// Process-wide registry for in-process clusters.
99///
100/// When `inject_request` produces a `Replicas` plan that targets
101/// a peer co-located in the same process, the embed runtime
102/// looks up the target via this registry and forwards directly
103/// without the dnode wire path. This keeps the in-process tests
104/// from depending on real network I/O while preserving the
105/// production code path semantics (compute plan, deliver to
106/// target peers).
107fn registry() -> &'static Mutex<HashMap<SocketAddr, Weak<ServerInner>>> {
108    static R: OnceLock<Mutex<HashMap<SocketAddr, Weak<ServerInner>>>> = OnceLock::new();
109    R.get_or_init(|| Mutex::new(HashMap::new()))
110}
111
112fn registry_register(addr: SocketAddr, server: &Arc<ServerInner>) {
113    registry().lock().insert(addr, Arc::downgrade(server));
114}
115
116fn registry_remove(addr: SocketAddr) {
117    registry().lock().remove(&addr);
118}
119
120fn registry_lookup(addr: SocketAddr) -> Option<Arc<ServerInner>> {
121    registry().lock().get(&addr).and_then(Weak::upgrade)
122}
123
124// -------- inner state ----------------------------------------------------
125
/// Allocates the next generation id for
/// [`ServerEvent::ConfigReloaded`] / [`RingSnapshot`].
///
/// Ids start at `1`. Every `ServerInner` is created with
/// `generation == 0`, so handing out `0` on the first reload would make
/// that reload indistinguishable from "never reloaded" for anyone
/// comparing `RingSnapshot::generation` values. The counter is
/// process-wide: ids are unique across every embedded server in the
/// process, but not contiguous for any single server.
fn next_generation() -> u64 {
    static G: AtomicU64 = AtomicU64::new(0);
    // `fetch_add` returns the value *before* the increment; add one so
    // the first id handed out is 1, never the initial 0.
    G.fetch_add(1, Ordering::Relaxed) + 1
}
132
/// Hands out process-wide connection ids, starting at `0` and
/// increasing by one per call.
fn next_conn_id() -> u64 {
    static CONN_COUNTER: AtomicU64 = AtomicU64::new(0);
    CONN_COUNTER.fetch_add(1, Ordering::Relaxed)
}
138
/// Inner state shared by every clone of a [`ServerHandle`].
///
/// Built once by [`Server::start`]; each spawned background task also
/// holds an `Arc` clone. The in-process registry stores only a `Weak`
/// reference keyed by `dyn_listen_addr`.
pub(crate) struct ServerInner {
    /// Cluster topology; the same `ServerPool` the dispatcher was
    /// built from.
    pool: Arc<ServerPool>,
    /// Computes routing plans for `ServerHandle::inject_request`.
    dispatcher: ClusterDispatcher,
    /// Stats aggregator; the same `Arc` returned by
    /// `ServerHandle::stats_handle`.
    stats: Arc<Stats>,
    /// Cached stats snapshot. NOTE(review): not read by the code in
    /// this section; presumably maintained by the stats loop - confirm.
    snapshot_cache: Arc<Mutex<Snapshot>>,
    /// Broadcast channel for [`ServerEvent`]s (`subscribe_events`).
    bus: EventBus,
    /// Structured cluster-event publisher exposed by
    /// `ServerHandle::events`.
    events: Arc<EventManager>,
    /// Local datastore hook; called by `dispatch_local` and, for
    /// in-process forwarding, by other servers via the registry.
    datastore: Box<dyn Datastore>,
    /// Seeds provider hook. NOTE(review): not read in this section;
    /// presumably consumed by the gossip loop - confirm.
    seeds: Box<dyn SeedsProvider>,
    /// Metrics sink (user-supplied or the default logging sink).
    metrics: Box<dyn MetricsSink>,
    /// Optional crypto provider exposed by
    /// `ServerHandle::crypto_provider`.
    crypto: Option<Box<dyn CryptoProvider>>,
    /// Post-bind client listen address, if `listen:` was bound.
    listen_addr: Option<SocketAddr>,
    /// Post-bind dnode listen address, if `dyn_listen:` was bound;
    /// also the key under which this server is registered in-process.
    dyn_listen_addr: Option<SocketAddr>,
    /// Cancelled by `ServerHandle::shutdown`; background tasks watch it.
    cancel: CancellationToken,
    /// Pool name from the builder / YAML.
    pool_name: String,
    /// Current pool configuration; replaced wholesale by
    /// `ServerHandle::reload`.
    config: Mutex<ConfPool>,
    /// Generation of the last applied reload (`0` until the first
    /// reload); reported by `ServerHandle::ring`.
    generation: AtomicU64,
    /// Vector registry shared with the dispatcher.
    vector_registry: Arc<crate::vector::registry::VectorRegistry>,
}
159
160impl std::fmt::Debug for ServerInner {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        f.debug_struct("ServerInner")
163            .field("pool_name", &self.pool_name)
164            .field("listen", &self.listen_addr)
165            .field("dyn_listen", &self.dyn_listen_addr)
166            .finish_non_exhaustive()
167    }
168}
169
170impl ServerInner {
171    fn dispatch_local(&self, req: Msg) -> super::hooks::BoxFuture<'_, Result<Msg, EmbedError>> {
172        let fut = self.datastore.dispatch(req);
173        Box::pin(async move {
174            let rsp = fut.await.map_err(|e| EmbedError::Inject(e.to_string()))?;
175            Ok(rsp)
176        })
177    }
178}
179
180// -------- Server (configured-but-not-running) ----------------------------
181
/// Configured-but-not-running server.
///
/// Build one with [`ServerBuilder`](crate::embed::ServerBuilder),
/// then call [`Server::start`] to spawn the background tasks.
#[derive(Debug)]
pub struct Server {
    /// Pool name as given to the builder.
    pool_name: String,
    /// Pool configuration; `start` reads the listen addresses, gossip
    /// and stats settings from it.
    pool: ConfPool,
    /// Cluster pool (local peer followed by seed peers) built by
    /// `from_pool`.
    cluster: Arc<ServerPool>,
    /// Hook overrides; consumed by `start`.
    hooks: ServerHooks,
    /// Stats aggregator created by `from_pool`.
    stats: Arc<Stats>,
    /// Vector registry, handed to the dispatcher by `start`.
    vector_registry: Arc<crate::vector::registry::VectorRegistry>,
}
195
impl Server {
    /// Used internally by [`crate::embed::ServerBuilder::build`].
    ///
    /// Builds the cluster pool for `pool`. The local peer comes first,
    /// at index 0, followed by one peer per `dyn_seeds:` entry. It then
    /// calls `preselect_remote_racks` on the resulting [`ServerPool`].
    /// It also creates the [`Stats`] aggregator, tagged with the pool
    /// name, crate version, rack and DC.
    ///
    /// No sockets are bound and no tasks are spawned here; that
    /// happens in [`Server::start`].
    pub(crate) fn from_pool(
        pool_name: String,
        pool: ConfPool,
        hooks: ServerHooks,
        vector_registry: Arc<crate::vector::registry::VectorRegistry>,
    ) -> Self {
        let pool_cfg = PoolConfig::from_conf(&pool_name, &pool);
        let local_peer = build_local_peer(&pool, &pool_cfg);
        let mut peers = vec![local_peer];
        if let Some(seeds) = pool.dyn_seeds.as_ref() {
            // Seed peers are numbered after the peers already present
            // (i.e. after the local peer).
            let start = u32::try_from(peers.len()).unwrap_or(0);
            peers.extend(peers_from_seeds(&pool_cfg, seeds, start));
        }
        let server_pool_arc = Arc::new(ServerPool::new(pool_cfg.clone(), peers));
        server_pool_arc.preselect_remote_racks();

        let stats = Arc::new(Stats::new(
            ServiceInfo {
                source: pool_cfg.name.clone(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                rack: pool_cfg.rack.clone(),
                dc: pool_cfg.dc.clone(),
            },
            PoolStats::new(&pool_cfg.name),
            ServerStats::new("backend"),
        ));

        Self {
            pool_name,
            pool,
            cluster: server_pool_arc,
            hooks,
            stats,
            vector_registry,
        }
    }

    /// Borrow the [`crate::vector::registry::VectorRegistry`]
    /// this server was built with. The [`crate::embed::ServerBuilder`]
    /// installs a fresh registry by default; call
    /// [`crate::embed::ServerBuilder::with_vector_registry`] to
    /// share one across multiple servers.
    #[must_use]
    pub fn vector_registry(&self) -> &Arc<crate::vector::registry::VectorRegistry> {
        &self.vector_registry
    }

    /// The pool name configured in the YAML / builder.
    #[must_use]
    pub fn pool_name(&self) -> &str {
        &self.pool_name
    }

    /// Spawn background tasks on the current tokio runtime and
    /// return a [`ServerHandle`].
    ///
    /// `start` is non-blocking. The returned handle is `Clone +
    /// Send + Sync`.
    ///
    /// Spawned tasks: the stats loop and the metrics loop (always),
    /// the gossip loop (only when `enable_gossip` is set), and one
    /// accept loop per bound listener. When `dyn_listen:` is bound,
    /// the server also registers itself in the process-wide
    /// in-process registry under the bound dnode address.
    ///
    /// # Errors
    ///
    /// Returns an error if binding the `listen:` or `dyn_listen:`
    /// socket fails. If `dyn_listen:` fails after `listen:` was
    /// bound, the already-bound `listen:` socket is dropped (closed)
    /// on return; no tasks have been spawned at that point.
    ///
    /// # Panics
    ///
    /// Panics if the hooks carry no datastore or no seeds provider.
    /// The builder is expected to always populate both.
    ///
    /// # In-process only
    ///
    /// The embedded server in this stage is **in-process only**:
    /// the `listen:` and `dyn_listen:` sockets bind so that
    /// configured ports are reservable and post-bind reporting
    /// works, but cross-process clients connecting to those
    /// ports see open-then-immediate-close (with a runtime
    /// warning logged on each accept). The sanctioned way to
    /// drive an embedded `Server` from in-process code is
    /// [`ServerHandle::inject_request`]. Cross-process traffic
    /// is supported by the `dynomited` binary, which wires the
    /// proxy module directly. Wiring the embedded accept loop
    /// to the dispatcher is tracked as a follow-up; the contract
    /// is documented in `docs/parity.md`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use dynomite::embed::ServerBuilder;
    /// use dynomite::conf::DataStore;
    /// # tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {
    /// let server = ServerBuilder::new("dyn_o_mite")
    ///     .listen("127.0.0.1:0".parse().unwrap())
    ///     .dyn_listen("127.0.0.1:0".parse().unwrap())
    ///     .data_store(DataStore::Redis)
    ///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
    ///     .tokens_str("0")
    ///     .enable_gossip(false)
    ///     .build()
    ///     .unwrap();
    /// let handle = server.start().await.unwrap();
    /// handle.shutdown().await.unwrap();
    /// # });
    /// ```
    pub async fn start(self) -> Result<ServerHandle, EmbedError> {
        let Server {
            pool_name,
            pool,
            cluster,
            mut hooks,
            stats,
            vector_registry,
        } = self;

        // Runtime plumbing shared by the handle and the tasks.
        let dispatcher =
            ClusterDispatcher::new(cluster.clone()).with_vector_registry(vector_registry.clone());
        let bus = EventBus::new(64);
        let events = Arc::new(EventManager::new(64));
        let cancel = CancellationToken::new();
        let snapshot_cache = Arc::new(Mutex::new(Snapshot::default()));

        // Move the hooks out; datastore and seeds are mandatory,
        // metrics falls back to a logging sink named after the pool.
        let datastore = hooks
            .datastore
            .take()
            .expect("invariant: builder always populates datastore");
        let seeds = hooks
            .seeds
            .take()
            .expect("invariant: builder always populates seeds");
        let metrics = hooks
            .metrics
            .take()
            .unwrap_or_else(|| Box::new(LoggingMetricsSink::new(pool_name.clone())));

        // Bind before building the shared state so the post-bind
        // addresses (e.g. the kernel-chosen port for `:0`) are stored.
        let (listen_listener, listen_addr) = bind_listener(pool.listen.as_ref()).await?;
        let (dyn_listener, dyn_listen_addr) = bind_listener(pool.dyn_listen.as_ref()).await?;

        let inner = Arc::new(ServerInner {
            pool: cluster.clone(),
            dispatcher,
            stats: stats.clone(),
            snapshot_cache: snapshot_cache.clone(),
            bus: bus.clone(),
            events: events.clone(),
            datastore,
            seeds,
            metrics,
            crypto: hooks.crypto,
            listen_addr,
            dyn_listen_addr,
            cancel: cancel.clone(),
            pool_name: pool_name.clone(),
            config: Mutex::new(pool.clone()),
            generation: AtomicU64::new(0),
            vector_registry,
        });

        // Register self in the in-process registry so peer
        // forwarding can find this node.
        if let Some(addr) = inner.dyn_listen_addr {
            registry_register(addr, &inner);
        }

        // Spawn background tasks.
        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
        tasks.push(tokio::spawn(stats_loop(inner.clone(), pool.stats_interval)));
        tasks.push(tokio::spawn(metrics_loop(inner.clone())));
        if pool.enable_gossip.unwrap_or(false) {
            // Gossip period in milliseconds; a missing or out-of-range
            // value falls back to 1000 ms.
            tasks.push(tokio::spawn(gossip_loop(
                inner.clone(),
                Duration::from_millis(
                    u64::try_from(pool.gos_interval.unwrap_or(1_000)).unwrap_or(1_000),
                ),
            )));
        }
        if let (Some(listener), Some(addr)) = (listen_listener, listen_addr) {
            tasks.push(tokio::spawn(accept_loop(
                inner.clone(),
                listener,
                addr,
                ConnRoleTag::Proxy,
            )));
        }
        if let (Some(listener), Some(addr)) = (dyn_listener, dyn_listen_addr) {
            tasks.push(tokio::spawn(accept_loop(
                inner.clone(),
                listener,
                addr,
                ConnRoleTag::DnodeProxy,
            )));
        }

        Ok(ServerHandle {
            inner,
            tasks: Arc::new(Mutex::new(tasks)),
        })
    }
}
385
386// -------- ServerHandle ---------------------------------------------------
387
/// Cloneable handle to a running [`Server`].
///
/// The handle is the public control surface: shutdown, reload,
/// stats, events, request injection, topology snapshots.
///
/// Both fields are `Arc`s, so every clone observes the same server
/// state and the same background-task set.
#[derive(Clone)]
pub struct ServerHandle {
    /// Shared runtime state; each spawned background task also holds a
    /// clone of this `Arc`.
    inner: Arc<ServerInner>,
    /// Join handles for the tasks spawned by [`Server::start`]; taken
    /// (drained) by the first `shutdown` or `join` call.
    tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
397
398impl std::fmt::Debug for ServerHandle {
399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400        f.debug_struct("ServerHandle")
401            .field("pool", &self.inner.pool_name)
402            .field("listen", &self.inner.listen_addr)
403            .field("dyn_listen", &self.inner.dyn_listen_addr)
404            .finish_non_exhaustive()
405    }
406}
407
408impl ServerHandle {
409    /// Borrow the configured [`CryptoProvider`], if one was
410    /// plugged. The default builder does not install one.
411    #[must_use]
412    pub fn crypto_provider(&self) -> Option<&dyn CryptoProvider> {
413        self.inner.crypto.as_deref()
414    }
415
416    /// Borrow the [`crate::vector::registry::VectorRegistry`]
417    /// installed at build time. The default builder installs a
418    /// fresh, empty registry; share one across servers via
419    /// [`crate::embed::ServerBuilder::with_vector_registry`].
420    #[must_use]
421    pub fn vector_registry(&self) -> Arc<crate::vector::registry::VectorRegistry> {
422        Arc::clone(&self.inner.vector_registry)
423    }
424
425    /// Local listen address (post-bind), if a `listen:` was
426    /// configured.
427    #[must_use]
428    pub fn listen_addr(&self) -> Option<SocketAddr> {
429        self.inner.listen_addr
430    }
431
432    /// Local dnode listen address (post-bind), if a
433    /// `dyn_listen:` was configured.
434    #[must_use]
435    pub fn dyn_listen_addr(&self) -> Option<SocketAddr> {
436        self.inner.dyn_listen_addr
437    }
438
439    /// Subscribe to the [`ServerEvent`] broadcast.
440    ///
441    /// # Examples
442    ///
443    /// ```no_run
444    /// # use dynomite::embed::ServerBuilder;
445    /// # use dynomite::conf::DataStore;
446    /// # tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {
447    /// let server = ServerBuilder::new("p")
448    ///     .listen("127.0.0.1:0".parse().unwrap())
449    ///     .dyn_listen("127.0.0.1:0".parse().unwrap())
450    ///     .data_store(DataStore::Redis)
451    ///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
452    ///     .tokens_str("0")
453    ///     .build().unwrap();
454    /// let handle = server.start().await.unwrap();
455    /// let _events = handle.subscribe_events();
456    /// handle.shutdown().await.unwrap();
457    /// # });
458    /// ```
459    #[must_use]
460    pub fn subscribe_events(&self) -> EventStream {
461        self.inner.bus.subscribe()
462    }
463
464    /// Borrow the [`EventManager`] handle that publishes
465    /// structured cluster-event payloads (`PeerUp`, `PeerDown`,
466    /// `GossipRoundComplete`, etc.).
467    ///
468    /// Returned as `Arc` so embedders can hold a long-lived
469    /// reference and pass it across crate boundaries (notably to
470    /// `dyniak`, which publishes AAE start/complete events).
471    #[must_use]
472    pub fn events(&self) -> Arc<EventManager> {
473        Arc::clone(&self.inner.events)
474    }
475
476    /// Latest stats snapshot.
477    #[must_use]
478    pub fn stats(&self) -> Snapshot {
479        self.inner.stats.snapshot()
480    }
481
482    /// Borrow the live [`Stats`] aggregator as a shared handle.
483    ///
484    /// Returns the same `Arc<Stats>` the runtime publishes to,
485    /// so an embedder can read counters as they update without
486    /// going through the periodic snapshot path. Useful for
487    /// in-process metrics integrations that prefer a pull model
488    /// (Prometheus scrape, OpenTelemetry pull readers) over the
489    /// push-on-flush path exposed by [`MetricsSink`].
490    ///
491    /// The returned handle is `Clone`; callers may share it
492    /// across threads. The aggregator outlives the handle for as
493    /// long as the [`ServerHandle`] tree is alive.
494    ///
495    /// [`MetricsSink`]: crate::embed::MetricsSink
496    #[must_use]
497    pub fn stats_handle(&self) -> Arc<Stats> {
498        Arc::clone(&self.inner.stats)
499    }
500
501    /// Manifest of every metric the engine emits.
502    #[must_use]
503    pub fn describe_stats(&self) -> Vec<MetricSpec> {
504        let _ = describe_stats(); // ensure the module is exercised
505        let mut out: Vec<MetricSpec> = Vec::new();
506        out.extend(crate::stats::POOL_CODEC.iter().copied());
507        out.extend(crate::stats::SERVER_CODEC.iter().copied());
508        out
509    }
510
511    /// Snapshot of every peer in the cluster.
512    #[must_use]
513    pub fn peers(&self) -> Vec<PeerSnapshot> {
514        self.inner
515            .pool
516            .peers()
517            .read()
518            .iter()
519            .map(PeerSnapshot::from)
520            .collect()
521    }
522
523    /// Snapshot of every datacenter and its racks.
524    #[must_use]
525    pub fn datacenters(&self) -> Vec<DatacenterSnapshot> {
526        self.inner
527            .pool
528            .datacenters()
529            .read()
530            .iter()
531            .map(DatacenterSnapshot::from)
532            .collect()
533    }
534
535    /// Snapshot of the token ring.
536    #[must_use]
537    pub fn ring(&self) -> RingSnapshot {
538        let dcs = self.inner.pool.datacenters().read();
539        let mut entries: Vec<(DynToken, u32)> = Vec::new();
540        for dc in dcs.iter() {
541            for rack in dc.racks() {
542                for c in rack.continuums() {
543                    entries.push((c.token.clone(), c.peer_idx));
544                }
545            }
546        }
547        RingSnapshot {
548            entries,
549            generation: self.inner.generation.load(Ordering::Relaxed),
550        }
551    }
552
553    /// Inject a parsed request as if it had arrived from a
554    /// client. Returns the response message.
555    ///
556    /// This is the test/embedding entry point that bypasses the
557    /// proxy listener. The dispatcher computes a routing plan; if
558    /// the plan resolves to the local datastore, the
559    /// [`Datastore::dispatch`](crate::embed::hooks::Datastore::dispatch)
560    /// hook is invoked. If the plan resolves to a remote peer
561    /// co-located in the same process (via the in-process
562    /// registry), the request is forwarded to that peer's
563    /// datastore and its response is returned.
564    pub async fn inject_request(&self, req: Msg) -> Result<Msg, EmbedError> {
565        // Use the parsed key (if any) for routing; if there are
566        // no keys, the dispatcher's `LocalDatastore` short-circuit
567        // applies.
568        let key: Vec<u8> = req
569            .keys()
570            .first()
571            .map(|kp| kp.tag_bytes().to_vec())
572            .unwrap_or_default();
573        let plan = self.inner.dispatcher.plan(&req, &key);
574        match plan {
575            DispatchPlan::Drop => {
576                // Drop: respond with an empty message of the same
577                // id so callers can correlate.
578                let mut rsp = Msg::new(req.id(), crate::msg::MsgType::Unknown, false);
579                rsp.set_parent_id(req.id());
580                Ok(rsp)
581            }
582            DispatchPlan::NoTargets => Err(EmbedError::Inject(
583                "cluster has no quorum-eligible targets".into(),
584            )),
585            DispatchPlan::LocalDatastore => self.inner.dispatch_local(req).await,
586            DispatchPlan::Replicas { targets, .. } => {
587                self.inner.stats.pool_incr_by(PoolField::ForwardError, 0); // touch counter table
588                self.inner.stats.server_incr(ServerField::ReadRequests);
589                // Snapshot the targets' addresses outside any
590                // lock so we don't hold the peer-list read guard
591                // across the awaits below.
592                let resolved: Vec<(bool, Option<SocketAddr>)> = {
593                    let peers = self.inner.pool.peers().read();
594                    targets
595                        .iter()
596                        .map(|t| {
597                            let peer = peers.get(t.peer_idx as usize);
598                            let is_local = peer.is_some_and(crate::cluster::peer::Peer::is_local);
599                            let addr = peer.and_then(|p| p.endpoint().pname().parse().ok());
600                            (is_local, addr)
601                        })
602                        .collect()
603                };
604                let mut last_err: Option<EmbedError> = None;
605                for (is_local, addr) in resolved {
606                    if is_local {
607                        return self.inner.dispatch_local(req).await;
608                    }
609                    if let Some(addr) = addr {
610                        if let Some(remote) = registry_lookup(addr) {
611                            let mut forwarded = Msg::new(req.id(), req.ty(), true);
612                            forwarded.set_parent_id(req.parent_id());
613                            match remote.datastore.dispatch(forwarded).await {
614                                Ok(rsp) => return Ok(rsp),
615                                Err(e) => last_err = Some(EmbedError::Inject(e.to_string())),
616                            }
617                        }
618                    }
619                }
620                if let Some(e) = last_err {
621                    return Err(e);
622                }
623                // No registered remote peer; fall back to the
624                // local datastore so the embed surface stays
625                // useful in mixed deployments.
626                self.inner.dispatch_local(req).await
627            }
628        }
629    }
630
631    /// Reload the configuration. Validates the new [`Config`]
632    /// before any state is touched; on failure the running config
633    /// is left untouched.
634    ///
635    /// This is the in-process equivalent of SIGHUP. The C
636    /// reference's `dynomite_reload_conf` (in
637    /// `_/dynomite/src/dynomite.c`) re-reads the YAML from disk;
638    /// the embedding API takes the `Config` directly.
639    pub async fn reload(&self, mut cfg: Config) -> Result<(), EmbedError> {
640        cfg.finalize();
641        cfg.validate()?;
642        let new_pool = cfg.pool().clone();
643        // Update the stored pool so subsequent .stats() / .peers()
644        // calls reflect the live config. The dispatcher's pool
645        // structure is left untouched here; a future stage will
646        // rebuild it under a write lock.
647        *self.inner.config.lock() = new_pool;
648        let gen_id = next_generation();
649        self.inner.generation.store(gen_id, Ordering::Relaxed);
650        self.inner
651            .bus
652            .send(ServerEvent::ConfigReloaded { generation: gen_id });
653        // Yield once so the broadcast send is observable on the
654        // calling task's scheduler before the function returns.
655        tokio::task::yield_now().await;
656        Ok(())
657    }
658
659    /// Graceful shutdown.
660    ///
661    /// Cancels every background task, deregisters from the
662    /// in-process peer registry, drains the join set, and
663    /// returns. Idempotent: calling `shutdown` after a previous
664    /// `shutdown` (or after [`ServerHandle::join`] returned)
665    /// completes successfully without doing any work.
666    pub async fn shutdown(&self) -> Result<(), EmbedError> {
667        self.inner.cancel.cancel();
668        if let Some(addr) = self.inner.dyn_listen_addr {
669            registry_remove(addr);
670        }
671        let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
672        for t in drained {
673            // Tasks observe `cancel` and return promptly.
674            let _ = t.await;
675        }
676        Ok(())
677    }
678
679    /// Wait for every background task to complete.
680    ///
681    /// Unlike [`ServerHandle::shutdown`], `join` does not request
682    /// cancellation: it parks the caller until the runtime tasks
683    /// (gossip, stats aggregator, accept loops, metrics flusher)
684    /// finish on their own. The intended use is the
685    /// "start-then-park" pattern in long-running embedders that
686    /// drive shutdown from a separate signal handler:
687    ///
688    /// ```no_run
689    /// # use dynomite::embed::ServerBuilder;
690    /// # use dynomite::conf::DataStore;
691    /// # tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {
692    /// let handle = ServerBuilder::new("p")
693    ///     .listen("127.0.0.1:0".parse().unwrap())
694    ///     .dyn_listen("127.0.0.1:0".parse().unwrap())
695    ///     .data_store(DataStore::Redis)
696    ///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
697    ///     .tokens_str("0")
698    ///     .build().unwrap()
699    ///     .start().await.unwrap();
700    /// let shutdown = handle.clone();
701    /// tokio::spawn(async move {
702    ///     // Replace with `tokio::signal::ctrl_c().await.unwrap();`
703    ///     // in a real embedder.
704    ///     shutdown.shutdown().await.unwrap();
705    /// });
706    /// handle.join().await;
707    /// # });
708    /// ```
709    ///
710    /// `join` resolves once every task spawned by
711    /// [`Server::start`] has returned. Calling `join` repeatedly
712    /// is safe; the second call returns immediately because the
713    /// task set is drained on the first.
714    pub async fn join(&self) {
715        let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
716        for t in drained {
717            let _ = t.await;
718        }
719    }
720}
721
722// -------- helpers ---------------------------------------------------------
723
724fn build_local_peer(pool: &ConfPool, cfg: &PoolConfig) -> Peer {
725    let dyn_listen = pool.dyn_listen.as_ref().map_or_else(
726        || ("127.0.0.1".to_string(), 0u16),
727        |l| (l.name().to_string(), l.port()),
728    );
729    let tokens = pool
730        .tokens
731        .as_ref()
732        .map(|tl| {
733            tl.components()
734                .iter()
735                .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
736                .collect::<Vec<_>>()
737        })
738        .unwrap_or_default();
739    let mut peer = Peer::new(
740        0,
741        PeerEndpoint::tcp(dyn_listen.0, dyn_listen.1),
742        cfg.rack.clone(),
743        cfg.dc.clone(),
744        if tokens.is_empty() {
745            vec![DynToken::from_u32(0)]
746        } else {
747            tokens
748        },
749        true,
750        true,
751        false,
752    );
753    let now_secs = std::time::SystemTime::now()
754        .duration_since(std::time::UNIX_EPOCH)
755        .map(|d| d.as_secs())
756        .unwrap_or(0);
757    peer.set_state(PeerState::Normal, now_secs);
758    peer
759}
760
761fn peers_from_seeds(cfg: &PoolConfig, seeds: &[ConfDynSeed], start_idx: u32) -> Vec<Peer> {
762    let now_secs = std::time::SystemTime::now()
763        .duration_since(std::time::UNIX_EPOCH)
764        .map(|d| d.as_secs())
765        .unwrap_or(0);
766    seeds
767        .iter()
768        .enumerate()
769        .map(|(i, s)| {
770            let tokens: Vec<DynToken> = s
771                .tokens()
772                .components()
773                .iter()
774                .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
775                .collect();
776            let idx_off = u32::try_from(i).unwrap_or(0);
777            let mut p = Peer::new(
778                start_idx + idx_off,
779                PeerEndpoint::tcp(s.host().to_string(), s.port()),
780                s.rack().to_string(),
781                s.dc().to_string(),
782                if tokens.is_empty() {
783                    vec![DynToken::from_u32(0)]
784                } else {
785                    tokens
786                },
787                false,
788                s.dc() == cfg.dc,
789                false,
790            );
791            p.set_state(PeerState::Normal, now_secs);
792            p
793        })
794        .collect()
795}
796
797// Unused legacy helpers removed.
798
799async fn bind_listener(
800    listen: Option<&crate::conf::ConfListen>,
801) -> Result<(Option<TcpListener>, Option<SocketAddr>), EmbedError> {
802    let Some(l) = listen else {
803        return Ok((None, None));
804    };
805    let host = l.name();
806    let port = l.port();
807    if host.is_empty() {
808        return Ok((None, None));
809    }
810    let addr_str = format!("{host}:{port}");
811    let Ok(_addr) = addr_str.parse::<SocketAddr>() else {
812        return Ok((None, None));
813    };
814    let listener = TcpListener::bind(&addr_str).await?;
815    let local = listener.local_addr()?;
816    Ok((Some(listener), Some(local)))
817}
818
// The unused `try_bind` helper has been removed.
820
821async fn accept_loop(
822    inner: Arc<ServerInner>,
823    listener: TcpListener,
824    addr: SocketAddr,
825    role: ConnRoleTag,
826) {
827    loop {
828        tokio::select! {
829            biased;
830            () = inner.cancel.cancelled() => return,
831            res = listener.accept() => {
832                let Ok((sock, peer)) = res else { return };
833                let conn_id = next_conn_id();
834                inner.bus.send(ServerEvent::ConnectionAccepted {
835                    conn_id,
836                    role,
837                    local_addr: Some(addr),
838                });
839                // The embedded server is in-process only at this
840                // stage: the kernel-bound socket is reserved so
841                // post-bind reporting works, but the per-role
842                // protocol parser is not wired. Cross-process
843                // clients see open-then-immediate-close. Use
844                // `ServerHandle::inject_request` for in-process
845                // traffic; use the `dynomited` binary for the
846                // wire path.
847                tracing::warn!(
848                    listen = %addr,
849                    peer = %peer,
850                    role = ?role,
851                    conn_id,
852                    "embedded listen_addr accepted a connection; embedded mode does not yet \
853                     forward to the dispatcher; use ServerHandle::inject_request instead. \
854                     Closing connection."
855                );
856                let bus = inner.bus.clone();
857                let cancel = inner.cancel.clone();
858                tokio::spawn(async move {
859                    let _ = sock; // drop on close
860                    let close_reason = if cancel.is_cancelled() {
861                        CloseReason::LocalClose
862                    } else {
863                        CloseReason::PeerEof
864                    };
865                    bus.send(ServerEvent::ConnectionClosed { conn_id, reason: close_reason });
866                });
867            }
868        }
869    }
870}
871
872async fn stats_loop(inner: Arc<ServerInner>, interval_ms: Option<i64>) {
873    let interval =
874        Duration::from_millis(u64::try_from(interval_ms.unwrap_or(1_000)).unwrap_or(1_000));
875    let mut ticker = tokio::time::interval(interval);
876    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
877    loop {
878        tokio::select! {
879            biased;
880            () = inner.cancel.cancelled() => return,
881            _ = ticker.tick() => {
882                let snap = inner.stats.snapshot();
883                *inner.snapshot_cache.lock() = snap;
884            }
885        }
886    }
887}
888
889async fn metrics_loop(inner: Arc<ServerInner>) {
890    let interval = inner.metrics.flush_interval();
891    let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(50)));
892    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
893    loop {
894        tokio::select! {
895            biased;
896            () = inner.cancel.cancelled() => return,
897            _ = ticker.tick() => {
898                let snap = inner.snapshot_cache.lock().clone();
899                let _ = inner.metrics.emit(&snap).await;
900            }
901        }
902    }
903}
904
905async fn gossip_loop(inner: Arc<ServerInner>, interval: Duration) {
906    let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(20)));
907    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
908    let mut round: u64 = 0;
909    let mut known: std::collections::HashSet<(String, u16)> = std::collections::HashSet::new();
910    {
911        let peers = inner.pool.peers().read();
912        for p in peers.iter() {
913            known.insert((p.endpoint().host().to_string(), p.endpoint().port()));
914        }
915    }
916    loop {
917        tokio::select! {
918            biased;
919            () = inner.cancel.cancelled() => return,
920            _ = ticker.tick() => {
921                let round_started = std::time::Instant::now();
922                let round_ts = std::time::SystemTime::now();
923                round += 1;
924                let seeds = inner.seeds.fetch().unwrap_or_default();
925                let mut added: u32 = 0;
926                {
927                    let mut peers = inner.pool.peers().write();
928                    let now_secs = std::time::SystemTime::now()
929                        .duration_since(std::time::UNIX_EPOCH)
930                        .map(|d| d.as_secs())
931                        .unwrap_or(0);
932                    let cfg = inner.pool.config().clone();
933                    for seed in &seeds {
934                        let key = (seed.host().to_string(), seed.port());
935                        if known.contains(&key) {
936                            continue;
937                        }
938                        let next_idx = u32::try_from(peers.len()).unwrap_or(u32::MAX);
939                        let tokens: Vec<DynToken> = seed
940                            .tokens()
941                            .components()
942                            .iter()
943                            .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
944                            .collect();
945                        let mut p = Peer::new(
946                            next_idx,
947                            PeerEndpoint::tcp(seed.host().to_string(), seed.port()),
948                            seed.rack().to_string(),
949                            seed.dc().to_string(),
950                            if tokens.is_empty() {
951                                vec![DynToken::from_u32(0)]
952                            } else {
953                                tokens
954                            },
955                            false,
956                            seed.dc() == cfg.dc,
957                            false,
958                        );
959                        p.set_state(PeerState::Normal, now_secs);
960                        peers.push(p);
961                        known.insert(key);
962                        added += 1;
963                    }
964                }
965                if added > 0 {
966                    // Rebuild the topology + ring under fresh
967                    // datacenters so the dispatcher sees the new
968                    // peers.
969                    rebuild_topology(&inner.pool);
970                    let peers_now = inner.pool.peers().read();
971                    for (idx, p) in peers_now.iter().enumerate().rev().take(added as usize) {
972                        let _ = idx;
973                        inner.bus.send(ServerEvent::PeerUp(p.idx()));
974                        inner.events.publish(crate::events::ClusterEvent::PeerUp {
975                            peer_id: p.idx(),
976                            dc: p.dc().to_string(),
977                            ts: round_ts,
978                        });
979                    }
980                    inner.events.publish(crate::events::ClusterEvent::RingChanged {
981                        tag: "seed-discovery".to_string(),
982                        ts: round_ts,
983                    });
984                }
985                let count = u32::try_from(inner.pool.peers().read().len()).unwrap_or(u32::MAX);
986                inner.bus.send(ServerEvent::GossipRound { round, peers: count });
987                inner.events.publish(crate::events::ClusterEvent::GossipRoundComplete {
988                    duration: round_started.elapsed(),
989                    peers_seen: count as usize,
990                    ts: round_ts,
991                });
992                inner.stats.pool_incr(PoolField::StatsCount);
993            }
994        }
995    }
996}
997
998fn rebuild_topology(pool: &Arc<ServerPool>) {
999    use crate::cluster::Datacenter;
1000    let peers = pool.peers().read();
1001    let mut new_dcs: Vec<Datacenter> = Vec::new();
1002    for p in peers.iter() {
1003        let idx = if let Some(i) = new_dcs.iter().position(|d| d.name() == p.dc()) {
1004            i
1005        } else {
1006            new_dcs.push(Datacenter::new(p.dc().to_string()));
1007            new_dcs.len() - 1
1008        };
1009        new_dcs[idx].upsert_rack(p.rack().to_string());
1010    }
1011    drop(peers);
1012    {
1013        let mut dcs = pool.datacenters().write();
1014        *dcs = new_dcs;
1015    }
1016    pool.rebuild_ring();
1017    pool.preselect_remote_racks();
1018}
1019
1020// Internal helpers (`shutdown_signal`, `_instant_now`) removed:
1021// they were public-but-doc(hidden) escape hatches that risked
1022// SemVer commitments. The `oneshot` channel was not used
1023// outside the (deleted) test helper, and `Instant` was kept in
1024// scope only to silence an unused-import warning. Both have
1025// been dropped.