Skip to main content

dynomite/embed/
server.rs

//! Embedding-API server runtime.
//!
//! [`Server`] owns the runtime data structures (cluster pool,
//! dispatcher, stats, event bus, hooks). [`Server::start`] spawns
//! the background tasks (gossip, stats aggregator, metrics flusher,
//! optional TCP listeners) on the current tokio runtime and returns
//! a [`ServerHandle`].
//!
//! The handle exposes the public control surface documented in
//! `docs/book/src/embedding/server.md`. It is `Clone + Send +
//! Sync`; multiple consumers may hold one. Dropping the last
//! handle does not shut the server down; only
//! [`ServerHandle::shutdown`] does.
14
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, OnceLock, Weak};
19
20use parking_lot::Mutex;
21use tokio::net::TcpListener;
22use tokio::task::JoinHandle;
23use tokio::time::{Duration, MissedTickBehavior};
24use tokio_util::sync::CancellationToken;
25
26use crate::cluster::dispatch::{ClusterDispatcher, DispatchPlan};
27use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
28use crate::cluster::pool::{PoolConfig, ServerPool};
29use crate::conf::{ConfDynSeed, ConfPool, Config};
30use crate::embed::error::EmbedError;
31use crate::embed::events::{CloseReason, ConnRoleTag, EventBus, EventStream, ServerEvent};
32use crate::embed::hooks::{
33    CryptoProvider, Datastore, LoggingMetricsSink, MetricsSink, SeedsProvider,
34};
35use crate::embed::snapshots::{DatacenterSnapshot, PeerSnapshot, RingSnapshot};
36use crate::events::EventManager;
37use crate::hashkit::DynToken;
38use crate::msg::Msg;
39use crate::stats::{
40    describe_stats, MetricSpec, PoolField, PoolStats, ServerField, ServerStats, ServiceInfo,
41    Snapshot, Stats,
42};
43
/// Bag of optional hook overrides supplied to a builder.
///
/// Every field is `pub(crate)`, so code outside this crate can
/// neither construct nor destructure a `ServerHooks`. Adding new
/// hooks (for example, a `Box<dyn TransportListener>` slot) is
/// therefore not a SemVer-breaking change. Construct a
/// `ServerHooks` only through [`crate::embed::ServerBuilder`]
/// setters; read it back through the borrowing accessor methods
/// ([`ServerHooks::datastore`], [`ServerHooks::seeds`],
/// [`ServerHooks::crypto`], [`ServerHooks::metrics`]).
#[non_exhaustive]
pub struct ServerHooks {
    /// Local storage backend. [`Server::start`] takes it out and
    /// treats a missing value as a broken builder invariant.
    pub(crate) datastore: Option<Box<dyn Datastore>>,
    /// Seed provider. [`Server::start`] takes it out and treats a
    /// missing value as a broken builder invariant.
    pub(crate) seeds: Option<Box<dyn SeedsProvider>>,
    /// Optional crypto provider; the default builder leaves it unset.
    pub(crate) crypto: Option<Box<dyn CryptoProvider>>,
    /// Metrics sink. When unset, [`Server::start`] installs a
    /// `LoggingMetricsSink` named after the pool.
    pub(crate) metrics: Option<Box<dyn MetricsSink>>,
}
58
59impl ServerHooks {
60    /// Borrow the configured [`Datastore`], if any.
61    #[must_use]
62    pub fn datastore(&self) -> Option<&dyn Datastore> {
63        self.datastore.as_deref()
64    }
65
66    /// Borrow the configured [`SeedsProvider`], if any.
67    #[must_use]
68    pub fn seeds(&self) -> Option<&dyn SeedsProvider> {
69        self.seeds.as_deref()
70    }
71
72    /// Borrow the configured [`CryptoProvider`], if any.
73    #[must_use]
74    pub fn crypto(&self) -> Option<&dyn CryptoProvider> {
75        self.crypto.as_deref()
76    }
77
78    /// Borrow the configured [`MetricsSink`], if any.
79    #[must_use]
80    pub fn metrics(&self) -> Option<&dyn MetricsSink> {
81        self.metrics.as_deref()
82    }
83}
84
85impl std::fmt::Debug for ServerHooks {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        f.debug_struct("ServerHooks")
88            .field("datastore", &self.datastore.is_some())
89            .field("seeds", &self.seeds.is_some())
90            .field("crypto", &self.crypto.is_some())
91            .field("metrics", &self.metrics.is_some())
92            .finish()
93    }
94}
95
96// -------- in-process registry -------------------------------------------
97
98/// Process-wide registry for in-process clusters.
99///
100/// When `inject_request` produces a `Replicas` plan that targets
101/// a peer co-located in the same process, the embed runtime
102/// looks up the target via this registry and forwards directly
103/// without the dnode wire path. This keeps the in-process tests
104/// from depending on real network I/O while preserving the
105/// production code path semantics (compute plan, deliver to
106/// target peers).
107fn registry() -> &'static Mutex<HashMap<SocketAddr, Weak<ServerInner>>> {
108    static R: OnceLock<Mutex<HashMap<SocketAddr, Weak<ServerInner>>>> = OnceLock::new();
109    R.get_or_init(|| Mutex::new(HashMap::new()))
110}
111
112fn registry_register(addr: SocketAddr, server: &Arc<ServerInner>) {
113    registry().lock().insert(addr, Arc::downgrade(server));
114}
115
116fn registry_remove(addr: SocketAddr) {
117    registry().lock().remove(&addr);
118}
119
120fn registry_lookup(addr: SocketAddr) -> Option<Arc<ServerInner>> {
121    registry().lock().get(&addr).and_then(Weak::upgrade)
122}
123
124// -------- inner state ----------------------------------------------------
125
/// Allocate the next configuration generation number for
/// [`ServerEvent::ConfigReloaded`] / [`RingSnapshot`].
///
/// A freshly started server reports generation `0` ("initial
/// configuration, never reloaded"). The counter therefore starts
/// at `1`, so every value handed out here is non-zero and strictly
/// greater than every value handed out before it. A reload can
/// never be confused with the initial state.
///
/// The counter is process-wide. Generations seen by one server
/// increase monotonically, but are not contiguous when several
/// servers reload within the same process.
fn next_generation() -> u64 {
    static G: AtomicU64 = AtomicU64::new(1);
    G.fetch_add(1, Ordering::Relaxed)
}
132
/// Hand out the next connection id.
///
/// Ids come from a single process-wide counter that starts at `0`.
/// Each call returns the current value and advances the counter by
/// one.
fn next_conn_id() -> u64 {
    static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(0);
    NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed)
}
138
/// Inner state shared by every clone of a [`ServerHandle`].
///
/// Built once by [`Server::start`] and held behind an `Arc`. The
/// in-process registry keeps only a `Weak` to it.
pub(crate) struct ServerInner {
    /// Cluster topology (peers, datacenters, token ring) used for
    /// routing and for the topology snapshots.
    pool: Arc<ServerPool>,
    /// Computes the routing plan for each injected request.
    dispatcher: ClusterDispatcher,
    /// Live stats aggregator; the same `Arc` is handed out by
    /// `ServerHandle::stats_handle`.
    stats: Arc<Stats>,
    // NOTE(review): not read by any code in this part of the file;
    // presumably consumed by the stats loop — confirm.
    snapshot_cache: Arc<Mutex<Snapshot>>,
    /// [`ServerEvent`] broadcast behind `ServerHandle::subscribe_events`.
    bus: EventBus,
    /// Structured cluster-event publisher returned by `ServerHandle::events`.
    events: Arc<EventManager>,
    /// Local datastore. Also serves requests forwarded from other
    /// in-process servers through the registry.
    datastore: Box<dyn Datastore>,
    /// Seed provider taken from the hooks at start-up.
    seeds: Box<dyn SeedsProvider>,
    /// Metrics sink (a `LoggingMetricsSink` when none was supplied).
    metrics: Box<dyn MetricsSink>,
    /// Optional crypto provider.
    crypto: Option<Box<dyn CryptoProvider>>,
    /// Post-bind proxy (`listen:`) address, if one was configured.
    listen_addr: Option<SocketAddr>,
    /// Post-bind dnode (`dyn_listen:`) address, if one was configured;
    /// also the key under which this server is registered in-process.
    dyn_listen_addr: Option<SocketAddr>,
    /// Cancelled by `ServerHandle::shutdown` to stop background tasks.
    cancel: CancellationToken,
    /// Pool name, used in `Debug` output.
    pool_name: String,
    /// Most recently accepted pool config (the start-up config until
    /// the first successful `reload`).
    config: Mutex<ConfPool>,
    /// `0` until the first successful `reload`, then the generation
    /// that reload published.
    generation: AtomicU64,
    /// Optional command extension (also installed into `dispatcher`).
    command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
159
160impl std::fmt::Debug for ServerInner {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        f.debug_struct("ServerInner")
163            .field("pool_name", &self.pool_name)
164            .field("listen", &self.listen_addr)
165            .field("dyn_listen", &self.dyn_listen_addr)
166            .finish_non_exhaustive()
167    }
168}
169
170impl ServerInner {
171    fn dispatch_local(&self, req: Msg) -> super::hooks::BoxFuture<'_, Result<Msg, EmbedError>> {
172        let fut = self.datastore.dispatch(req);
173        Box::pin(async move {
174            let rsp = fut.await.map_err(|e| EmbedError::Inject(e.to_string()))?;
175            Ok(rsp)
176        })
177    }
178}
179
180// -------- Server (configured-but-not-running) ----------------------------
181
/// Configured-but-not-running server.
///
/// Build one with [`ServerBuilder`](crate::embed::ServerBuilder),
/// then call [`Server::start`] to spawn the background tasks.
/// Building a `Server` binds no sockets; binding happens in
/// [`Server::start`].
#[derive(Debug)]
pub struct Server {
    /// Pool name from the YAML / builder.
    pool_name: String,
    /// Pool configuration this server was built from.
    pool: ConfPool,
    /// Cluster topology built from `pool` (local peer plus seed peers).
    cluster: Arc<ServerPool>,
    /// Hook overrides; `start` takes the datastore, seeds and metrics
    /// hooks out of it.
    hooks: ServerHooks,
    /// Stats aggregator created at build time and shared with the
    /// running server.
    stats: Arc<Stats>,
    /// Optional command extension; `start` also installs it into the
    /// dispatcher.
    command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
195
impl Server {
    /// Used internally by [`crate::embed::ServerBuilder::build`].
    ///
    /// Assembles the peer list: the local peer at index `0`, followed
    /// by one peer per configured `dyn_seeds` entry. The list is
    /// wrapped in a shared [`ServerPool`], and a fresh [`Stats`]
    /// aggregator is created. No sockets are bound and no tasks are
    /// spawned here; that happens in [`Server::start`].
    pub(crate) fn from_pool(
        pool_name: String,
        pool: ConfPool,
        hooks: ServerHooks,
        command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
    ) -> Self {
        let pool_cfg = PoolConfig::from_conf(&pool_name, &pool);
        let local_peer = build_local_peer(&pool, &pool_cfg);
        let mut peers = vec![local_peer];
        if let Some(seeds) = pool.dyn_seeds.as_ref() {
            // Seed peers are numbered right after the local peer
            // (index 0), i.e. starting at 1.
            let start = u32::try_from(peers.len()).unwrap_or(0);
            peers.extend(peers_from_seeds(&pool_cfg, seeds, start));
        }
        let server_pool_arc = Arc::new(ServerPool::new(pool_cfg.clone(), peers));
        // NOTE(review): presumably picks a preferred rack in each remote
        // datacenter once, up front — confirm in `ServerPool`.
        server_pool_arc.preselect_remote_racks();

        let stats = Arc::new(Stats::new(
            ServiceInfo {
                source: pool_cfg.name.clone(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                rack: pool_cfg.rack.clone(),
                dc: pool_cfg.dc.clone(),
            },
            PoolStats::new(&pool_cfg.name),
            ServerStats::new("backend"),
        ));

        Self {
            pool_name,
            pool,
            cluster: server_pool_arc,
            hooks,
            stats,
            command_extension,
        }
    }

    /// Borrow the [`crate::embed::CommandExtension`]
    /// this server was built with, if any.
    #[must_use]
    pub fn command_extension(&self) -> Option<&Arc<dyn crate::embed::CommandExtension>> {
        self.command_extension.as_ref()
    }

    /// The pool name configured in the YAML / builder.
    #[must_use]
    pub fn pool_name(&self) -> &str {
        &self.pool_name
    }

    /// Spawn background tasks on the current tokio runtime and
    /// return a [`ServerHandle`].
    ///
    /// `start` is non-blocking. The returned handle is `Clone +
    /// Send + Sync`.
    ///
    /// Always spawned: the stats loop and the metrics loop.
    /// Conditionally spawned: the gossip loop (only when
    /// `enable_gossip` is set) and one accept loop per bound
    /// `listen:` / `dyn_listen:` socket. When `dyn_listen:` is
    /// bound, the server also registers itself in the process-wide
    /// in-process registry under its post-bind dnode address.
    ///
    /// # Errors
    ///
    /// Returns an error if binding either listener fails. Both binds
    /// happen before anything is registered or spawned, so a failed
    /// start leaves no tasks or registry entries behind.
    ///
    /// # Panics
    ///
    /// Panics if the hooks carry no datastore or no seeds provider.
    /// The builder is expected to always populate both.
    ///
    /// # In-process only
    ///
    /// The embedded server in this stage is **in-process only**:
    /// the `listen:` and `dyn_listen:` sockets bind so that
    /// configured ports are reservable and post-bind reporting
    /// works, but cross-process clients connecting to those
    /// ports see open-then-immediate-close (with a runtime
    /// warning logged on each accept). The sanctioned way to
    /// drive an embedded `Server` from in-process code is
    /// [`ServerHandle::inject_request`]. Cross-process traffic
    /// is supported by the `dynomited` binary, which wires the
    /// proxy module directly. Wiring the embedded accept loop
    /// to the dispatcher is tracked as a follow-up; the contract
    /// is documented in `docs/parity.md`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use dynomite::embed::ServerBuilder;
    /// use dynomite::conf::DataStore;
    /// # tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {
    /// let server = ServerBuilder::new("dyn_o_mite")
    ///     .listen("127.0.0.1:0".parse().unwrap())
    ///     .dyn_listen("127.0.0.1:0".parse().unwrap())
    ///     .data_store(DataStore::Redis)
    ///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
    ///     .tokens_str("0")
    ///     .enable_gossip(false)
    ///     .build()
    ///     .unwrap();
    /// let handle = server.start().await.unwrap();
    /// handle.shutdown().await.unwrap();
    /// # });
    /// ```
    pub async fn start(self) -> Result<ServerHandle, EmbedError> {
        let Server {
            pool_name,
            pool,
            cluster,
            mut hooks,
            stats,
            command_extension,
        } = self;

        // The dispatcher routes over the same shared ServerPool and, when
        // present, delegates to the command extension.
        let mut dispatcher = ClusterDispatcher::new(cluster.clone());
        if let Some(ext) = command_extension.as_ref() {
            dispatcher = dispatcher.with_command_extension(ext.clone());
        }
        let bus = EventBus::new(64);
        let events = Arc::new(EventManager::new(64));
        let cancel = CancellationToken::new();
        let snapshot_cache = Arc::new(Mutex::new(Snapshot::default()));

        // Move the hooks out of the bag; datastore and seeds are
        // mandatory, metrics falls back to a logging sink.
        let datastore = hooks
            .datastore
            .take()
            .expect("invariant: builder always populates datastore");
        let seeds = hooks
            .seeds
            .take()
            .expect("invariant: builder always populates seeds");
        let metrics = hooks
            .metrics
            .take()
            .unwrap_or_else(|| Box::new(LoggingMetricsSink::new(pool_name.clone())));

        // Bind both sockets before registering or spawning anything, so
        // a bind error returns early without side effects.
        let transport = pool.transport.unwrap_or_default();
        let (listen_listener, listen_addr) =
            bind_listener(pool.listen.as_ref(), transport, &pool).await?;
        let (dyn_listener, dyn_listen_addr) =
            bind_listener(pool.dyn_listen.as_ref(), transport, &pool).await?;

        let inner = Arc::new(ServerInner {
            pool: cluster.clone(),
            dispatcher,
            stats: stats.clone(),
            snapshot_cache: snapshot_cache.clone(),
            bus: bus.clone(),
            events: events.clone(),
            datastore,
            seeds,
            metrics,
            crypto: hooks.crypto,
            listen_addr,
            dyn_listen_addr,
            cancel: cancel.clone(),
            pool_name: pool_name.clone(),
            config: Mutex::new(pool.clone()),
            // 0 = initial configuration; `reload` publishes later values.
            generation: AtomicU64::new(0),
            command_extension,
        });

        // Register self in the in-process registry so peer
        // forwarding can find this node.
        if let Some(addr) = inner.dyn_listen_addr {
            registry_register(addr, &inner);
        }

        // Spawn background tasks.
        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
        tasks.push(tokio::spawn(stats_loop(inner.clone(), pool.stats_interval)));
        tasks.push(tokio::spawn(metrics_loop(inner.clone())));
        // Gossip is off unless explicitly enabled. `gos_interval` is in
        // milliseconds (default 1000); values that do not fit in `u64`
        // fall back to 1000.
        // NOTE(review): a configured `gos_interval` of 0 yields a zero
        // Duration; if `gossip_loop` uses `tokio::time::interval`, that
        // panics — confirm.
        if pool.enable_gossip.unwrap_or(false) {
            tasks.push(tokio::spawn(gossip_loop(
                inner.clone(),
                Duration::from_millis(
                    u64::try_from(pool.gos_interval.unwrap_or(1_000)).unwrap_or(1_000),
                ),
            )));
        }
        if let (Some(listener), Some(addr)) = (listen_listener, listen_addr) {
            tasks.push(tokio::spawn(accept_loop(
                inner.clone(),
                listener,
                addr,
                ConnRoleTag::Proxy,
            )));
        }
        if let (Some(listener), Some(addr)) = (dyn_listener, dyn_listen_addr) {
            tasks.push(tokio::spawn(accept_loop(
                inner.clone(),
                listener,
                addr,
                ConnRoleTag::DnodeProxy,
            )));
        }

        Ok(ServerHandle {
            inner,
            tasks: Arc::new(Mutex::new(tasks)),
        })
    }
}
387
388// -------- ServerHandle ---------------------------------------------------
389
/// Cloneable handle to a running [`Server`].
///
/// The handle is the public control surface: shutdown, reload,
/// stats, events, request injection, topology snapshots.
///
/// Clones share both the server state and the list of
/// background-task join handles. Whichever clone first calls
/// [`ServerHandle::shutdown`] or [`ServerHandle::join`] takes, and
/// awaits, those join handles.
#[derive(Clone)]
pub struct ServerHandle {
    /// Shared runtime state. The in-process registry holds only a
    /// `Weak` to it.
    inner: Arc<ServerInner>,
    /// Join handles of the tasks spawned by `Server::start`; drained
    /// (emptied) by `shutdown` / `join`.
    tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
399
400impl std::fmt::Debug for ServerHandle {
401    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
402        f.debug_struct("ServerHandle")
403            .field("pool", &self.inner.pool_name)
404            .field("listen", &self.inner.listen_addr)
405            .field("dyn_listen", &self.inner.dyn_listen_addr)
406            .finish_non_exhaustive()
407    }
408}
409
410impl ServerHandle {
411    /// Borrow the configured [`CryptoProvider`], if one was
412    /// plugged. The default builder does not install one.
413    #[must_use]
414    pub fn crypto_provider(&self) -> Option<&dyn CryptoProvider> {
415        self.inner.crypto.as_deref()
416    }
417
418    /// Borrow the [`crate::embed::CommandExtension`] installed
419    /// at build time, if any.
420    #[must_use]
421    pub fn command_extension(&self) -> Option<Arc<dyn crate::embed::CommandExtension>> {
422        self.inner.command_extension.as_ref().map(Arc::clone)
423    }
424
425    /// Local listen address (post-bind), if a `listen:` was
426    /// configured.
427    #[must_use]
428    pub fn listen_addr(&self) -> Option<SocketAddr> {
429        self.inner.listen_addr
430    }
431
432    /// Local dnode listen address (post-bind), if a
433    /// `dyn_listen:` was configured.
434    #[must_use]
435    pub fn dyn_listen_addr(&self) -> Option<SocketAddr> {
436        self.inner.dyn_listen_addr
437    }
438
439    /// Subscribe to the [`ServerEvent`] broadcast.
440    ///
441    /// # Examples
442    ///
443    /// ```no_run
444    /// # use dynomite::embed::ServerBuilder;
445    /// # use dynomite::conf::DataStore;
446    /// # tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {
447    /// let server = ServerBuilder::new("p")
448    ///     .listen("127.0.0.1:0".parse().unwrap())
449    ///     .dyn_listen("127.0.0.1:0".parse().unwrap())
450    ///     .data_store(DataStore::Redis)
451    ///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
452    ///     .tokens_str("0")
453    ///     .build().unwrap();
454    /// let handle = server.start().await.unwrap();
455    /// let _events = handle.subscribe_events();
456    /// handle.shutdown().await.unwrap();
457    /// # });
458    /// ```
459    #[must_use]
460    pub fn subscribe_events(&self) -> EventStream {
461        self.inner.bus.subscribe()
462    }
463
464    /// Borrow the [`EventManager`] handle that publishes
465    /// structured cluster-event payloads (`PeerUp`, `PeerDown`,
466    /// `GossipRoundComplete`, etc.).
467    ///
468    /// Returned as `Arc` so embedders can hold a long-lived
469    /// reference and pass it across crate boundaries (notably to
470    /// `dyniak`, which publishes AAE start/complete events).
471    #[must_use]
472    pub fn events(&self) -> Arc<EventManager> {
473        Arc::clone(&self.inner.events)
474    }
475
476    /// Latest stats snapshot.
477    #[must_use]
478    pub fn stats(&self) -> Snapshot {
479        self.inner.stats.snapshot()
480    }
481
482    /// Borrow the live [`Stats`] aggregator as a shared handle.
483    ///
484    /// Returns the same `Arc<Stats>` the runtime publishes to,
485    /// so an embedder can read counters as they update without
486    /// going through the periodic snapshot path. Useful for
487    /// in-process metrics integrations that prefer a pull model
488    /// (Prometheus scrape, OpenTelemetry pull readers) over the
489    /// push-on-flush path exposed by [`MetricsSink`].
490    ///
491    /// The returned handle is `Clone`; callers may share it
492    /// across threads. The aggregator outlives the handle for as
493    /// long as the [`ServerHandle`] tree is alive.
494    ///
495    /// [`MetricsSink`]: crate::embed::MetricsSink
496    #[must_use]
497    pub fn stats_handle(&self) -> Arc<Stats> {
498        Arc::clone(&self.inner.stats)
499    }
500
501    /// Manifest of every metric the engine emits.
502    #[must_use]
503    pub fn describe_stats(&self) -> Vec<MetricSpec> {
504        let _ = describe_stats(); // ensure the module is exercised
505        let mut out: Vec<MetricSpec> = Vec::new();
506        out.extend(crate::stats::POOL_CODEC.iter().copied());
507        out.extend(crate::stats::SERVER_CODEC.iter().copied());
508        out
509    }
510
511    /// Snapshot of every peer in the cluster.
512    #[must_use]
513    pub fn peers(&self) -> Vec<PeerSnapshot> {
514        self.inner
515            .pool
516            .peers()
517            .read()
518            .iter()
519            .map(PeerSnapshot::from)
520            .collect()
521    }
522
523    /// Snapshot of every datacenter and its racks.
524    #[must_use]
525    pub fn datacenters(&self) -> Vec<DatacenterSnapshot> {
526        self.inner
527            .pool
528            .datacenters()
529            .read()
530            .iter()
531            .map(DatacenterSnapshot::from)
532            .collect()
533    }
534
535    /// Snapshot of the token ring.
536    #[must_use]
537    pub fn ring(&self) -> RingSnapshot {
538        let dcs = self.inner.pool.datacenters().read();
539        let mut entries: Vec<(DynToken, u32)> = Vec::new();
540        for dc in dcs.iter() {
541            for rack in dc.racks() {
542                for c in rack.continuums() {
543                    entries.push((c.token.clone(), c.peer_idx));
544                }
545            }
546        }
547        RingSnapshot {
548            entries,
549            generation: self.inner.generation.load(Ordering::Relaxed),
550        }
551    }
552
553    /// Inject a parsed request as if it had arrived from a
554    /// client. Returns the response message.
555    ///
556    /// This is the test/embedding entry point that bypasses the
557    /// proxy listener. The dispatcher computes a routing plan; if
558    /// the plan resolves to the local datastore, the
559    /// [`Datastore::dispatch`](crate::embed::hooks::Datastore::dispatch)
560    /// hook is invoked. If the plan resolves to a remote peer
561    /// co-located in the same process (via the in-process
562    /// registry), the request is forwarded to that peer's
563    /// datastore and its response is returned.
564    pub async fn inject_request(&self, req: Msg) -> Result<Msg, EmbedError> {
565        // Use the parsed key (if any) for routing; if there are
566        // no keys, the dispatcher's `LocalDatastore` short-circuit
567        // applies.
568        let key: Vec<u8> = req
569            .keys()
570            .first()
571            .map(|kp| kp.tag_bytes().to_vec())
572            .unwrap_or_default();
573        let plan = self.inner.dispatcher.plan(&req, &key);
574        match plan {
575            DispatchPlan::Drop => {
576                // Drop: respond with an empty message of the same
577                // id so callers can correlate.
578                let mut rsp = Msg::new(req.id(), crate::msg::MsgType::Unknown, false);
579                rsp.set_parent_id(req.id());
580                Ok(rsp)
581            }
582            DispatchPlan::NoTargets => Err(EmbedError::Inject(
583                "cluster has no quorum-eligible targets".into(),
584            )),
585            DispatchPlan::LocalDatastore => self.inner.dispatch_local(req).await,
586            DispatchPlan::Replicas { targets, .. } => {
587                self.inner.stats.pool_incr_by(PoolField::ForwardError, 0); // touch counter table
588                self.inner.stats.server_incr(ServerField::ReadRequests);
589                // Snapshot the targets' addresses outside any
590                // lock so we don't hold the peer-list read guard
591                // across the awaits below.
592                let resolved: Vec<(bool, Option<SocketAddr>)> = {
593                    let peers = self.inner.pool.peers().read();
594                    targets
595                        .iter()
596                        .map(|t| {
597                            let peer = peers.get(t.peer_idx as usize);
598                            let is_local = peer.is_some_and(crate::cluster::peer::Peer::is_local);
599                            let addr = peer.and_then(|p| p.endpoint().pname().parse().ok());
600                            (is_local, addr)
601                        })
602                        .collect()
603                };
604                let mut last_err: Option<EmbedError> = None;
605                for (is_local, addr) in resolved {
606                    if is_local {
607                        return self.inner.dispatch_local(req).await;
608                    }
609                    if let Some(addr) = addr {
610                        if let Some(remote) = registry_lookup(addr) {
611                            let mut forwarded = Msg::new(req.id(), req.ty(), true);
612                            forwarded.set_parent_id(req.parent_id());
613                            match remote.datastore.dispatch(forwarded).await {
614                                Ok(rsp) => return Ok(rsp),
615                                Err(e) => last_err = Some(EmbedError::Inject(e.to_string())),
616                            }
617                        }
618                    }
619                }
620                if let Some(e) = last_err {
621                    return Err(e);
622                }
623                // No registered remote peer; fall back to the
624                // local datastore so the embed surface stays
625                // useful in mixed deployments.
626                self.inner.dispatch_local(req).await
627            }
628        }
629    }
630
631    /// Reload the configuration. Validates the new [`Config`]
632    /// before any state is touched; on failure the running config
633    /// is left untouched.
634    ///
635    /// This is the in-process equivalent of SIGHUP. The C
636    /// reference's `dynomite_reload_conf` (in
637    /// `_/dynomite/src/dynomite.c`) re-reads the YAML from disk;
638    /// the embedding API takes the `Config` directly.
639    pub async fn reload(&self, mut cfg: Config) -> Result<(), EmbedError> {
640        cfg.finalize();
641        cfg.validate()?;
642        let new_pool = cfg.pool().clone();
643        // Update the stored pool so subsequent .stats() / .peers()
644        // calls reflect the live config. The dispatcher's pool
645        // structure is left untouched here; a future stage will
646        // rebuild it under a write lock.
647        *self.inner.config.lock() = new_pool;
648        let gen_id = next_generation();
649        self.inner.generation.store(gen_id, Ordering::Relaxed);
650        self.inner
651            .bus
652            .send(ServerEvent::ConfigReloaded { generation: gen_id });
653        // Yield once so the broadcast send is observable on the
654        // calling task's scheduler before the function returns.
655        tokio::task::yield_now().await;
656        Ok(())
657    }
658
659    /// Graceful shutdown.
660    ///
661    /// Cancels every background task, deregisters from the
662    /// in-process peer registry, drains the join set, and
663    /// returns. Idempotent: calling `shutdown` after a previous
664    /// `shutdown` (or after [`ServerHandle::join`] returned)
665    /// completes successfully without doing any work.
666    pub async fn shutdown(&self) -> Result<(), EmbedError> {
667        self.inner.cancel.cancel();
668        if let Some(addr) = self.inner.dyn_listen_addr {
669            registry_remove(addr);
670        }
671        let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
672        for t in drained {
673            // Tasks observe `cancel` and return promptly.
674            let _ = t.await;
675        }
676        Ok(())
677    }
678
679    /// Wait for every background task to complete.
680    ///
681    /// Unlike [`ServerHandle::shutdown`], `join` does not request
682    /// cancellation: it parks the caller until the runtime tasks
683    /// (gossip, stats aggregator, accept loops, metrics flusher)
684    /// finish on their own. The intended use is the
685    /// "start-then-park" pattern in long-running embedders that
686    /// drive shutdown from a separate signal handler:
687    ///
688    /// ```no_run
689    /// # use dynomite::embed::ServerBuilder;
690    /// # use dynomite::conf::DataStore;
691    /// # tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {
692    /// let handle = ServerBuilder::new("p")
693    ///     .listen("127.0.0.1:0".parse().unwrap())
694    ///     .dyn_listen("127.0.0.1:0".parse().unwrap())
695    ///     .data_store(DataStore::Redis)
696    ///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
697    ///     .tokens_str("0")
698    ///     .build().unwrap()
699    ///     .start().await.unwrap();
700    /// let shutdown = handle.clone();
701    /// tokio::spawn(async move {
702    ///     // Replace with `tokio::signal::ctrl_c().await.unwrap();`
703    ///     // in a real embedder.
704    ///     shutdown.shutdown().await.unwrap();
705    /// });
706    /// handle.join().await;
707    /// # });
708    /// ```
709    ///
710    /// `join` resolves once every task spawned by
711    /// [`Server::start`] has returned. Calling `join` repeatedly
712    /// is safe; the second call returns immediately because the
713    /// task set is drained on the first.
714    pub async fn join(&self) {
715        let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
716        for t in drained {
717            let _ = t.await;
718        }
719    }
720}
721
722// -------- helpers ---------------------------------------------------------
723
724fn build_local_peer(pool: &ConfPool, cfg: &PoolConfig) -> Peer {
725    let dyn_listen = pool.dyn_listen.as_ref().map_or_else(
726        || ("127.0.0.1".to_string(), 0u16),
727        |l| (l.name().to_string(), l.port()),
728    );
729    let tokens = pool
730        .tokens
731        .as_ref()
732        .map(|tl| {
733            tl.components()
734                .iter()
735                .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
736                .collect::<Vec<_>>()
737        })
738        .unwrap_or_default();
739    let mut peer = Peer::new(
740        0,
741        PeerEndpoint::tcp(dyn_listen.0, dyn_listen.1),
742        cfg.rack.clone(),
743        cfg.dc.clone(),
744        if tokens.is_empty() {
745            vec![DynToken::from_u32(0)]
746        } else {
747            tokens
748        },
749        true,
750        true,
751        false,
752    );
753    let now_secs = std::time::SystemTime::now()
754        .duration_since(std::time::UNIX_EPOCH)
755        .map(|d| d.as_secs())
756        .unwrap_or(0);
757    peer.set_state(PeerState::Normal, now_secs);
758    peer
759}
760
761fn peers_from_seeds(cfg: &PoolConfig, seeds: &[ConfDynSeed], start_idx: u32) -> Vec<Peer> {
762    let now_secs = std::time::SystemTime::now()
763        .duration_since(std::time::UNIX_EPOCH)
764        .map(|d| d.as_secs())
765        .unwrap_or(0);
766    seeds
767        .iter()
768        .enumerate()
769        .map(|(i, s)| {
770            let tokens: Vec<DynToken> = s
771                .tokens()
772                .components()
773                .iter()
774                .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
775                .collect();
776            let idx_off = u32::try_from(i).unwrap_or(0);
777            let mut p = Peer::new(
778                start_idx + idx_off,
779                PeerEndpoint::tcp(s.host().to_string(), s.port()),
780                s.rack().to_string(),
781                s.dc().to_string(),
782                if tokens.is_empty() {
783                    vec![DynToken::from_u32(0)]
784                } else {
785                    tokens
786                },
787                false,
788                s.dc() == cfg.dc,
789                false,
790            );
791            p.set_state(PeerState::Normal, now_secs);
792            p
793        })
794        .collect()
795}
796
797// Unused legacy helpers removed.
798
799async fn bind_listener(
800    listen: Option<&crate::conf::ConfListen>,
801    transport: crate::conf::Transport,
802    #[cfg_attr(not(feature = "quic"), allow(unused_variables))] pool: &crate::conf::ConfPool,
803) -> Result<(Option<TcpListener>, Option<SocketAddr>), EmbedError> {
804    let Some(l) = listen else {
805        return Ok((None, None));
806    };
807    let host = l.name();
808    let port = l.port();
809    if host.is_empty() {
810        return Ok((None, None));
811    }
812    let addr_str = format!("{host}:{port}");
813    let Ok(_addr) = addr_str.parse::<SocketAddr>() else {
814        return Ok((None, None));
815    };
816    match transport {
817        crate::conf::Transport::Tcp => {
818            let listener = TcpListener::bind(&addr_str).await?;
819            let local = listener.local_addr()?;
820            Ok((Some(listener), Some(local)))
821        }
822        #[cfg(feature = "quic")]
823        crate::conf::Transport::Quic => {
824            // The embedded server's accept loop is a stub today
825            // (see `accept_loop` below): it logs and closes
826            // every accepted client. We still bind the QUIC
827            // listener up-front so the embedder sees a clean
828            // bind error if the cert / key paths are bad,
829            // mirroring the TCP path's eager-bind contract;
830            // dropping the listener here is fine because
831            // production QUIC traffic flows through the
832            // `dynomited` binary, not the embedded harness.
833            let cert = pool
834                .quic_cert_file
835                .as_ref()
836                .and_then(|p| p.to_str().map(str::to_owned))
837                .ok_or_else(|| {
838                    EmbedError::Build(
839                        "transport: quic requires quic_cert_file (UTF-8 path)".to_string(),
840                    )
841                })?;
842            let key = pool
843                .quic_key_file
844                .as_ref()
845                .and_then(|p| p.to_str().map(str::to_owned))
846                .ok_or_else(|| {
847                    EmbedError::Build(
848                        "transport: quic requires quic_key_file (UTF-8 path)".to_string(),
849                    )
850                })?;
851            let cfg = crate::net::QuicConfig::server_with_cert_paths(cert, key);
852            let parsed: SocketAddr = addr_str.parse().map_err(|e: std::net::AddrParseError| {
853                EmbedError::Build(format!("invalid quic listen address {addr_str}: {e}"))
854            })?;
855            let listener = crate::net::QuicListener::bind(parsed, cfg).await?;
856            let local = listener.local_addr();
857            // The QUIC listener is dropped on this scope exit;
858            // the in-process embedder does not yet drive a
859            // QUIC accept loop. Return `None` for the listener
860            // so the existing TCP-flavoured accept_loop is
861            // skipped, but report the local address for
862            // observability.
863            drop(listener);
864            Ok((None, Some(local)))
865        }
866        #[cfg(not(feature = "quic"))]
867        crate::conf::Transport::Quic => Err(EmbedError::Build(
868            "transport: quic requires the engine's `quic` Cargo feature".to_string(),
869        )),
870    }
871}
872
873// removed unused try_bind helper
874
875async fn accept_loop(
876    inner: Arc<ServerInner>,
877    listener: TcpListener,
878    addr: SocketAddr,
879    role: ConnRoleTag,
880) {
881    loop {
882        tokio::select! {
883            biased;
884            () = inner.cancel.cancelled() => return,
885            res = listener.accept() => {
886                let Ok((sock, peer)) = res else { return };
887                let conn_id = next_conn_id();
888                inner.bus.send(ServerEvent::ConnectionAccepted {
889                    conn_id,
890                    role,
891                    local_addr: Some(addr),
892                });
893                // The embedded server is in-process only at this
894                // stage: the kernel-bound socket is reserved so
895                // post-bind reporting works, but the per-role
896                // protocol parser is not wired. Cross-process
897                // clients see open-then-immediate-close. Use
898                // `ServerHandle::inject_request` for in-process
899                // traffic; use the `dynomited` binary for the
900                // wire path.
901                tracing::warn!(
902                    listen = %addr,
903                    peer = %peer,
904                    role = ?role,
905                    conn_id,
906                    "embedded listen_addr accepted a connection; embedded mode does not yet \
907                     forward to the dispatcher; use ServerHandle::inject_request instead. \
908                     Closing connection."
909                );
910                let bus = inner.bus.clone();
911                let cancel = inner.cancel.clone();
912                tokio::spawn(async move {
913                    let _ = sock; // drop on close
914                    let close_reason = if cancel.is_cancelled() {
915                        CloseReason::LocalClose
916                    } else {
917                        CloseReason::PeerEof
918                    };
919                    bus.send(ServerEvent::ConnectionClosed { conn_id, reason: close_reason });
920                });
921            }
922        }
923    }
924}
925
926async fn stats_loop(inner: Arc<ServerInner>, interval_ms: Option<i64>) {
927    let interval =
928        Duration::from_millis(u64::try_from(interval_ms.unwrap_or(1_000)).unwrap_or(1_000));
929    let mut ticker = tokio::time::interval(interval);
930    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
931    loop {
932        tokio::select! {
933            biased;
934            () = inner.cancel.cancelled() => return,
935            _ = ticker.tick() => {
936                let snap = inner.stats.snapshot();
937                *inner.snapshot_cache.lock() = snap;
938            }
939        }
940    }
941}
942
943async fn metrics_loop(inner: Arc<ServerInner>) {
944    let interval = inner.metrics.flush_interval();
945    let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(50)));
946    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
947    loop {
948        tokio::select! {
949            biased;
950            () = inner.cancel.cancelled() => return,
951            _ = ticker.tick() => {
952                let snap = inner.snapshot_cache.lock().clone();
953                let _ = inner.metrics.emit(&snap).await;
954            }
955        }
956    }
957}
958
959async fn gossip_loop(inner: Arc<ServerInner>, interval: Duration) {
960    let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(20)));
961    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
962    let mut round: u64 = 0;
963    let mut known: std::collections::HashSet<(String, u16)> = std::collections::HashSet::new();
964    {
965        let peers = inner.pool.peers().read();
966        for p in peers.iter() {
967            known.insert((p.endpoint().host().to_string(), p.endpoint().port()));
968        }
969    }
970    loop {
971        tokio::select! {
972            biased;
973            () = inner.cancel.cancelled() => return,
974            _ = ticker.tick() => {
975                let round_started = std::time::Instant::now();
976                let round_ts = std::time::SystemTime::now();
977                round += 1;
978                let seeds = inner.seeds.fetch().unwrap_or_default();
979                let mut added: u32 = 0;
980                {
981                    let mut peers = inner.pool.peers().write();
982                    let now_secs = std::time::SystemTime::now()
983                        .duration_since(std::time::UNIX_EPOCH)
984                        .map(|d| d.as_secs())
985                        .unwrap_or(0);
986                    let cfg = inner.pool.config().clone();
987                    for seed in &seeds {
988                        let key = (seed.host().to_string(), seed.port());
989                        if known.contains(&key) {
990                            continue;
991                        }
992                        let next_idx = u32::try_from(peers.len()).unwrap_or(u32::MAX);
993                        let tokens: Vec<DynToken> = seed
994                            .tokens()
995                            .components()
996                            .iter()
997                            .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
998                            .collect();
999                        let mut p = Peer::new(
1000                            next_idx,
1001                            PeerEndpoint::tcp(seed.host().to_string(), seed.port()),
1002                            seed.rack().to_string(),
1003                            seed.dc().to_string(),
1004                            if tokens.is_empty() {
1005                                vec![DynToken::from_u32(0)]
1006                            } else {
1007                                tokens
1008                            },
1009                            false,
1010                            seed.dc() == cfg.dc,
1011                            false,
1012                        );
1013                        p.set_state(PeerState::Normal, now_secs);
1014                        peers.push(p);
1015                        known.insert(key);
1016                        added += 1;
1017                    }
1018                }
1019                if added > 0 {
1020                    // Rebuild the topology + ring under fresh
1021                    // datacenters so the dispatcher sees the new
1022                    // peers.
1023                    rebuild_topology(&inner.pool);
1024                    let peers_now = inner.pool.peers().read();
1025                    for (idx, p) in peers_now.iter().enumerate().rev().take(added as usize) {
1026                        let _ = idx;
1027                        inner.bus.send(ServerEvent::PeerUp(p.idx()));
1028                        inner.events.publish(crate::events::ClusterEvent::PeerUp {
1029                            peer_id: p.idx(),
1030                            dc: p.dc().to_string(),
1031                            ts: round_ts,
1032                        });
1033                    }
1034                    inner.events.publish(crate::events::ClusterEvent::RingChanged {
1035                        tag: "seed-discovery".to_string(),
1036                        ts: round_ts,
1037                    });
1038                }
1039                let count = u32::try_from(inner.pool.peers().read().len()).unwrap_or(u32::MAX);
1040                inner.bus.send(ServerEvent::GossipRound { round, peers: count });
1041                inner.events.publish(crate::events::ClusterEvent::GossipRoundComplete {
1042                    duration: round_started.elapsed(),
1043                    peers_seen: count as usize,
1044                    ts: round_ts,
1045                });
1046                inner.stats.pool_incr(PoolField::StatsCount);
1047            }
1048        }
1049    }
1050}
1051
1052fn rebuild_topology(pool: &Arc<ServerPool>) {
1053    use crate::cluster::Datacenter;
1054    let peers = pool.peers().read();
1055    let mut new_dcs: Vec<Datacenter> = Vec::new();
1056    for p in peers.iter() {
1057        let idx = if let Some(i) = new_dcs.iter().position(|d| d.name() == p.dc()) {
1058            i
1059        } else {
1060            new_dcs.push(Datacenter::new(p.dc().to_string()));
1061            new_dcs.len() - 1
1062        };
1063        new_dcs[idx].upsert_rack(p.rack().to_string());
1064    }
1065    drop(peers);
1066    {
1067        let mut dcs = pool.datacenters().write();
1068        *dcs = new_dcs;
1069    }
1070    pool.rebuild_ring();
1071    pool.preselect_remote_racks();
1072}
1073
1074// Internal helpers (`shutdown_signal`, `_instant_now`) removed:
1075// they were public-but-doc(hidden) escape hatches that risked
1076// SemVer commitments. The `oneshot` channel was not used
1077// outside the (deleted) test helper, and `Instant` was kept in
1078// scope only to silence an unused-import warning. Both have
1079// been dropped.