Skip to main content

dynomite/embed/
builder.rs

1//! Fluent builder for [`crate::embed::Server`].
2//!
3//! [`ServerBuilder`] mirrors every YAML-visible field on
4//! [`crate::conf::ConfPool`] with a typed setter and adds typed
5//! setters for the hook traits that have no YAML form.
6//!
7//! The builder is owned and chainable; `build()` validates the
8//! assembled config (the same [`crate::conf::Config::validate`]
9//! used for YAML) and returns a [`crate::embed::Server`].
10
11use std::net::SocketAddr;
12use std::path::Path;
13use std::time::Duration;
14
15use crate::conf::{
16    ConfDynSeed, ConfListen, ConfPool, ConfServer, Config, ConsistencyLevel, DataStore, HashType,
17    SecureServerOption, Servers, TokenList, Transport,
18};
19use crate::embed::error::EmbedError;
20use crate::embed::hooks::{
21    CryptoProvider, Datastore, MemoryDatastore, MetricsSink, SeedsProvider, SimpleSeedsProvider,
22};
23use crate::embed::server::{Server, ServerHooks};
24
25/// Fluent builder for [`Server`].
26///
27/// Every YAML-visible field on [`ConfPool`] has a setter on
28/// `ServerBuilder`; in addition, hook setters accept boxed trait
29/// objects to plug custom implementations.
30///
31/// # Examples
32///
33/// ```
34/// use dynomite::embed::ServerBuilder;
35/// use dynomite::conf::DataStore;
36/// let server = ServerBuilder::new("dyn_o_mite")
37///     .listen("127.0.0.1:18102".parse().unwrap())
38///     .dyn_listen("127.0.0.1:18101".parse().unwrap())
39///     .data_store(DataStore::Redis)
40///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
41///     .tokens_str("0")
42///     .build()
43///     .unwrap();
44/// drop(server);
45/// ```
46pub struct ServerBuilder {
47    pool_name: String,
48    pool: ConfPool,
49    hooks: ServerHooks,
50    command_extension: Option<std::sync::Arc<dyn crate::embed::CommandExtension>>,
51}
52
53impl std::fmt::Debug for ServerBuilder {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("ServerBuilder")
56            .field("pool_name", &self.pool_name)
57            .field("pool", &self.pool)
58            .finish_non_exhaustive()
59    }
60}
61
62impl Default for ServerBuilder {
63    /// Build a `ServerBuilder` for the canonical pool name
64    /// `"dyn_o_mite"`. Mirrors the design page's sketch of a
65    /// no-arg `Server::builder()` while keeping the typed
66    /// constructor (`new(pool_name)`) for ergonomics; the C
67    /// reference accepts only one pool per `Config`.
68    fn default() -> Self {
69        Self::new("dyn_o_mite")
70    }
71}
72
73impl ServerBuilder {
74    /// Build an empty `ServerBuilder` for a pool named `pool_name`.
75    ///
76    /// # Examples
77    ///
78    /// ```
79    /// use dynomite::embed::ServerBuilder;
80    /// let b = ServerBuilder::new("dyn_o_mite");
81    /// assert_eq!(b.pool_name(), "dyn_o_mite");
82    /// ```
83    pub fn new(pool_name: impl Into<String>) -> Self {
84        Self {
85            pool_name: pool_name.into(),
86            pool: ConfPool::default(),
87            hooks: ServerHooks::default(),
88            command_extension: None,
89        }
90    }
91
92    /// Build from an existing parsed [`Config`].
93    ///
94    /// # Examples
95    ///
96    /// ```
97    /// use dynomite::conf::Config;
98    /// use dynomite::embed::ServerBuilder;
99    /// let yaml = "p:\n  listen: 127.0.0.1:1\n  dyn_listen: 127.0.0.1:2\n  tokens: '1'\n  servers:\n  - 127.0.0.1:3:1\n  data_store: 0\n";
100    /// let cfg = Config::parse_str(yaml).unwrap();
101    /// let b = ServerBuilder::from_config(&cfg);
102    /// assert_eq!(b.pool_name(), "p");
103    /// ```
104    pub fn from_config(cfg: &Config) -> Self {
105        let pool_name = cfg.pool_name().to_string();
106        Self {
107            pool_name,
108            pool: cfg.pool().clone(),
109            hooks: ServerHooks::default(),
110            command_extension: None,
111        }
112    }
113
114    /// Build by parsing a YAML file.
115    ///
116    /// # Examples
117    ///
118    /// ```
119    /// use std::io::Write;
120    /// use dynomite::embed::ServerBuilder;
121    /// let mut f = tempfile::NamedTempFile::new().unwrap();
122    /// writeln!(f, "p:\n  listen: 127.0.0.1:1\n  dyn_listen: 127.0.0.1:2\n  tokens: '1'\n  servers:\n  - 127.0.0.1:3:1\n  data_store: 0").unwrap();
123    /// let b = ServerBuilder::from_yaml_file(f.path()).unwrap();
124    /// assert_eq!(b.pool_name(), "p");
125    /// ```
126    pub fn from_yaml_file(path: impl AsRef<Path>) -> Result<Self, EmbedError> {
127        let cfg = Config::parse_file(path.as_ref())?;
128        Ok(Self::from_config(&cfg))
129    }
130
131    /// Currently-configured pool name.
132    #[must_use]
133    pub fn pool_name(&self) -> &str {
134        &self.pool_name
135    }
136
137    /// Override the pool name supplied to [`Self::new`].
138    ///
139    /// The C reference accepts only one pool per `Config`, so the
140    /// pool name is part of the constructor for ergonomics. This
141    /// setter exists so callers that build a [`ServerBuilder`]
142    /// from a default-named template (`"dyn_o_mite"`) can rename
143    /// the pool without rebuilding the chain. Recorded as
144    /// Deviation in `docs/parity.md` (the design page sketches
145    /// a no-arg `Server::builder()`).
146    #[must_use]
147    pub fn with_pool_name(mut self, name: impl Into<String>) -> Self {
148        self.pool_name = name.into();
149        self
150    }
151
152    // ---- YAML-mirroring setters ------------------------------------------
153
154    /// `listen:` - client-facing listener address.
155    ///
156    /// Accepts port `0` (kernel-assigned ephemeral port). The
157    /// post-bind address is reported via
158    /// [`ServerHandle::listen_addr`](crate::embed::ServerHandle::listen_addr).
159    #[must_use]
160    pub fn listen(mut self, addr: SocketAddr) -> Self {
161        self.pool.listen = Some(ConfListen::from_socket_addr(addr));
162        self
163    }
164
165    /// `dyn_listen:` - peer-facing listener address.
166    ///
167    /// Accepts port `0` (kernel-assigned ephemeral port). The
168    /// post-bind address is reported via
169    /// [`ServerHandle::dyn_listen_addr`](crate::embed::ServerHandle::dyn_listen_addr).
170    #[must_use]
171    pub fn dyn_listen(mut self, addr: SocketAddr) -> Self {
172        self.pool.dyn_listen = Some(ConfListen::from_socket_addr(addr));
173        self
174    }
175
176    /// `stats_listen:` - HTTP stats listener address.
177    ///
178    /// Accepts port `0` for ephemeral-port semantics. Note that
179    /// the embedded server does not currently bind the stats
180    /// listener; the `dynomited` binary does. Embedders that want
181    /// a Prometheus-style scrape endpoint should plug a
182    /// [`MetricsSink`](crate::embed::MetricsSink) implementation
183    /// instead.
184    #[must_use]
185    pub fn stats_listen(mut self, addr: SocketAddr) -> Self {
186        self.pool.stats_listen = Some(ConfListen::from_socket_addr(addr));
187        self
188    }
189
190    /// `hash:` - hash algorithm.
191    #[must_use]
192    pub fn hash(mut self, h: HashType) -> Self {
193        self.pool.hash = Some(h);
194        self
195    }
196
197    /// `data_store:` - protocol selector.
198    #[must_use]
199    pub fn data_store(mut self, d: DataStore) -> Self {
200        self.pool.data_store = Some(d.as_int());
201        self
202    }
203
204    /// `read_consistency:` - quorum policy for reads.
205    #[must_use]
206    pub fn read_consistency(mut self, c: ConsistencyLevel) -> Self {
207        self.pool.read_consistency = Some(c.as_str().to_string());
208        self
209    }
210
211    /// `write_consistency:` - quorum policy for writes.
212    #[must_use]
213    pub fn write_consistency(mut self, c: ConsistencyLevel) -> Self {
214        self.pool.write_consistency = Some(c.as_str().to_string());
215        self
216    }
217
218    /// `secure_server_option:` - inter-node TLS mode.
219    #[must_use]
220    pub fn secure_server_option(mut self, opt: SecureServerOption) -> Self {
221        self.pool.secure_server_option = Some(opt.as_str().to_string());
222        self
223    }
224
225    /// `transport:` - select the proxy listener's network
226    /// stack. Defaults to [`Transport::Tcp`].
227    ///
228    /// `Transport::Quic` requires the engine's `quic` Cargo
229    /// feature; the validator rejects the selection without
230    /// matching `quic_cert_file:` and `quic_key_file:` paths
231    /// (see [`Self::quic_cert_file`] and [`Self::quic_key_file`]).
232    ///
233    /// # Examples
234    ///
235    /// ```
236    /// use dynomite::embed::ServerBuilder;
237    /// use dynomite::conf::Transport;
238    /// let _b = ServerBuilder::new("p").transport(Transport::Tcp);
239    /// ```
240    #[must_use]
241    pub fn transport(mut self, t: Transport) -> Self {
242        self.pool.transport = Some(t);
243        self
244    }
245
246    /// `quic_cert_file:` - PEM certificate chain path used by
247    /// the QUIC listener. Required when `transport: quic` is
248    /// selected; ignored under TCP.
249    #[must_use]
250    pub fn quic_cert_file(mut self, path: impl AsRef<Path>) -> Self {
251        self.pool.quic_cert_file = Some(path.as_ref().to_path_buf());
252        self
253    }
254
255    /// `quic_key_file:` - PEM private-key path matching
256    /// [`Self::quic_cert_file`].
257    #[must_use]
258    pub fn quic_key_file(mut self, path: impl AsRef<Path>) -> Self {
259        self.pool.quic_key_file = Some(path.as_ref().to_path_buf());
260        self
261    }
262
263    /// `pem_key_file:` - path to the PEM private key.
264    #[must_use]
265    pub fn pem_key_file(mut self, path: impl AsRef<Path>) -> Self {
266        self.pool.pem_key_file = Some(path.as_ref().to_string_lossy().into_owned());
267        self
268    }
269
270    /// `servers:` - the (single-element) datastore list.
271    #[must_use]
272    pub fn servers(mut self, servers: Vec<ConfServer>) -> Self {
273        self.pool.servers = Some(Servers::from_vec(servers));
274        self
275    }
276
277    /// `dyn_seeds:` - peer dynomite nodes.
278    #[must_use]
279    pub fn dyn_seeds(mut self, seeds: Vec<ConfDynSeed>) -> Self {
280        self.pool.dyn_seeds = Some(seeds);
281        self
282    }
283
284    /// `dyn_seed_provider:` - seeds backend selector.
285    #[must_use]
286    pub fn dyn_seed_provider(mut self, name: impl Into<String>) -> Self {
287        self.pool.dyn_seed_provider = Some(name.into());
288        self
289    }
290
291    /// `timeout:` - request timeout.
292    #[must_use]
293    pub fn timeout(mut self, d: Duration) -> Self {
294        let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
295        self.pool.timeout = Some(ms);
296        self
297    }
298
299    /// `auto_eject_hosts:` - automatically eject failing peers.
300    #[must_use]
301    pub fn auto_eject_hosts(mut self, on: bool) -> Self {
302        self.pool.auto_eject_hosts = Some(on);
303        self
304    }
305
306    /// `server_failure_limit:` - consecutive failures before eject.
307    #[must_use]
308    pub fn server_failure_limit(mut self, n: u32) -> Self {
309        self.pool.server_failure_limit = Some(i64::from(n));
310        self
311    }
312
313    /// `server_retry_timeout:` - retry interval for ejected servers.
314    #[must_use]
315    pub fn server_retry_timeout(mut self, d: Duration) -> Self {
316        let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
317        self.pool.server_retry_timeout = Some(ms);
318        self
319    }
320
321    /// `gos_interval:` - gossip period.
322    #[must_use]
323    pub fn gossip_interval(mut self, d: Duration) -> Self {
324        let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
325        self.pool.gos_interval = Some(ms);
326        self
327    }
328
329    /// `enable_gossip:` - turn the gossip task on or off.
330    #[must_use]
331    pub fn enable_gossip(mut self, on: bool) -> Self {
332        self.pool.enable_gossip = Some(on);
333        self
334    }
335
336    /// `datacenter:` - this node's datacenter.
337    #[must_use]
338    pub fn datacenter(mut self, dc: impl Into<String>) -> Self {
339        self.pool.datacenter = Some(dc.into());
340        self
341    }
342
343    /// `rack:` - this node's rack.
344    #[must_use]
345    pub fn rack(mut self, rack: impl Into<String>) -> Self {
346        self.pool.rack = Some(rack.into());
347        self
348    }
349
350    /// `tokens:` - this node's tokens, parsed from the YAML
351    /// representation.
352    ///
353    /// Returns the builder unchanged when the tokens string fails
354    /// to parse so callers can chain. Parse failures emit a
355    /// `tracing::warn!` event so a typo in the token string is
356    /// observable in logs even though it does not break the
357    /// chain. Use [`ServerBuilder::tokens`] to set a pre-parsed
358    /// [`TokenList`] when you want a hard error on bad input.
359    ///
360    /// The silent-ignore-with-log behaviour is recorded as a
361    /// SemVer-major candidate in `docs/parity.md`; v0.2 may
362    /// switch to a `Result`-returning variant.
363    #[must_use]
364    pub fn tokens_str(mut self, raw: impl AsRef<str>) -> Self {
365        let raw = raw.as_ref();
366        match TokenList::parse(raw) {
367            Ok(t) => {
368                self.pool.tokens = Some(t);
369            }
370            Err(e) => {
371                tracing::warn!(
372                    raw = %raw,
373                    error = %e,
374                    "ServerBuilder::tokens_str: parse failed; leaving tokens unchanged. \
375                     Use ServerBuilder::tokens(TokenList) for a hard error on bad input."
376                );
377            }
378        }
379        self
380    }
381
382    /// `tokens:` - this node's tokens.
383    #[must_use]
384    pub fn tokens(mut self, tokens: TokenList) -> Self {
385        self.pool.tokens = Some(tokens);
386        self
387    }
388
389    /// `mbuf_size:` - mbuf chunk size in bytes.
390    #[must_use]
391    pub fn mbuf_size(mut self, n: usize) -> Self {
392        self.pool.mbuf_size = Some(i64::try_from(n).unwrap_or(i64::MAX));
393        self
394    }
395
396    /// `max_msgs:` - maximum allocated messages.
397    #[must_use]
398    pub fn max_msgs(mut self, n: usize) -> Self {
399        self.pool.max_msgs = Some(i64::try_from(n).unwrap_or(i64::MAX));
400        self
401    }
402
403    /// `read_repairs_enabled:` - enable read-repair on quorum
404    /// mismatch.
405    #[must_use]
406    pub fn read_repairs_enabled(mut self, on: bool) -> Self {
407        self.pool.read_repairs_enabled = Some(on);
408        self
409    }
410
411    /// `client_connections:` - client connection cap.
412    #[must_use]
413    pub fn client_connections(mut self, n: u32) -> Self {
414        self.pool.client_connections = Some(i64::from(n));
415        self
416    }
417
418    /// `datastore_connections:` - count of datastore-side
419    /// connections.
420    #[must_use]
421    pub fn datastore_connections(mut self, n: u8) -> Self {
422        self.pool.datastore_connections = Some(n);
423        self
424    }
425
426    /// `local_peer_connections:` - count of local-DC peer
427    /// connections.
428    #[must_use]
429    pub fn local_peer_connections(mut self, n: u8) -> Self {
430        self.pool.local_peer_connections = Some(n);
431        self
432    }
433
434    /// `remote_peer_connections:` - count of remote-DC peer
435    /// connections.
436    #[must_use]
437    pub fn remote_peer_connections(mut self, n: u8) -> Self {
438        self.pool.remote_peer_connections = Some(n);
439        self
440    }
441
442    /// `preconnect:` - eagerly establish connections at startup.
443    #[must_use]
444    pub fn preconnect(mut self, on: bool) -> Self {
445        self.pool.preconnect = Some(on);
446        self
447    }
448
449    /// `stats_interval:` - stats aggregation period.
450    #[must_use]
451    pub fn stats_interval(mut self, d: Duration) -> Self {
452        let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
453        self.pool.stats_interval = Some(ms);
454        self
455    }
456
457    // ---- Hook setters ---------------------------------------------------
458
459    /// Plug a custom [`Datastore`].
460    #[must_use]
461    pub fn datastore(mut self, ds: Box<dyn Datastore>) -> Self {
462        self.hooks.datastore = Some(ds);
463        self
464    }
465
466    /// Plug a custom [`SeedsProvider`].
467    #[must_use]
468    pub fn seeds_provider(mut self, sp: Box<dyn SeedsProvider>) -> Self {
469        self.hooks.seeds = Some(sp);
470        self
471    }
472
473    /// Plug a custom [`CryptoProvider`].
474    #[must_use]
475    pub fn crypto_provider(mut self, cp: Box<dyn CryptoProvider>) -> Self {
476        self.hooks.crypto = Some(cp);
477        self
478    }
479
480    /// Plug a custom [`MetricsSink`].
481    #[must_use]
482    pub fn metrics_sink(mut self, ms: Box<dyn MetricsSink>) -> Self {
483        self.hooks.metrics = Some(ms);
484        self
485    }
486
487    /// Plug a [`crate::embed::CommandExtension`].
488    ///
489    /// The extension is consulted by the dispatcher in the
490    /// hot path: parsed FT.* requests and HSET requests are
491    /// offered to it before the routing planner runs. The
492    /// trait is part of the engine; the standard RediSearch
493    /// implementation lives in the `dynomite-search` crate.
494    /// Without this setter the dispatcher's behaviour is
495    /// unchanged from the substrate's stock behaviour.
496    ///
497    /// # Examples
498    ///
499    /// ```
500    /// use std::sync::Arc;
501    /// use dynomite::embed::{CommandExtension, HsetOutcome, ServerBuilder};
502    /// use dynomite::msg::MsgType;
503    /// #[derive(Debug)]
504    /// struct NoOp;
505    /// impl CommandExtension for NoOp {
506    ///     fn handles_msg_type(&self, _: MsgType) -> bool { false }
507    ///     fn try_dispatch(&self, _: &[&[u8]]) -> Option<Vec<u8>> { None }
508    /// }
509    /// let _b = ServerBuilder::new("p").with_command_extension(Arc::new(NoOp));
510    /// ```
511    #[must_use]
512    pub fn with_command_extension(
513        mut self,
514        ext: std::sync::Arc<dyn crate::embed::CommandExtension>,
515    ) -> Self {
516        self.command_extension = Some(ext);
517        self
518    }
519
520    /// Mutating equivalent of [`Self::with_command_extension`]
521    /// for callers (notably `dynomite-search::install`) that
522    /// want to attach an extension to a builder they hold by
523    /// `&mut`.
524    pub fn set_command_extension(
525        &mut self,
526        ext: std::sync::Arc<dyn crate::embed::CommandExtension>,
527    ) -> &mut Self {
528        self.command_extension = Some(ext);
529        self
530    }
531
532    /// Borrow the configured [`crate::embed::CommandExtension`],
533    /// if one was supplied.
534    #[must_use]
535    pub fn command_extension(&self) -> Option<&std::sync::Arc<dyn crate::embed::CommandExtension>> {
536        self.command_extension.as_ref()
537    }
538
539    // `conf_pool_mut` removed: the escape hatch leaked the
540    // entire internal `ConfPool` shape onto the public API.
541    // Targeted setters land as the missing fields are
542    // identified.
543
544    /// Build the [`Server`].
545    ///
546    /// Applies defaults to any unset field, runs the validation
547    /// pass, fills in any missing hooks with their in-crate
548    /// defaults, and constructs the runtime data structures
549    /// (cluster pool, dispatcher, stats, event bus). The returned
550    /// `Server` is not running yet; call
551    /// [`Server::start`](crate::embed::Server::start) to spawn
552    /// background tasks.
553    pub fn build(mut self) -> Result<Server, EmbedError> {
554        if self.pool.servers.is_none() {
555            // Provide a sentinel so validation passes for the
556            // common in-process case where the embedder plugs a
557            // custom Datastore.
558            self.pool.servers = Some(Servers::from_vec(vec![ConfServer::parse(
559                "127.0.0.1:1:1 stub",
560            )
561            .map_err(EmbedError::Conf)?]));
562        }
563        if self.pool.tokens.is_none() {
564            self.pool.tokens = Some(TokenList::parse("0").map_err(EmbedError::Conf)?);
565        }
566        // Apply defaults + validate directly on the pool.
567        let mut finalized = self.pool.clone();
568        finalized.apply_defaults();
569        finalized.validate(&self.pool_name)?;
570
571        let datastore = self
572            .hooks
573            .datastore
574            .unwrap_or_else(|| Box::new(MemoryDatastore::new()));
575        let seeds = self.hooks.seeds.unwrap_or_else(|| {
576            let raw = finalized
577                .dyn_seeds
578                .as_deref()
579                .map(<[_]>::to_vec)
580                .unwrap_or_default();
581            Box::new(SimpleSeedsProvider::new(raw))
582        });
583        let crypto = self.hooks.crypto;
584        let metrics = self.hooks.metrics;
585        let command_extension = self.command_extension;
586
587        Ok(Server::from_pool(
588            self.pool_name,
589            finalized,
590            ServerHooks {
591                datastore: Some(datastore),
592                seeds: Some(seeds),
593                crypto,
594                metrics,
595            },
596            command_extension,
597        ))
598    }
599}
600
601// `socket_to_listen` was the YAML-validating helper for the
602// `listen:` / `dyn_listen:` / `stats_listen:` setters. The
603// builder now calls `ConfListen::from_socket_addr` directly so
604// that port `0` (kernel-assigned ephemeral) round-trips
605// unchanged through the embed API.
606
607// (synthetic_config helper removed; ConfPool drives validation directly.)
608
609/// Internal convenience: container for hook overrides.
610impl Default for ServerHooks {
611    fn default() -> Self {
612        Self {
613            datastore: None,
614            seeds: None,
615            crypto: None,
616            metrics: None,
617        }
618    }
619}
620
621// `SharedDatastore` removed: it was a `#[doc(hidden)]` `pub`
622// alias with no in-tree consumer. Embedders that need a shared
623// handle build their own `Arc<dyn Datastore>` from a
624// `Box<dyn Datastore>` they construct.