// dynomite/embed/builder.rs

1//! Fluent builder for [`crate::embed::Server`].
2//!
3//! [`ServerBuilder`] mirrors every YAML-visible field on
4//! [`crate::conf::ConfPool`] with a typed setter and adds typed
5//! setters for the hook traits that have no YAML form.
6//!
7//! The builder is owned and chainable; `build()` validates the
8//! assembled config (the same [`crate::conf::Config::validate`]
9//! used for YAML) and returns a [`crate::embed::Server`].
10
11use std::net::SocketAddr;
12use std::path::Path;
13use std::time::Duration;
14
15use crate::conf::{
16    ConfDynSeed, ConfListen, ConfPool, ConfServer, Config, ConsistencyLevel, DataStore, HashType,
17    SecureServerOption, Servers, TokenList,
18};
19use crate::embed::error::EmbedError;
20use crate::embed::hooks::{
21    CryptoProvider, Datastore, MemoryDatastore, MetricsSink, SeedsProvider, SimpleSeedsProvider,
22};
23use crate::embed::server::{Server, ServerHooks};
24
25/// Fluent builder for [`Server`].
26///
27/// Every YAML-visible field on [`ConfPool`] has a setter on
28/// `ServerBuilder`; in addition, hook setters accept boxed trait
29/// objects to plug custom implementations.
30///
31/// # Examples
32///
33/// ```
34/// use dynomite::embed::ServerBuilder;
35/// use dynomite::conf::DataStore;
36/// let server = ServerBuilder::new("dyn_o_mite")
37///     .listen("127.0.0.1:18102".parse().unwrap())
38///     .dyn_listen("127.0.0.1:18101".parse().unwrap())
39///     .data_store(DataStore::Redis)
40///     .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
41///     .tokens_str("0")
42///     .build()
43///     .unwrap();
44/// drop(server);
45/// ```
46pub struct ServerBuilder {
47    pool_name: String,
48    pool: ConfPool,
49    hooks: ServerHooks,
50    command_extension: Option<std::sync::Arc<dyn crate::embed::CommandExtension>>,
51}
52
53impl std::fmt::Debug for ServerBuilder {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("ServerBuilder")
56            .field("pool_name", &self.pool_name)
57            .field("pool", &self.pool)
58            .finish_non_exhaustive()
59    }
60}
61
62impl Default for ServerBuilder {
63    /// Build a `ServerBuilder` for the canonical pool name
64    /// `"dyn_o_mite"`. Mirrors the design page's sketch of a
65    /// no-arg `Server::builder()` while keeping the typed
66    /// constructor (`new(pool_name)`) for ergonomics; the C
67    /// reference accepts only one pool per `Config`.
68    fn default() -> Self {
69        Self::new("dyn_o_mite")
70    }
71}
72
73impl ServerBuilder {
74    /// Build an empty `ServerBuilder` for a pool named `pool_name`.
75    ///
76    /// # Examples
77    ///
78    /// ```
79    /// use dynomite::embed::ServerBuilder;
80    /// let b = ServerBuilder::new("dyn_o_mite");
81    /// assert_eq!(b.pool_name(), "dyn_o_mite");
82    /// ```
83    pub fn new(pool_name: impl Into<String>) -> Self {
84        Self {
85            pool_name: pool_name.into(),
86            pool: ConfPool::default(),
87            hooks: ServerHooks::default(),
88            command_extension: None,
89        }
90    }
91
92    /// Build from an existing parsed [`Config`].
93    ///
94    /// # Examples
95    ///
96    /// ```
97    /// use dynomite::conf::Config;
98    /// use dynomite::embed::ServerBuilder;
99    /// let yaml = "p:\n  listen: 127.0.0.1:1\n  dyn_listen: 127.0.0.1:2\n  tokens: '1'\n  servers:\n  - 127.0.0.1:3:1\n  data_store: 0\n";
100    /// let cfg = Config::parse_str(yaml).unwrap();
101    /// let b = ServerBuilder::from_config(&cfg);
102    /// assert_eq!(b.pool_name(), "p");
103    /// ```
104    pub fn from_config(cfg: &Config) -> Self {
105        let pool_name = cfg.pool_name().to_string();
106        Self {
107            pool_name,
108            pool: cfg.pool().clone(),
109            hooks: ServerHooks::default(),
110            command_extension: None,
111        }
112    }
113
114    /// Build by parsing a YAML file.
115    ///
116    /// # Examples
117    ///
118    /// ```
119    /// use std::io::Write;
120    /// use dynomite::embed::ServerBuilder;
121    /// let mut f = tempfile::NamedTempFile::new().unwrap();
122    /// writeln!(f, "p:\n  listen: 127.0.0.1:1\n  dyn_listen: 127.0.0.1:2\n  tokens: '1'\n  servers:\n  - 127.0.0.1:3:1\n  data_store: 0").unwrap();
123    /// let b = ServerBuilder::from_yaml_file(f.path()).unwrap();
124    /// assert_eq!(b.pool_name(), "p");
125    /// ```
126    pub fn from_yaml_file(path: impl AsRef<Path>) -> Result<Self, EmbedError> {
127        let cfg = Config::parse_file(path.as_ref())?;
128        Ok(Self::from_config(&cfg))
129    }
130
131    /// Currently-configured pool name.
132    #[must_use]
133    pub fn pool_name(&self) -> &str {
134        &self.pool_name
135    }
136
137    /// Override the pool name supplied to [`Self::new`].
138    ///
139    /// The C reference accepts only one pool per `Config`, so the
140    /// pool name is part of the constructor for ergonomics. This
141    /// setter exists so callers that build a [`ServerBuilder`]
142    /// from a default-named template (`"dyn_o_mite"`) can rename
143    /// the pool without rebuilding the chain. Recorded as
144    /// Deviation in `docs/parity.md` (the design page sketches
145    /// a no-arg `Server::builder()`).
146    #[must_use]
147    pub fn with_pool_name(mut self, name: impl Into<String>) -> Self {
148        self.pool_name = name.into();
149        self
150    }
151
152    // ---- YAML-mirroring setters ------------------------------------------
153
154    /// `listen:` - client-facing listener address.
155    ///
156    /// Accepts port `0` (kernel-assigned ephemeral port). The
157    /// post-bind address is reported via
158    /// [`ServerHandle::listen_addr`](crate::embed::ServerHandle::listen_addr).
159    #[must_use]
160    pub fn listen(mut self, addr: SocketAddr) -> Self {
161        self.pool.listen = Some(ConfListen::from_socket_addr(addr));
162        self
163    }
164
165    /// `dyn_listen:` - peer-facing listener address.
166    ///
167    /// Accepts port `0` (kernel-assigned ephemeral port). The
168    /// post-bind address is reported via
169    /// [`ServerHandle::dyn_listen_addr`](crate::embed::ServerHandle::dyn_listen_addr).
170    #[must_use]
171    pub fn dyn_listen(mut self, addr: SocketAddr) -> Self {
172        self.pool.dyn_listen = Some(ConfListen::from_socket_addr(addr));
173        self
174    }
175
176    /// `stats_listen:` - HTTP stats listener address.
177    ///
178    /// Accepts port `0` for ephemeral-port semantics. Note that
179    /// the embedded server does not currently bind the stats
180    /// listener; the `dynomited` binary does. Embedders that want
181    /// a Prometheus-style scrape endpoint should plug a
182    /// [`MetricsSink`](crate::embed::MetricsSink) implementation
183    /// instead.
184    #[must_use]
185    pub fn stats_listen(mut self, addr: SocketAddr) -> Self {
186        self.pool.stats_listen = Some(ConfListen::from_socket_addr(addr));
187        self
188    }
189
190    /// `hash:` - hash algorithm.
191    #[must_use]
192    pub fn hash(mut self, h: HashType) -> Self {
193        self.pool.hash = Some(h);
194        self
195    }
196
197    /// `data_store:` - protocol selector.
198    #[must_use]
199    pub fn data_store(mut self, d: DataStore) -> Self {
200        self.pool.data_store = Some(d.as_int());
201        self
202    }
203
204    /// `read_consistency:` - quorum policy for reads.
205    #[must_use]
206    pub fn read_consistency(mut self, c: ConsistencyLevel) -> Self {
207        self.pool.read_consistency = Some(c.as_str().to_string());
208        self
209    }
210
211    /// `write_consistency:` - quorum policy for writes.
212    #[must_use]
213    pub fn write_consistency(mut self, c: ConsistencyLevel) -> Self {
214        self.pool.write_consistency = Some(c.as_str().to_string());
215        self
216    }
217
218    /// `secure_server_option:` - inter-node TLS mode.
219    #[must_use]
220    pub fn secure_server_option(mut self, opt: SecureServerOption) -> Self {
221        self.pool.secure_server_option = Some(opt.as_str().to_string());
222        self
223    }
224
225    /// `pem_key_file:` - path to the PEM private key.
226    #[must_use]
227    pub fn pem_key_file(mut self, path: impl AsRef<Path>) -> Self {
228        self.pool.pem_key_file = Some(path.as_ref().to_string_lossy().into_owned());
229        self
230    }
231
232    /// `servers:` - the (single-element) datastore list.
233    #[must_use]
234    pub fn servers(mut self, servers: Vec<ConfServer>) -> Self {
235        self.pool.servers = Some(Servers::from_vec(servers));
236        self
237    }
238
239    /// `dyn_seeds:` - peer dynomite nodes.
240    #[must_use]
241    pub fn dyn_seeds(mut self, seeds: Vec<ConfDynSeed>) -> Self {
242        self.pool.dyn_seeds = Some(seeds);
243        self
244    }
245
246    /// `dyn_seed_provider:` - seeds backend selector.
247    #[must_use]
248    pub fn dyn_seed_provider(mut self, name: impl Into<String>) -> Self {
249        self.pool.dyn_seed_provider = Some(name.into());
250        self
251    }
252
253    /// `timeout:` - request timeout.
254    #[must_use]
255    pub fn timeout(mut self, d: Duration) -> Self {
256        let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
257        self.pool.timeout = Some(ms);
258        self
259    }
260
261    /// `auto_eject_hosts:` - automatically eject failing peers.
262    #[must_use]
263    pub fn auto_eject_hosts(mut self, on: bool) -> Self {
264        self.pool.auto_eject_hosts = Some(on);
265        self
266    }
267
268    /// `server_failure_limit:` - consecutive failures before eject.
269    #[must_use]
270    pub fn server_failure_limit(mut self, n: u32) -> Self {
271        self.pool.server_failure_limit = Some(i64::from(n));
272        self
273    }
274
275    /// `server_retry_timeout:` - retry interval for ejected servers.
276    #[must_use]
277    pub fn server_retry_timeout(mut self, d: Duration) -> Self {
278        let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
279        self.pool.server_retry_timeout = Some(ms);
280        self
281    }
282
283    /// `gos_interval:` - gossip period.
284    #[must_use]
285    pub fn gossip_interval(mut self, d: Duration) -> Self {
286        let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
287        self.pool.gos_interval = Some(ms);
288        self
289    }
290
291    /// `enable_gossip:` - turn the gossip task on or off.
292    #[must_use]
293    pub fn enable_gossip(mut self, on: bool) -> Self {
294        self.pool.enable_gossip = Some(on);
295        self
296    }
297
298    /// `datacenter:` - this node's datacenter.
299    #[must_use]
300    pub fn datacenter(mut self, dc: impl Into<String>) -> Self {
301        self.pool.datacenter = Some(dc.into());
302        self
303    }
304
305    /// `rack:` - this node's rack.
306    #[must_use]
307    pub fn rack(mut self, rack: impl Into<String>) -> Self {
308        self.pool.rack = Some(rack.into());
309        self
310    }
311
312    /// `tokens:` - this node's tokens, parsed from the YAML
313    /// representation.
314    ///
315    /// Returns the builder unchanged when the tokens string fails
316    /// to parse so callers can chain. Parse failures emit a
317    /// `tracing::warn!` event so a typo in the token string is
318    /// observable in logs even though it does not break the
319    /// chain. Use [`ServerBuilder::tokens`] to set a pre-parsed
320    /// [`TokenList`] when you want a hard error on bad input.
321    ///
322    /// The silent-ignore-with-log behaviour is recorded as a
323    /// SemVer-major candidate in `docs/parity.md`; v0.2 may
324    /// switch to a `Result`-returning variant.
325    #[must_use]
326    pub fn tokens_str(mut self, raw: impl AsRef<str>) -> Self {
327        let raw = raw.as_ref();
328        match TokenList::parse(raw) {
329            Ok(t) => {
330                self.pool.tokens = Some(t);
331            }
332            Err(e) => {
333                tracing::warn!(
334                    raw = %raw,
335                    error = %e,
336                    "ServerBuilder::tokens_str: parse failed; leaving tokens unchanged. \
337                     Use ServerBuilder::tokens(TokenList) for a hard error on bad input."
338                );
339            }
340        }
341        self
342    }
343
344    /// `tokens:` - this node's tokens.
345    #[must_use]
346    pub fn tokens(mut self, tokens: TokenList) -> Self {
347        self.pool.tokens = Some(tokens);
348        self
349    }
350
351    /// `mbuf_size:` - mbuf chunk size in bytes.
352    #[must_use]
353    pub fn mbuf_size(mut self, n: usize) -> Self {
354        self.pool.mbuf_size = Some(i64::try_from(n).unwrap_or(i64::MAX));
355        self
356    }
357
358    /// `max_msgs:` - maximum allocated messages.
359    #[must_use]
360    pub fn max_msgs(mut self, n: usize) -> Self {
361        self.pool.max_msgs = Some(i64::try_from(n).unwrap_or(i64::MAX));
362        self
363    }
364
365    /// `read_repairs_enabled:` - enable read-repair on quorum
366    /// mismatch.
367    #[must_use]
368    pub fn read_repairs_enabled(mut self, on: bool) -> Self {
369        self.pool.read_repairs_enabled = Some(on);
370        self
371    }
372
373    /// `client_connections:` - client connection cap.
374    #[must_use]
375    pub fn client_connections(mut self, n: u32) -> Self {
376        self.pool.client_connections = Some(i64::from(n));
377        self
378    }
379
380    /// `datastore_connections:` - count of datastore-side
381    /// connections.
382    #[must_use]
383    pub fn datastore_connections(mut self, n: u8) -> Self {
384        self.pool.datastore_connections = Some(n);
385        self
386    }
387
388    /// `local_peer_connections:` - count of local-DC peer
389    /// connections.
390    #[must_use]
391    pub fn local_peer_connections(mut self, n: u8) -> Self {
392        self.pool.local_peer_connections = Some(n);
393        self
394    }
395
396    /// `remote_peer_connections:` - count of remote-DC peer
397    /// connections.
398    #[must_use]
399    pub fn remote_peer_connections(mut self, n: u8) -> Self {
400        self.pool.remote_peer_connections = Some(n);
401        self
402    }
403
404    /// `preconnect:` - eagerly establish connections at startup.
405    #[must_use]
406    pub fn preconnect(mut self, on: bool) -> Self {
407        self.pool.preconnect = Some(on);
408        self
409    }
410
411    /// `stats_interval:` - stats aggregation period.
412    #[must_use]
413    pub fn stats_interval(mut self, d: Duration) -> Self {
414        let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
415        self.pool.stats_interval = Some(ms);
416        self
417    }
418
419    // ---- Hook setters ---------------------------------------------------
420
421    /// Plug a custom [`Datastore`].
422    #[must_use]
423    pub fn datastore(mut self, ds: Box<dyn Datastore>) -> Self {
424        self.hooks.datastore = Some(ds);
425        self
426    }
427
428    /// Plug a custom [`SeedsProvider`].
429    #[must_use]
430    pub fn seeds_provider(mut self, sp: Box<dyn SeedsProvider>) -> Self {
431        self.hooks.seeds = Some(sp);
432        self
433    }
434
435    /// Plug a custom [`CryptoProvider`].
436    #[must_use]
437    pub fn crypto_provider(mut self, cp: Box<dyn CryptoProvider>) -> Self {
438        self.hooks.crypto = Some(cp);
439        self
440    }
441
442    /// Plug a custom [`MetricsSink`].
443    #[must_use]
444    pub fn metrics_sink(mut self, ms: Box<dyn MetricsSink>) -> Self {
445        self.hooks.metrics = Some(ms);
446        self
447    }
448
449    /// Plug a [`crate::embed::CommandExtension`].
450    ///
451    /// The extension is consulted by the dispatcher in the
452    /// hot path: parsed FT.* requests and HSET requests are
453    /// offered to it before the routing planner runs. The
454    /// trait is part of the engine; the standard RediSearch
455    /// implementation lives in the `dynomite-search` crate.
456    /// Without this setter the dispatcher's behaviour is
457    /// unchanged from the substrate's stock behaviour.
458    ///
459    /// # Examples
460    ///
461    /// ```
462    /// use std::sync::Arc;
463    /// use dynomite::embed::{CommandExtension, HsetOutcome, ServerBuilder};
464    /// use dynomite::msg::MsgType;
465    /// #[derive(Debug)]
466    /// struct NoOp;
467    /// impl CommandExtension for NoOp {
468    ///     fn handles_msg_type(&self, _: MsgType) -> bool { false }
469    ///     fn try_dispatch(&self, _: &[&[u8]]) -> Option<Vec<u8>> { None }
470    /// }
471    /// let _b = ServerBuilder::new("p").with_command_extension(Arc::new(NoOp));
472    /// ```
473    #[must_use]
474    pub fn with_command_extension(
475        mut self,
476        ext: std::sync::Arc<dyn crate::embed::CommandExtension>,
477    ) -> Self {
478        self.command_extension = Some(ext);
479        self
480    }
481
482    /// Mutating equivalent of [`Self::with_command_extension`]
483    /// for callers (notably `dynomite-search::install`) that
484    /// want to attach an extension to a builder they hold by
485    /// `&mut`.
486    pub fn set_command_extension(
487        &mut self,
488        ext: std::sync::Arc<dyn crate::embed::CommandExtension>,
489    ) -> &mut Self {
490        self.command_extension = Some(ext);
491        self
492    }
493
494    /// Borrow the configured [`crate::embed::CommandExtension`],
495    /// if one was supplied.
496    #[must_use]
497    pub fn command_extension(&self) -> Option<&std::sync::Arc<dyn crate::embed::CommandExtension>> {
498        self.command_extension.as_ref()
499    }
500
501    // `conf_pool_mut` removed: the escape hatch leaked the
502    // entire internal `ConfPool` shape onto the public API.
503    // Targeted setters land as the missing fields are
504    // identified.
505
506    /// Build the [`Server`].
507    ///
508    /// Applies defaults to any unset field, runs the validation
509    /// pass, fills in any missing hooks with their in-crate
510    /// defaults, and constructs the runtime data structures
511    /// (cluster pool, dispatcher, stats, event bus). The returned
512    /// `Server` is not running yet; call
513    /// [`Server::start`](crate::embed::Server::start) to spawn
514    /// background tasks.
515    pub fn build(mut self) -> Result<Server, EmbedError> {
516        if self.pool.servers.is_none() {
517            // Provide a sentinel so validation passes for the
518            // common in-process case where the embedder plugs a
519            // custom Datastore.
520            self.pool.servers = Some(Servers::from_vec(vec![ConfServer::parse(
521                "127.0.0.1:1:1 stub",
522            )
523            .map_err(EmbedError::Conf)?]));
524        }
525        if self.pool.tokens.is_none() {
526            self.pool.tokens = Some(TokenList::parse("0").map_err(EmbedError::Conf)?);
527        }
528        // Apply defaults + validate directly on the pool.
529        let mut finalized = self.pool.clone();
530        finalized.apply_defaults();
531        finalized.validate(&self.pool_name)?;
532
533        let datastore = self
534            .hooks
535            .datastore
536            .unwrap_or_else(|| Box::new(MemoryDatastore::new()));
537        let seeds = self.hooks.seeds.unwrap_or_else(|| {
538            let raw = finalized
539                .dyn_seeds
540                .as_deref()
541                .map(<[_]>::to_vec)
542                .unwrap_or_default();
543            Box::new(SimpleSeedsProvider::new(raw))
544        });
545        let crypto = self.hooks.crypto;
546        let metrics = self.hooks.metrics;
547        let command_extension = self.command_extension;
548
549        Ok(Server::from_pool(
550            self.pool_name,
551            finalized,
552            ServerHooks {
553                datastore: Some(datastore),
554                seeds: Some(seeds),
555                crypto,
556                metrics,
557            },
558            command_extension,
559        ))
560    }
561}
562
563// `socket_to_listen` was the YAML-validating helper for the
564// `listen:` / `dyn_listen:` / `stats_listen:` setters. The
565// builder now calls `ConfListen::from_socket_addr` directly so
566// that port `0` (kernel-assigned ephemeral) round-trips
567// unchanged through the embed API.
568
569// (synthetic_config helper removed; ConfPool drives validation directly.)
570
571/// Internal convenience: container for hook overrides.
572impl Default for ServerHooks {
573    fn default() -> Self {
574        Self {
575            datastore: None,
576            seeds: None,
577            crypto: None,
578            metrics: None,
579        }
580    }
581}
582
583// `SharedDatastore` removed: it was a `#[doc(hidden)]` `pub`
584// alias with no in-tree consumer. Embedders that need a shared
585// handle build their own `Arc<dyn Datastore>` from a
586// `Box<dyn Datastore>` they construct.