Skip to main content

dynomite/embed/
server.rs

1//! Embedding-API server runtime.
2//!
3//! [`Server`] owns the runtime data structures (cluster pool,
4//! dispatcher, stats, event bus, hooks). [`Server::start`] spawns
5//! the background tasks (gossip, stats aggregator, optional TCP
6//! listeners) on the current tokio runtime and returns a
7//! [`ServerHandle`].
8//!
9//! The handle exposes the public control surface documented in
10//! `docs/book/src/embedding/server.md`. It is `Clone + Send +
11//! Sync`; multiple consumers may hold one. Dropping the last
12//! handle does not shut the server down - only
13//! [`ServerHandle::shutdown`] does.
14
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, OnceLock, Weak};
19
20use parking_lot::Mutex;
21use tokio::net::TcpListener;
22use tokio::task::JoinHandle;
23use tokio::time::{Duration, MissedTickBehavior};
24use tokio_util::sync::CancellationToken;
25
26use crate::cluster::dispatch::{ClusterDispatcher, DispatchPlan};
27use crate::cluster::peer::{Peer, PeerEndpoint, PeerState};
28use crate::cluster::pool::{PoolConfig, ServerPool};
29use crate::conf::{ConfDynSeed, ConfPool, Config};
30use crate::embed::error::EmbedError;
31use crate::embed::events::{CloseReason, ConnRoleTag, EventBus, EventStream, ServerEvent};
32use crate::embed::hooks::{
33    CryptoProvider, Datastore, LoggingMetricsSink, MetricsSink, SeedsProvider,
34};
35use crate::embed::snapshots::{DatacenterSnapshot, PeerSnapshot, RingSnapshot};
36use crate::events::EventManager;
37use crate::hashkit::DynToken;
38use crate::msg::Msg;
39use crate::stats::{
40    describe_stats, MetricSpec, PoolField, PoolStats, ServerField, ServerStats, ServiceInfo,
41    Snapshot, Stats,
42};
43
/// Bag of optional hook overrides supplied to a builder.
///
/// The fields are `pub(crate)`, i.e. private to code outside this
/// crate, so adding new hooks (for example, a
/// `Box<dyn TransportListener>` slot) is not a SemVer-breaking
/// change. Construct a `ServerHooks` only through
/// [`crate::embed::ServerBuilder`] setters; read it back through
/// the accessor methods.
///
/// [`Server::start`] takes the hooks apart: the datastore and
/// seeds hooks must be present there (it panics otherwise), and a
/// missing metrics sink is replaced by a `LoggingMetricsSink`.
#[non_exhaustive]
pub struct ServerHooks {
    /// Local datastore hook; required by `Server::start`.
    pub(crate) datastore: Option<Box<dyn Datastore>>,
    /// Seeds provider hook; required by `Server::start`.
    pub(crate) seeds: Option<Box<dyn SeedsProvider>>,
    /// Optional crypto provider, handed to the running server as-is.
    pub(crate) crypto: Option<Box<dyn CryptoProvider>>,
    /// Optional metrics sink; `Server::start` falls back to a
    /// `LoggingMetricsSink` when this is `None`.
    pub(crate) metrics: Option<Box<dyn MetricsSink>>,
}
58
59impl ServerHooks {
60    /// Borrow the configured [`Datastore`], if any.
61    #[must_use]
62    pub fn datastore(&self) -> Option<&dyn Datastore> {
63        self.datastore.as_deref()
64    }
65
66    /// Borrow the configured [`SeedsProvider`], if any.
67    #[must_use]
68    pub fn seeds(&self) -> Option<&dyn SeedsProvider> {
69        self.seeds.as_deref()
70    }
71
72    /// Borrow the configured [`CryptoProvider`], if any.
73    #[must_use]
74    pub fn crypto(&self) -> Option<&dyn CryptoProvider> {
75        self.crypto.as_deref()
76    }
77
78    /// Borrow the configured [`MetricsSink`], if any.
79    #[must_use]
80    pub fn metrics(&self) -> Option<&dyn MetricsSink> {
81        self.metrics.as_deref()
82    }
83}
84
85impl std::fmt::Debug for ServerHooks {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        f.debug_struct("ServerHooks")
88            .field("datastore", &self.datastore.is_some())
89            .field("seeds", &self.seeds.is_some())
90            .field("crypto", &self.crypto.is_some())
91            .field("metrics", &self.metrics.is_some())
92            .finish()
93    }
94}
95
96// -------- in-process registry -------------------------------------------
97
98/// Process-wide registry for in-process clusters.
99///
100/// When `inject_request` produces a `Replicas` plan that targets
101/// a peer co-located in the same process, the embed runtime
102/// looks up the target via this registry and forwards directly
103/// without the dnode wire path. This keeps the in-process tests
104/// from depending on real network I/O while preserving the
105/// production code path semantics (compute plan, deliver to
106/// target peers).
107fn registry() -> &'static Mutex<HashMap<SocketAddr, Weak<ServerInner>>> {
108    static R: OnceLock<Mutex<HashMap<SocketAddr, Weak<ServerInner>>>> = OnceLock::new();
109    R.get_or_init(|| Mutex::new(HashMap::new()))
110}
111
112fn registry_register(addr: SocketAddr, server: &Arc<ServerInner>) {
113    registry().lock().insert(addr, Arc::downgrade(server));
114}
115
116fn registry_remove(addr: SocketAddr) {
117    registry().lock().remove(&addr);
118}
119
120fn registry_lookup(addr: SocketAddr) -> Option<Arc<ServerInner>> {
121    registry().lock().get(&addr).and_then(Weak::upgrade)
122}
123
124// -------- inner state ----------------------------------------------------
125
/// Allocate the next generation number for
/// [`ServerEvent::ConfigReloaded`] / [`RingSnapshot`].
///
/// The counter is process-wide and starts at 1. A freshly started
/// server reports generation 0, so every value handed out here is
/// strictly greater than that initial value. That makes every
/// reload observable as a generation change, including the first
/// one in the process.
fn next_generation() -> u64 {
    // Starting at 1 (not 0) keeps reload generations distinct from
    // the initial generation `Server::start` stores.
    static G: AtomicU64 = AtomicU64::new(1);
    G.fetch_add(1, Ordering::Relaxed)
}
132
/// Allocate a process-unique connection id.
///
/// Ids are handed out in increasing order starting at 0 and are
/// shared by every server in the process.
fn next_conn_id() -> u64 {
    static NEXT_ID: AtomicU64 = AtomicU64::new(0);
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}
138
/// Inner state shared by every clone of a [`ServerHandle`].
///
/// Created once per [`Server::start`] call. The handle clones and
/// every spawned background task hold an `Arc` to it. The in-process
/// registry holds only a `Weak`, keyed by `dyn_listen_addr`.
pub(crate) struct ServerInner {
    /// Cluster topology; the same `Arc` the dispatcher was built
    /// from. Read by `peers()`, `datacenters()` and `ring()`.
    pool: Arc<ServerPool>,
    /// Computes the routing plan used by `inject_request`.
    dispatcher: ClusterDispatcher,
    /// Live stats aggregator (also exposed via `stats_handle()`).
    stats: Arc<Stats>,
    /// Cached stats snapshot; created empty in `start`.
    snapshot_cache: Arc<Mutex<Snapshot>>,
    /// Broadcast bus for [`ServerEvent`]s (see `subscribe_events()`).
    bus: EventBus,
    /// Structured cluster-event manager returned by `events()`.
    events: Arc<EventManager>,
    /// Local datastore hook. It serves local dispatch and requests
    /// forwarded from co-located peers.
    datastore: Box<dyn Datastore>,
    /// Seeds provider hook supplied by the builder.
    seeds: Box<dyn SeedsProvider>,
    /// Metrics sink; `start` installs a `LoggingMetricsSink` when
    /// none was supplied.
    metrics: Box<dyn MetricsSink>,
    /// Optional crypto provider, exposed via `crypto_provider()`.
    crypto: Option<Box<dyn CryptoProvider>>,
    /// Post-bind proxy listen address, if `listen:` was bound.
    listen_addr: Option<SocketAddr>,
    /// Post-bind dnode listen address, if `dyn_listen:` was bound;
    /// also this server's key in the in-process registry.
    dyn_listen_addr: Option<SocketAddr>,
    /// Cancelled by `shutdown`; the accept loops exit when it fires.
    cancel: CancellationToken,
    /// Pool name from the configuration / builder.
    pool_name: String,
    /// Most recently applied pool configuration (replaced by `reload`).
    config: Mutex<ConfPool>,
    /// Generation reported by `ring()`; starts at 0 and is
    /// overwritten by `reload`.
    generation: AtomicU64,
    /// Optional command extension wired into the dispatcher at start.
    command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
159
160impl std::fmt::Debug for ServerInner {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        f.debug_struct("ServerInner")
163            .field("pool_name", &self.pool_name)
164            .field("listen", &self.listen_addr)
165            .field("dyn_listen", &self.dyn_listen_addr)
166            .finish_non_exhaustive()
167    }
168}
169
170impl ServerInner {
171    fn dispatch_local(&self, req: Msg) -> super::hooks::BoxFuture<'_, Result<Msg, EmbedError>> {
172        let fut = self.datastore.dispatch(req);
173        Box::pin(async move {
174            let rsp = fut.await.map_err(|e| EmbedError::Inject(e.to_string()))?;
175            Ok(rsp)
176        })
177    }
178}
179
180// -------- Server (configured-but-not-running) ----------------------------
181
/// Configured-but-not-running server.
///
/// Build one with [`ServerBuilder`](crate::embed::ServerBuilder),
/// then call [`Server::start`] to spawn the background tasks.
#[derive(Debug)]
pub struct Server {
    /// Pool name passed to the builder.
    pool_name: String,
    /// Pool configuration. `start` reads listener addresses and
    /// gossip/stats settings from it and stores it as the initial
    /// live config.
    pool: ConfPool,
    /// Cluster pool built by `from_pool` (local peer plus seed peers).
    cluster: Arc<ServerPool>,
    /// Hook overrides; `start` moves their contents into the runtime.
    hooks: ServerHooks,
    /// Stats aggregator created by `from_pool`.
    stats: Arc<Stats>,
    /// Optional command extension handed to the dispatcher in `start`.
    command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
}
195
impl Server {
    /// Used internally by [`crate::embed::ServerBuilder::build`].
    ///
    /// Builds the cluster pool from `pool`: the local peer first
    /// (index 0), then one peer per `dyn_seeds` entry. Also creates a
    /// fresh [`Stats`] aggregator. Nothing is bound or spawned here;
    /// that happens in [`Server::start`].
    pub(crate) fn from_pool(
        pool_name: String,
        pool: ConfPool,
        hooks: ServerHooks,
        command_extension: Option<Arc<dyn crate::embed::CommandExtension>>,
    ) -> Self {
        let pool_cfg = PoolConfig::from_conf(&pool_name, &pool);
        let local_peer = build_local_peer(&pool, &pool_cfg);
        let mut peers = vec![local_peer];
        if let Some(seeds) = pool.dyn_seeds.as_ref() {
            // Seed peers are numbered right after the local peer.
            let start = u32::try_from(peers.len()).unwrap_or(0);
            peers.extend(peers_from_seeds(&pool_cfg, seeds, start));
        }
        let server_pool_arc = Arc::new(ServerPool::new(pool_cfg.clone(), peers));
        server_pool_arc.preselect_remote_racks();

        let stats = Arc::new(Stats::new(
            ServiceInfo {
                source: pool_cfg.name.clone(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                rack: pool_cfg.rack.clone(),
                dc: pool_cfg.dc.clone(),
            },
            PoolStats::new(&pool_cfg.name),
            ServerStats::new("backend"),
        ));

        Self {
            pool_name,
            pool,
            cluster: server_pool_arc,
            hooks,
            stats,
            command_extension,
        }
    }

    /// Borrow the [`crate::embed::CommandExtension`]
    /// this server was built with, if any.
    #[must_use]
    pub fn command_extension(&self) -> Option<&Arc<dyn crate::embed::CommandExtension>> {
        self.command_extension.as_ref()
    }

    /// The pool name configured in the YAML / builder.
    #[must_use]
    pub fn pool_name(&self) -> &str {
        &self.pool_name
    }

    /// Spawn background tasks on the current tokio runtime and
    /// return a [`ServerHandle`].
    ///
    /// `start` is non-blocking. The returned handle is `Clone +
    /// Send + Sync`.
    ///
    /// # In-process only
    ///
    /// The embedded server in this stage is **in-process only**:
    /// the `listen:` and `dyn_listen:` sockets bind so that
    /// configured ports are reservable and post-bind reporting
    /// works, but cross-process clients connecting to those
    /// ports see open-then-immediate-close (with a runtime
    /// warning logged on each accept). The sanctioned way to
    /// drive an embedded `Server` from in-process code is
    /// [`ServerHandle::inject_request`]. Cross-process traffic
    /// is supported by the `dynomited` binary, which wires the
    /// proxy module directly. Wiring the embedded accept loop
    /// to the dispatcher is tracked as a follow-up; the contract
    /// is documented in `docs/parity.md`.
    ///
    /// # Errors
    ///
    /// Returns an error if binding the `listen:` or `dyn_listen:`
    /// socket fails. Both binds happen before the server registers
    /// itself or spawns any task, so a failed `start` leaves nothing
    /// running.
    ///
    /// # Panics
    ///
    /// Panics if the hooks carry no datastore or no seeds provider.
    /// The builder is expected to always populate both.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use dynomite::embed::ServerBuilder;
    /// use dynomite::conf::DataStore;
    /// # tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {
    /// let server = ServerBuilder::new("dyn_o_mite")
    ///     .listen("127.0.0.1:0".parse().unwrap())
    ///     .dyn_listen("127.0.0.1:0".parse().unwrap())
    ///     .data_store(DataStore::Redis)
    ///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
    ///     .tokens_str("0")
    ///     .enable_gossip(false)
    ///     .build()
    ///     .unwrap();
    /// let handle = server.start().await.unwrap();
    /// handle.shutdown().await.unwrap();
    /// # });
    /// ```
    pub async fn start(self) -> Result<ServerHandle, EmbedError> {
        // Take the configured server apart so each piece can be moved
        // into the shared runtime state.
        let Server {
            pool_name,
            pool,
            cluster,
            mut hooks,
            stats,
            command_extension,
        } = self;

        let mut dispatcher = ClusterDispatcher::new(cluster.clone());
        if let Some(ext) = command_extension.as_ref() {
            dispatcher = dispatcher.with_command_extension(ext.clone());
        }
        let bus = EventBus::new(64);
        let events = Arc::new(EventManager::new(64));
        let cancel = CancellationToken::new();
        let snapshot_cache = Arc::new(Mutex::new(Snapshot::default()));

        // Hooks are moved out of their `Option` slots; datastore and
        // seeds are mandatory, metrics falls back to a logging sink.
        let datastore = hooks
            .datastore
            .take()
            .expect("invariant: builder always populates datastore");
        let seeds = hooks
            .seeds
            .take()
            .expect("invariant: builder always populates seeds");
        let metrics = hooks
            .metrics
            .take()
            .unwrap_or_else(|| Box::new(LoggingMetricsSink::new(pool_name.clone())));

        // Bind before building shared state so a bind failure returns
        // early without registering or spawning anything.
        let (listen_listener, listen_addr) = bind_listener(pool.listen.as_ref()).await?;
        let (dyn_listener, dyn_listen_addr) = bind_listener(pool.dyn_listen.as_ref()).await?;

        let inner = Arc::new(ServerInner {
            pool: cluster.clone(),
            dispatcher,
            stats: stats.clone(),
            snapshot_cache: snapshot_cache.clone(),
            bus: bus.clone(),
            events: events.clone(),
            datastore,
            seeds,
            metrics,
            crypto: hooks.crypto,
            listen_addr,
            dyn_listen_addr,
            cancel: cancel.clone(),
            pool_name: pool_name.clone(),
            config: Mutex::new(pool.clone()),
            generation: AtomicU64::new(0),
            command_extension,
        });

        // Register self in the in-process registry so peer
        // forwarding can find this node.
        if let Some(addr) = inner.dyn_listen_addr {
            registry_register(addr, &inner);
        }

        // Spawn background tasks.
        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
        tasks.push(tokio::spawn(stats_loop(inner.clone(), pool.stats_interval)));
        tasks.push(tokio::spawn(metrics_loop(inner.clone())));
        // Gossip only runs when explicitly enabled. The interval falls
        // back to 1000 ms when unset or not representable as u64.
        if pool.enable_gossip.unwrap_or(false) {
            tasks.push(tokio::spawn(gossip_loop(
                inner.clone(),
                Duration::from_millis(
                    u64::try_from(pool.gos_interval.unwrap_or(1_000)).unwrap_or(1_000),
                ),
            )));
        }
        // One accept loop per bound listener (see "In-process only").
        if let (Some(listener), Some(addr)) = (listen_listener, listen_addr) {
            tasks.push(tokio::spawn(accept_loop(
                inner.clone(),
                listener,
                addr,
                ConnRoleTag::Proxy,
            )));
        }
        if let (Some(listener), Some(addr)) = (dyn_listener, dyn_listen_addr) {
            tasks.push(tokio::spawn(accept_loop(
                inner.clone(),
                listener,
                addr,
                ConnRoleTag::DnodeProxy,
            )));
        }

        Ok(ServerHandle {
            inner,
            tasks: Arc::new(Mutex::new(tasks)),
        })
    }
}
384
385// -------- ServerHandle ---------------------------------------------------
386
/// Cloneable handle to a running [`Server`].
///
/// The handle is the public control surface: shutdown, reload,
/// stats, events, request injection, topology snapshots.
///
/// Cloning is cheap: every clone shares the same runtime state and
/// the same list of background-task handles.
#[derive(Clone)]
pub struct ServerHandle {
    /// Runtime state shared with the background tasks.
    inner: Arc<ServerInner>,
    /// Join handles of the tasks spawned by [`Server::start`],
    /// emptied by the first `shutdown` or `join` call that runs.
    tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
396
397impl std::fmt::Debug for ServerHandle {
398    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399        f.debug_struct("ServerHandle")
400            .field("pool", &self.inner.pool_name)
401            .field("listen", &self.inner.listen_addr)
402            .field("dyn_listen", &self.inner.dyn_listen_addr)
403            .finish_non_exhaustive()
404    }
405}
406
407impl ServerHandle {
408    /// Borrow the configured [`CryptoProvider`], if one was
409    /// plugged. The default builder does not install one.
410    #[must_use]
411    pub fn crypto_provider(&self) -> Option<&dyn CryptoProvider> {
412        self.inner.crypto.as_deref()
413    }
414
415    /// Borrow the [`crate::embed::CommandExtension`] installed
416    /// at build time, if any.
417    #[must_use]
418    pub fn command_extension(&self) -> Option<Arc<dyn crate::embed::CommandExtension>> {
419        self.inner.command_extension.as_ref().map(Arc::clone)
420    }
421
422    /// Local listen address (post-bind), if a `listen:` was
423    /// configured.
424    #[must_use]
425    pub fn listen_addr(&self) -> Option<SocketAddr> {
426        self.inner.listen_addr
427    }
428
429    /// Local dnode listen address (post-bind), if a
430    /// `dyn_listen:` was configured.
431    #[must_use]
432    pub fn dyn_listen_addr(&self) -> Option<SocketAddr> {
433        self.inner.dyn_listen_addr
434    }
435
436    /// Subscribe to the [`ServerEvent`] broadcast.
437    ///
438    /// # Examples
439    ///
440    /// ```no_run
441    /// # use dynomite::embed::ServerBuilder;
442    /// # use dynomite::conf::DataStore;
443    /// # tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {
444    /// let server = ServerBuilder::new("p")
445    ///     .listen("127.0.0.1:0".parse().unwrap())
446    ///     .dyn_listen("127.0.0.1:0".parse().unwrap())
447    ///     .data_store(DataStore::Redis)
448    ///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
449    ///     .tokens_str("0")
450    ///     .build().unwrap();
451    /// let handle = server.start().await.unwrap();
452    /// let _events = handle.subscribe_events();
453    /// handle.shutdown().await.unwrap();
454    /// # });
455    /// ```
456    #[must_use]
457    pub fn subscribe_events(&self) -> EventStream {
458        self.inner.bus.subscribe()
459    }
460
461    /// Borrow the [`EventManager`] handle that publishes
462    /// structured cluster-event payloads (`PeerUp`, `PeerDown`,
463    /// `GossipRoundComplete`, etc.).
464    ///
465    /// Returned as `Arc` so embedders can hold a long-lived
466    /// reference and pass it across crate boundaries (notably to
467    /// `dyniak`, which publishes AAE start/complete events).
468    #[must_use]
469    pub fn events(&self) -> Arc<EventManager> {
470        Arc::clone(&self.inner.events)
471    }
472
473    /// Latest stats snapshot.
474    #[must_use]
475    pub fn stats(&self) -> Snapshot {
476        self.inner.stats.snapshot()
477    }
478
479    /// Borrow the live [`Stats`] aggregator as a shared handle.
480    ///
481    /// Returns the same `Arc<Stats>` the runtime publishes to,
482    /// so an embedder can read counters as they update without
483    /// going through the periodic snapshot path. Useful for
484    /// in-process metrics integrations that prefer a pull model
485    /// (Prometheus scrape, OpenTelemetry pull readers) over the
486    /// push-on-flush path exposed by [`MetricsSink`].
487    ///
488    /// The returned handle is `Clone`; callers may share it
489    /// across threads. The aggregator outlives the handle for as
490    /// long as the [`ServerHandle`] tree is alive.
491    ///
492    /// [`MetricsSink`]: crate::embed::MetricsSink
493    #[must_use]
494    pub fn stats_handle(&self) -> Arc<Stats> {
495        Arc::clone(&self.inner.stats)
496    }
497
498    /// Manifest of every metric the engine emits.
499    #[must_use]
500    pub fn describe_stats(&self) -> Vec<MetricSpec> {
501        let _ = describe_stats(); // ensure the module is exercised
502        let mut out: Vec<MetricSpec> = Vec::new();
503        out.extend(crate::stats::POOL_CODEC.iter().copied());
504        out.extend(crate::stats::SERVER_CODEC.iter().copied());
505        out
506    }
507
508    /// Snapshot of every peer in the cluster.
509    #[must_use]
510    pub fn peers(&self) -> Vec<PeerSnapshot> {
511        self.inner
512            .pool
513            .peers()
514            .read()
515            .iter()
516            .map(PeerSnapshot::from)
517            .collect()
518    }
519
520    /// Snapshot of every datacenter and its racks.
521    #[must_use]
522    pub fn datacenters(&self) -> Vec<DatacenterSnapshot> {
523        self.inner
524            .pool
525            .datacenters()
526            .read()
527            .iter()
528            .map(DatacenterSnapshot::from)
529            .collect()
530    }
531
532    /// Snapshot of the token ring.
533    #[must_use]
534    pub fn ring(&self) -> RingSnapshot {
535        let dcs = self.inner.pool.datacenters().read();
536        let mut entries: Vec<(DynToken, u32)> = Vec::new();
537        for dc in dcs.iter() {
538            for rack in dc.racks() {
539                for c in rack.continuums() {
540                    entries.push((c.token.clone(), c.peer_idx));
541                }
542            }
543        }
544        RingSnapshot {
545            entries,
546            generation: self.inner.generation.load(Ordering::Relaxed),
547        }
548    }
549
550    /// Inject a parsed request as if it had arrived from a
551    /// client. Returns the response message.
552    ///
553    /// This is the test/embedding entry point that bypasses the
554    /// proxy listener. The dispatcher computes a routing plan; if
555    /// the plan resolves to the local datastore, the
556    /// [`Datastore::dispatch`](crate::embed::hooks::Datastore::dispatch)
557    /// hook is invoked. If the plan resolves to a remote peer
558    /// co-located in the same process (via the in-process
559    /// registry), the request is forwarded to that peer's
560    /// datastore and its response is returned.
561    pub async fn inject_request(&self, req: Msg) -> Result<Msg, EmbedError> {
562        // Use the parsed key (if any) for routing; if there are
563        // no keys, the dispatcher's `LocalDatastore` short-circuit
564        // applies.
565        let key: Vec<u8> = req
566            .keys()
567            .first()
568            .map(|kp| kp.tag_bytes().to_vec())
569            .unwrap_or_default();
570        let plan = self.inner.dispatcher.plan(&req, &key);
571        match plan {
572            DispatchPlan::Drop => {
573                // Drop: respond with an empty message of the same
574                // id so callers can correlate.
575                let mut rsp = Msg::new(req.id(), crate::msg::MsgType::Unknown, false);
576                rsp.set_parent_id(req.id());
577                Ok(rsp)
578            }
579            DispatchPlan::NoTargets => Err(EmbedError::Inject(
580                "cluster has no quorum-eligible targets".into(),
581            )),
582            DispatchPlan::LocalDatastore => self.inner.dispatch_local(req).await,
583            DispatchPlan::Replicas { targets, .. } => {
584                self.inner.stats.pool_incr_by(PoolField::ForwardError, 0); // touch counter table
585                self.inner.stats.server_incr(ServerField::ReadRequests);
586                // Snapshot the targets' addresses outside any
587                // lock so we don't hold the peer-list read guard
588                // across the awaits below.
589                let resolved: Vec<(bool, Option<SocketAddr>)> = {
590                    let peers = self.inner.pool.peers().read();
591                    targets
592                        .iter()
593                        .map(|t| {
594                            let peer = peers.get(t.peer_idx as usize);
595                            let is_local = peer.is_some_and(crate::cluster::peer::Peer::is_local);
596                            let addr = peer.and_then(|p| p.endpoint().pname().parse().ok());
597                            (is_local, addr)
598                        })
599                        .collect()
600                };
601                let mut last_err: Option<EmbedError> = None;
602                for (is_local, addr) in resolved {
603                    if is_local {
604                        return self.inner.dispatch_local(req).await;
605                    }
606                    if let Some(addr) = addr {
607                        if let Some(remote) = registry_lookup(addr) {
608                            let mut forwarded = Msg::new(req.id(), req.ty(), true);
609                            forwarded.set_parent_id(req.parent_id());
610                            match remote.datastore.dispatch(forwarded).await {
611                                Ok(rsp) => return Ok(rsp),
612                                Err(e) => last_err = Some(EmbedError::Inject(e.to_string())),
613                            }
614                        }
615                    }
616                }
617                if let Some(e) = last_err {
618                    return Err(e);
619                }
620                // No registered remote peer; fall back to the
621                // local datastore so the embed surface stays
622                // useful in mixed deployments.
623                self.inner.dispatch_local(req).await
624            }
625        }
626    }
627
628    /// Reload the configuration. Validates the new [`Config`]
629    /// before any state is touched; on failure the running config
630    /// is left untouched.
631    ///
632    /// This is the in-process equivalent of SIGHUP. The C
633    /// reference's `dynomite_reload_conf` (in
634    /// `_/dynomite/src/dynomite.c`) re-reads the YAML from disk;
635    /// the embedding API takes the `Config` directly.
636    pub async fn reload(&self, mut cfg: Config) -> Result<(), EmbedError> {
637        cfg.finalize();
638        cfg.validate()?;
639        let new_pool = cfg.pool().clone();
640        // Update the stored pool so subsequent .stats() / .peers()
641        // calls reflect the live config. The dispatcher's pool
642        // structure is left untouched here; a future stage will
643        // rebuild it under a write lock.
644        *self.inner.config.lock() = new_pool;
645        let gen_id = next_generation();
646        self.inner.generation.store(gen_id, Ordering::Relaxed);
647        self.inner
648            .bus
649            .send(ServerEvent::ConfigReloaded { generation: gen_id });
650        // Yield once so the broadcast send is observable on the
651        // calling task's scheduler before the function returns.
652        tokio::task::yield_now().await;
653        Ok(())
654    }
655
656    /// Graceful shutdown.
657    ///
658    /// Cancels every background task, deregisters from the
659    /// in-process peer registry, drains the join set, and
660    /// returns. Idempotent: calling `shutdown` after a previous
661    /// `shutdown` (or after [`ServerHandle::join`] returned)
662    /// completes successfully without doing any work.
663    pub async fn shutdown(&self) -> Result<(), EmbedError> {
664        self.inner.cancel.cancel();
665        if let Some(addr) = self.inner.dyn_listen_addr {
666            registry_remove(addr);
667        }
668        let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
669        for t in drained {
670            // Tasks observe `cancel` and return promptly.
671            let _ = t.await;
672        }
673        Ok(())
674    }
675
676    /// Wait for every background task to complete.
677    ///
678    /// Unlike [`ServerHandle::shutdown`], `join` does not request
679    /// cancellation: it parks the caller until the runtime tasks
680    /// (gossip, stats aggregator, accept loops, metrics flusher)
681    /// finish on their own. The intended use is the
682    /// "start-then-park" pattern in long-running embedders that
683    /// drive shutdown from a separate signal handler:
684    ///
685    /// ```no_run
686    /// # use dynomite::embed::ServerBuilder;
687    /// # use dynomite::conf::DataStore;
688    /// # tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {
689    /// let handle = ServerBuilder::new("p")
690    ///     .listen("127.0.0.1:0".parse().unwrap())
691    ///     .dyn_listen("127.0.0.1:0".parse().unwrap())
692    ///     .data_store(DataStore::Redis)
693    ///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
694    ///     .tokens_str("0")
695    ///     .build().unwrap()
696    ///     .start().await.unwrap();
697    /// let shutdown = handle.clone();
698    /// tokio::spawn(async move {
699    ///     // Replace with `tokio::signal::ctrl_c().await.unwrap();`
700    ///     // in a real embedder.
701    ///     shutdown.shutdown().await.unwrap();
702    /// });
703    /// handle.join().await;
704    /// # });
705    /// ```
706    ///
707    /// `join` resolves once every task spawned by
708    /// [`Server::start`] has returned. Calling `join` repeatedly
709    /// is safe; the second call returns immediately because the
710    /// task set is drained on the first.
711    pub async fn join(&self) {
712        let drained: Vec<JoinHandle<()>> = std::mem::take(&mut *self.tasks.lock());
713        for t in drained {
714            let _ = t.await;
715        }
716    }
717}
718
719// -------- helpers ---------------------------------------------------------
720
721fn build_local_peer(pool: &ConfPool, cfg: &PoolConfig) -> Peer {
722    let dyn_listen = pool.dyn_listen.as_ref().map_or_else(
723        || ("127.0.0.1".to_string(), 0u16),
724        |l| (l.name().to_string(), l.port()),
725    );
726    let tokens = pool
727        .tokens
728        .as_ref()
729        .map(|tl| {
730            tl.components()
731                .iter()
732                .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
733                .collect::<Vec<_>>()
734        })
735        .unwrap_or_default();
736    let mut peer = Peer::new(
737        0,
738        PeerEndpoint::tcp(dyn_listen.0, dyn_listen.1),
739        cfg.rack.clone(),
740        cfg.dc.clone(),
741        if tokens.is_empty() {
742            vec![DynToken::from_u32(0)]
743        } else {
744            tokens
745        },
746        true,
747        true,
748        false,
749    );
750    let now_secs = std::time::SystemTime::now()
751        .duration_since(std::time::UNIX_EPOCH)
752        .map(|d| d.as_secs())
753        .unwrap_or(0);
754    peer.set_state(PeerState::Normal, now_secs);
755    peer
756}
757
758fn peers_from_seeds(cfg: &PoolConfig, seeds: &[ConfDynSeed], start_idx: u32) -> Vec<Peer> {
759    let now_secs = std::time::SystemTime::now()
760        .duration_since(std::time::UNIX_EPOCH)
761        .map(|d| d.as_secs())
762        .unwrap_or(0);
763    seeds
764        .iter()
765        .enumerate()
766        .map(|(i, s)| {
767            let tokens: Vec<DynToken> = s
768                .tokens()
769                .components()
770                .iter()
771                .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
772                .collect();
773            let idx_off = u32::try_from(i).unwrap_or(0);
774            let mut p = Peer::new(
775                start_idx + idx_off,
776                PeerEndpoint::tcp(s.host().to_string(), s.port()),
777                s.rack().to_string(),
778                s.dc().to_string(),
779                if tokens.is_empty() {
780                    vec![DynToken::from_u32(0)]
781                } else {
782                    tokens
783                },
784                false,
785                s.dc() == cfg.dc,
786                false,
787            );
788            p.set_state(PeerState::Normal, now_secs);
789            p
790        })
791        .collect()
792}
793
794// Unused legacy helpers removed.
795
796async fn bind_listener(
797    listen: Option<&crate::conf::ConfListen>,
798) -> Result<(Option<TcpListener>, Option<SocketAddr>), EmbedError> {
799    let Some(l) = listen else {
800        return Ok((None, None));
801    };
802    let host = l.name();
803    let port = l.port();
804    if host.is_empty() {
805        return Ok((None, None));
806    }
807    let addr_str = format!("{host}:{port}");
808    let Ok(_addr) = addr_str.parse::<SocketAddr>() else {
809        return Ok((None, None));
810    };
811    let listener = TcpListener::bind(&addr_str).await?;
812    let local = listener.local_addr()?;
813    Ok((Some(listener), Some(local)))
814}
815
816// removed unused try_bind helper
817
818async fn accept_loop(
819    inner: Arc<ServerInner>,
820    listener: TcpListener,
821    addr: SocketAddr,
822    role: ConnRoleTag,
823) {
824    loop {
825        tokio::select! {
826            biased;
827            () = inner.cancel.cancelled() => return,
828            res = listener.accept() => {
829                let Ok((sock, peer)) = res else { return };
830                let conn_id = next_conn_id();
831                inner.bus.send(ServerEvent::ConnectionAccepted {
832                    conn_id,
833                    role,
834                    local_addr: Some(addr),
835                });
836                // The embedded server is in-process only at this
837                // stage: the kernel-bound socket is reserved so
838                // post-bind reporting works, but the per-role
839                // protocol parser is not wired. Cross-process
840                // clients see open-then-immediate-close. Use
841                // `ServerHandle::inject_request` for in-process
842                // traffic; use the `dynomited` binary for the
843                // wire path.
844                tracing::warn!(
845                    listen = %addr,
846                    peer = %peer,
847                    role = ?role,
848                    conn_id,
849                    "embedded listen_addr accepted a connection; embedded mode does not yet \
850                     forward to the dispatcher; use ServerHandle::inject_request instead. \
851                     Closing connection."
852                );
853                let bus = inner.bus.clone();
854                let cancel = inner.cancel.clone();
855                tokio::spawn(async move {
856                    let _ = sock; // drop on close
857                    let close_reason = if cancel.is_cancelled() {
858                        CloseReason::LocalClose
859                    } else {
860                        CloseReason::PeerEof
861                    };
862                    bus.send(ServerEvent::ConnectionClosed { conn_id, reason: close_reason });
863                });
864            }
865        }
866    }
867}
868
869async fn stats_loop(inner: Arc<ServerInner>, interval_ms: Option<i64>) {
870    let interval =
871        Duration::from_millis(u64::try_from(interval_ms.unwrap_or(1_000)).unwrap_or(1_000));
872    let mut ticker = tokio::time::interval(interval);
873    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
874    loop {
875        tokio::select! {
876            biased;
877            () = inner.cancel.cancelled() => return,
878            _ = ticker.tick() => {
879                let snap = inner.stats.snapshot();
880                *inner.snapshot_cache.lock() = snap;
881            }
882        }
883    }
884}
885
886async fn metrics_loop(inner: Arc<ServerInner>) {
887    let interval = inner.metrics.flush_interval();
888    let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(50)));
889    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
890    loop {
891        tokio::select! {
892            biased;
893            () = inner.cancel.cancelled() => return,
894            _ = ticker.tick() => {
895                let snap = inner.snapshot_cache.lock().clone();
896                let _ = inner.metrics.emit(&snap).await;
897            }
898        }
899    }
900}
901
902async fn gossip_loop(inner: Arc<ServerInner>, interval: Duration) {
903    let mut ticker = tokio::time::interval(interval.max(Duration::from_millis(20)));
904    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
905    let mut round: u64 = 0;
906    let mut known: std::collections::HashSet<(String, u16)> = std::collections::HashSet::new();
907    {
908        let peers = inner.pool.peers().read();
909        for p in peers.iter() {
910            known.insert((p.endpoint().host().to_string(), p.endpoint().port()));
911        }
912    }
913    loop {
914        tokio::select! {
915            biased;
916            () = inner.cancel.cancelled() => return,
917            _ = ticker.tick() => {
918                let round_started = std::time::Instant::now();
919                let round_ts = std::time::SystemTime::now();
920                round += 1;
921                let seeds = inner.seeds.fetch().unwrap_or_default();
922                let mut added: u32 = 0;
923                {
924                    let mut peers = inner.pool.peers().write();
925                    let now_secs = std::time::SystemTime::now()
926                        .duration_since(std::time::UNIX_EPOCH)
927                        .map(|d| d.as_secs())
928                        .unwrap_or(0);
929                    let cfg = inner.pool.config().clone();
930                    for seed in &seeds {
931                        let key = (seed.host().to_string(), seed.port());
932                        if known.contains(&key) {
933                            continue;
934                        }
935                        let next_idx = u32::try_from(peers.len()).unwrap_or(u32::MAX);
936                        let tokens: Vec<DynToken> = seed
937                            .tokens()
938                            .components()
939                            .iter()
940                            .map(|c| DynToken::from_u32(c.digits.parse::<u32>().unwrap_or(0)))
941                            .collect();
942                        let mut p = Peer::new(
943                            next_idx,
944                            PeerEndpoint::tcp(seed.host().to_string(), seed.port()),
945                            seed.rack().to_string(),
946                            seed.dc().to_string(),
947                            if tokens.is_empty() {
948                                vec![DynToken::from_u32(0)]
949                            } else {
950                                tokens
951                            },
952                            false,
953                            seed.dc() == cfg.dc,
954                            false,
955                        );
956                        p.set_state(PeerState::Normal, now_secs);
957                        peers.push(p);
958                        known.insert(key);
959                        added += 1;
960                    }
961                }
962                if added > 0 {
963                    // Rebuild the topology + ring under fresh
964                    // datacenters so the dispatcher sees the new
965                    // peers.
966                    rebuild_topology(&inner.pool);
967                    let peers_now = inner.pool.peers().read();
968                    for (idx, p) in peers_now.iter().enumerate().rev().take(added as usize) {
969                        let _ = idx;
970                        inner.bus.send(ServerEvent::PeerUp(p.idx()));
971                        inner.events.publish(crate::events::ClusterEvent::PeerUp {
972                            peer_id: p.idx(),
973                            dc: p.dc().to_string(),
974                            ts: round_ts,
975                        });
976                    }
977                    inner.events.publish(crate::events::ClusterEvent::RingChanged {
978                        tag: "seed-discovery".to_string(),
979                        ts: round_ts,
980                    });
981                }
982                let count = u32::try_from(inner.pool.peers().read().len()).unwrap_or(u32::MAX);
983                inner.bus.send(ServerEvent::GossipRound { round, peers: count });
984                inner.events.publish(crate::events::ClusterEvent::GossipRoundComplete {
985                    duration: round_started.elapsed(),
986                    peers_seen: count as usize,
987                    ts: round_ts,
988                });
989                inner.stats.pool_incr(PoolField::StatsCount);
990            }
991        }
992    }
993}
994
995fn rebuild_topology(pool: &Arc<ServerPool>) {
996    use crate::cluster::Datacenter;
997    let peers = pool.peers().read();
998    let mut new_dcs: Vec<Datacenter> = Vec::new();
999    for p in peers.iter() {
1000        let idx = if let Some(i) = new_dcs.iter().position(|d| d.name() == p.dc()) {
1001            i
1002        } else {
1003            new_dcs.push(Datacenter::new(p.dc().to_string()));
1004            new_dcs.len() - 1
1005        };
1006        new_dcs[idx].upsert_rack(p.rack().to_string());
1007    }
1008    drop(peers);
1009    {
1010        let mut dcs = pool.datacenters().write();
1011        *dcs = new_dcs;
1012    }
1013    pool.rebuild_ring();
1014    pool.preselect_remote_racks();
1015}
1016
1017// Internal helpers (`shutdown_signal`, `_instant_now`) removed:
1018// they were public-but-doc(hidden) escape hatches that risked
1019// SemVer commitments. The `oneshot` channel was not used
1020// outside the (deleted) test helper, and `Instant` was kept in
1021// scope only to silence an unused-import warning. Both have
1022// been dropped.