dynomite/embed/builder.rs
1//! Fluent builder for [`crate::embed::Server`].
2//!
3//! [`ServerBuilder`] mirrors every YAML-visible field on
4//! [`crate::conf::ConfPool`] with a typed setter and adds typed
5//! setters for the hook traits that have no YAML form.
6//!
7//! The builder is owned and chainable; `build()` validates the
8//! assembled config (the same [`crate::conf::Config::validate`]
9//! used for YAML) and returns a [`crate::embed::Server`].
10
11use std::net::SocketAddr;
12use std::path::Path;
13use std::time::Duration;
14
15use crate::conf::{
16 ConfDynSeed, ConfListen, ConfPool, ConfServer, Config, ConsistencyLevel, DataStore, HashType,
17 SecureServerOption, Servers, TokenList, Transport,
18};
19use crate::embed::error::EmbedError;
20use crate::embed::hooks::{
21 CryptoProvider, Datastore, MemoryDatastore, MetricsSink, SeedsProvider, SimpleSeedsProvider,
22};
23use crate::embed::server::{Server, ServerHooks};
24
25/// Fluent builder for [`Server`].
26///
27/// Every YAML-visible field on [`ConfPool`] has a setter on
28/// `ServerBuilder`; in addition, hook setters accept boxed trait
29/// objects to plug custom implementations.
30///
31/// # Examples
32///
33/// ```
34/// use dynomite::embed::ServerBuilder;
35/// use dynomite::conf::DataStore;
36/// let server = ServerBuilder::new("dyn_o_mite")
37/// .listen("127.0.0.1:18102".parse().unwrap())
38/// .dyn_listen("127.0.0.1:18101".parse().unwrap())
39/// .data_store(DataStore::Redis)
40/// .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
41/// .tokens_str("0")
42/// .build()
43/// .unwrap();
44/// drop(server);
45/// ```
46pub struct ServerBuilder {
47 pool_name: String,
48 pool: ConfPool,
49 hooks: ServerHooks,
50 command_extension: Option<std::sync::Arc<dyn crate::embed::CommandExtension>>,
51}
52
53impl std::fmt::Debug for ServerBuilder {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("ServerBuilder")
56 .field("pool_name", &self.pool_name)
57 .field("pool", &self.pool)
58 .finish_non_exhaustive()
59 }
60}
61
62impl Default for ServerBuilder {
63 /// Build a `ServerBuilder` for the canonical pool name
64 /// `"dyn_o_mite"`. Mirrors the design page's sketch of a
65 /// no-arg `Server::builder()` while keeping the typed
66 /// constructor (`new(pool_name)`) for ergonomics; the C
67 /// reference accepts only one pool per `Config`.
68 fn default() -> Self {
69 Self::new("dyn_o_mite")
70 }
71}
72
73impl ServerBuilder {
74 /// Build an empty `ServerBuilder` for a pool named `pool_name`.
75 ///
76 /// # Examples
77 ///
78 /// ```
79 /// use dynomite::embed::ServerBuilder;
80 /// let b = ServerBuilder::new("dyn_o_mite");
81 /// assert_eq!(b.pool_name(), "dyn_o_mite");
82 /// ```
83 pub fn new(pool_name: impl Into<String>) -> Self {
84 Self {
85 pool_name: pool_name.into(),
86 pool: ConfPool::default(),
87 hooks: ServerHooks::default(),
88 command_extension: None,
89 }
90 }
91
92 /// Build from an existing parsed [`Config`].
93 ///
94 /// # Examples
95 ///
96 /// ```
97 /// use dynomite::conf::Config;
98 /// use dynomite::embed::ServerBuilder;
99 /// let yaml = "p:\n listen: 127.0.0.1:1\n dyn_listen: 127.0.0.1:2\n tokens: '1'\n servers:\n - 127.0.0.1:3:1\n data_store: 0\n";
100 /// let cfg = Config::parse_str(yaml).unwrap();
101 /// let b = ServerBuilder::from_config(&cfg);
102 /// assert_eq!(b.pool_name(), "p");
103 /// ```
104 pub fn from_config(cfg: &Config) -> Self {
105 let pool_name = cfg.pool_name().to_string();
106 Self {
107 pool_name,
108 pool: cfg.pool().clone(),
109 hooks: ServerHooks::default(),
110 command_extension: None,
111 }
112 }
113
114 /// Build by parsing a YAML file.
115 ///
116 /// # Examples
117 ///
118 /// ```
119 /// use std::io::Write;
120 /// use dynomite::embed::ServerBuilder;
121 /// let mut f = tempfile::NamedTempFile::new().unwrap();
122 /// writeln!(f, "p:\n listen: 127.0.0.1:1\n dyn_listen: 127.0.0.1:2\n tokens: '1'\n servers:\n - 127.0.0.1:3:1\n data_store: 0").unwrap();
123 /// let b = ServerBuilder::from_yaml_file(f.path()).unwrap();
124 /// assert_eq!(b.pool_name(), "p");
125 /// ```
126 pub fn from_yaml_file(path: impl AsRef<Path>) -> Result<Self, EmbedError> {
127 let cfg = Config::parse_file(path.as_ref())?;
128 Ok(Self::from_config(&cfg))
129 }
130
131 /// Currently-configured pool name.
132 #[must_use]
133 pub fn pool_name(&self) -> &str {
134 &self.pool_name
135 }
136
137 /// Override the pool name supplied to [`Self::new`].
138 ///
139 /// The C reference accepts only one pool per `Config`, so the
140 /// pool name is part of the constructor for ergonomics. This
141 /// setter exists so callers that build a [`ServerBuilder`]
142 /// from a default-named template (`"dyn_o_mite"`) can rename
143 /// the pool without rebuilding the chain. Recorded as
144 /// Deviation in `docs/parity.md` (the design page sketches
145 /// a no-arg `Server::builder()`).
146 #[must_use]
147 pub fn with_pool_name(mut self, name: impl Into<String>) -> Self {
148 self.pool_name = name.into();
149 self
150 }
151
152 // ---- YAML-mirroring setters ------------------------------------------
153
154 /// `listen:` - client-facing listener address.
155 ///
156 /// Accepts port `0` (kernel-assigned ephemeral port). The
157 /// post-bind address is reported via
158 /// [`ServerHandle::listen_addr`](crate::embed::ServerHandle::listen_addr).
159 #[must_use]
160 pub fn listen(mut self, addr: SocketAddr) -> Self {
161 self.pool.listen = Some(ConfListen::from_socket_addr(addr));
162 self
163 }
164
165 /// `dyn_listen:` - peer-facing listener address.
166 ///
167 /// Accepts port `0` (kernel-assigned ephemeral port). The
168 /// post-bind address is reported via
169 /// [`ServerHandle::dyn_listen_addr`](crate::embed::ServerHandle::dyn_listen_addr).
170 #[must_use]
171 pub fn dyn_listen(mut self, addr: SocketAddr) -> Self {
172 self.pool.dyn_listen = Some(ConfListen::from_socket_addr(addr));
173 self
174 }
175
176 /// `stats_listen:` - HTTP stats listener address.
177 ///
178 /// Accepts port `0` for ephemeral-port semantics. Note that
179 /// the embedded server does not currently bind the stats
180 /// listener; the `dynomited` binary does. Embedders that want
181 /// a Prometheus-style scrape endpoint should plug a
182 /// [`MetricsSink`](crate::embed::MetricsSink) implementation
183 /// instead.
184 #[must_use]
185 pub fn stats_listen(mut self, addr: SocketAddr) -> Self {
186 self.pool.stats_listen = Some(ConfListen::from_socket_addr(addr));
187 self
188 }
189
190 /// `hash:` - hash algorithm.
191 #[must_use]
192 pub fn hash(mut self, h: HashType) -> Self {
193 self.pool.hash = Some(h);
194 self
195 }
196
197 /// `data_store:` - protocol selector.
198 #[must_use]
199 pub fn data_store(mut self, d: DataStore) -> Self {
200 self.pool.data_store = Some(d.as_int());
201 self
202 }
203
204 /// `read_consistency:` - quorum policy for reads.
205 #[must_use]
206 pub fn read_consistency(mut self, c: ConsistencyLevel) -> Self {
207 self.pool.read_consistency = Some(c.as_str().to_string());
208 self
209 }
210
211 /// `write_consistency:` - quorum policy for writes.
212 #[must_use]
213 pub fn write_consistency(mut self, c: ConsistencyLevel) -> Self {
214 self.pool.write_consistency = Some(c.as_str().to_string());
215 self
216 }
217
218 /// `secure_server_option:` - inter-node TLS mode.
219 #[must_use]
220 pub fn secure_server_option(mut self, opt: SecureServerOption) -> Self {
221 self.pool.secure_server_option = Some(opt.as_str().to_string());
222 self
223 }
224
225 /// `transport:` - select the proxy listener's network
226 /// stack. Defaults to [`Transport::Tcp`].
227 ///
228 /// `Transport::Quic` requires the engine's `quic` Cargo
229 /// feature; the validator rejects the selection without
230 /// matching `quic_cert_file:` and `quic_key_file:` paths
231 /// (see [`Self::quic_cert_file`] and [`Self::quic_key_file`]).
232 ///
233 /// # Examples
234 ///
235 /// ```
236 /// use dynomite::embed::ServerBuilder;
237 /// use dynomite::conf::Transport;
238 /// let _b = ServerBuilder::new("p").transport(Transport::Tcp);
239 /// ```
240 #[must_use]
241 pub fn transport(mut self, t: Transport) -> Self {
242 self.pool.transport = Some(t);
243 self
244 }
245
246 /// `quic_cert_file:` - PEM certificate chain path used by
247 /// the QUIC listener. Required when `transport: quic` is
248 /// selected; ignored under TCP.
249 #[must_use]
250 pub fn quic_cert_file(mut self, path: impl AsRef<Path>) -> Self {
251 self.pool.quic_cert_file = Some(path.as_ref().to_path_buf());
252 self
253 }
254
255 /// `quic_key_file:` - PEM private-key path matching
256 /// [`Self::quic_cert_file`].
257 #[must_use]
258 pub fn quic_key_file(mut self, path: impl AsRef<Path>) -> Self {
259 self.pool.quic_key_file = Some(path.as_ref().to_path_buf());
260 self
261 }
262
263 /// `pem_key_file:` - path to the PEM private key.
264 #[must_use]
265 pub fn pem_key_file(mut self, path: impl AsRef<Path>) -> Self {
266 self.pool.pem_key_file = Some(path.as_ref().to_string_lossy().into_owned());
267 self
268 }
269
270 /// `servers:` - the (single-element) datastore list.
271 #[must_use]
272 pub fn servers(mut self, servers: Vec<ConfServer>) -> Self {
273 self.pool.servers = Some(Servers::from_vec(servers));
274 self
275 }
276
277 /// `dyn_seeds:` - peer dynomite nodes.
278 #[must_use]
279 pub fn dyn_seeds(mut self, seeds: Vec<ConfDynSeed>) -> Self {
280 self.pool.dyn_seeds = Some(seeds);
281 self
282 }
283
284 /// `dyn_seed_provider:` - seeds backend selector.
285 #[must_use]
286 pub fn dyn_seed_provider(mut self, name: impl Into<String>) -> Self {
287 self.pool.dyn_seed_provider = Some(name.into());
288 self
289 }
290
291 /// `timeout:` - request timeout.
292 #[must_use]
293 pub fn timeout(mut self, d: Duration) -> Self {
294 let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
295 self.pool.timeout = Some(ms);
296 self
297 }
298
299 /// `auto_eject_hosts:` - automatically eject failing peers.
300 #[must_use]
301 pub fn auto_eject_hosts(mut self, on: bool) -> Self {
302 self.pool.auto_eject_hosts = Some(on);
303 self
304 }
305
306 /// `server_failure_limit:` - consecutive failures before eject.
307 #[must_use]
308 pub fn server_failure_limit(mut self, n: u32) -> Self {
309 self.pool.server_failure_limit = Some(i64::from(n));
310 self
311 }
312
313 /// `server_retry_timeout:` - retry interval for ejected servers.
314 #[must_use]
315 pub fn server_retry_timeout(mut self, d: Duration) -> Self {
316 let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
317 self.pool.server_retry_timeout = Some(ms);
318 self
319 }
320
321 /// `gos_interval:` - gossip period.
322 #[must_use]
323 pub fn gossip_interval(mut self, d: Duration) -> Self {
324 let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
325 self.pool.gos_interval = Some(ms);
326 self
327 }
328
329 /// `enable_gossip:` - turn the gossip task on or off.
330 #[must_use]
331 pub fn enable_gossip(mut self, on: bool) -> Self {
332 self.pool.enable_gossip = Some(on);
333 self
334 }
335
336 /// `datacenter:` - this node's datacenter.
337 #[must_use]
338 pub fn datacenter(mut self, dc: impl Into<String>) -> Self {
339 self.pool.datacenter = Some(dc.into());
340 self
341 }
342
343 /// `rack:` - this node's rack.
344 #[must_use]
345 pub fn rack(mut self, rack: impl Into<String>) -> Self {
346 self.pool.rack = Some(rack.into());
347 self
348 }
349
350 /// `tokens:` - this node's tokens, parsed from the YAML
351 /// representation.
352 ///
353 /// Returns the builder unchanged when the tokens string fails
354 /// to parse so callers can chain. Parse failures emit a
355 /// `tracing::warn!` event so a typo in the token string is
356 /// observable in logs even though it does not break the
357 /// chain. Use [`ServerBuilder::tokens`] to set a pre-parsed
358 /// [`TokenList`] when you want a hard error on bad input.
359 ///
360 /// The silent-ignore-with-log behaviour is recorded as a
361 /// SemVer-major candidate in `docs/parity.md`; v0.2 may
362 /// switch to a `Result`-returning variant.
363 #[must_use]
364 pub fn tokens_str(mut self, raw: impl AsRef<str>) -> Self {
365 let raw = raw.as_ref();
366 match TokenList::parse(raw) {
367 Ok(t) => {
368 self.pool.tokens = Some(t);
369 }
370 Err(e) => {
371 tracing::warn!(
372 raw = %raw,
373 error = %e,
374 "ServerBuilder::tokens_str: parse failed; leaving tokens unchanged. \
375 Use ServerBuilder::tokens(TokenList) for a hard error on bad input."
376 );
377 }
378 }
379 self
380 }
381
382 /// `tokens:` - this node's tokens.
383 #[must_use]
384 pub fn tokens(mut self, tokens: TokenList) -> Self {
385 self.pool.tokens = Some(tokens);
386 self
387 }
388
389 /// `mbuf_size:` - mbuf chunk size in bytes.
390 #[must_use]
391 pub fn mbuf_size(mut self, n: usize) -> Self {
392 self.pool.mbuf_size = Some(i64::try_from(n).unwrap_or(i64::MAX));
393 self
394 }
395
396 /// `max_msgs:` - maximum allocated messages.
397 #[must_use]
398 pub fn max_msgs(mut self, n: usize) -> Self {
399 self.pool.max_msgs = Some(i64::try_from(n).unwrap_or(i64::MAX));
400 self
401 }
402
403 /// `read_repairs_enabled:` - enable read-repair on quorum
404 /// mismatch.
405 #[must_use]
406 pub fn read_repairs_enabled(mut self, on: bool) -> Self {
407 self.pool.read_repairs_enabled = Some(on);
408 self
409 }
410
411 /// `client_connections:` - client connection cap.
412 #[must_use]
413 pub fn client_connections(mut self, n: u32) -> Self {
414 self.pool.client_connections = Some(i64::from(n));
415 self
416 }
417
418 /// `datastore_connections:` - count of datastore-side
419 /// connections.
420 #[must_use]
421 pub fn datastore_connections(mut self, n: u8) -> Self {
422 self.pool.datastore_connections = Some(n);
423 self
424 }
425
426 /// `local_peer_connections:` - count of local-DC peer
427 /// connections.
428 #[must_use]
429 pub fn local_peer_connections(mut self, n: u8) -> Self {
430 self.pool.local_peer_connections = Some(n);
431 self
432 }
433
434 /// `remote_peer_connections:` - count of remote-DC peer
435 /// connections.
436 #[must_use]
437 pub fn remote_peer_connections(mut self, n: u8) -> Self {
438 self.pool.remote_peer_connections = Some(n);
439 self
440 }
441
442 /// `preconnect:` - eagerly establish connections at startup.
443 #[must_use]
444 pub fn preconnect(mut self, on: bool) -> Self {
445 self.pool.preconnect = Some(on);
446 self
447 }
448
449 /// `stats_interval:` - stats aggregation period.
450 #[must_use]
451 pub fn stats_interval(mut self, d: Duration) -> Self {
452 let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
453 self.pool.stats_interval = Some(ms);
454 self
455 }
456
457 // ---- Hook setters ---------------------------------------------------
458
459 /// Plug a custom [`Datastore`].
460 #[must_use]
461 pub fn datastore(mut self, ds: Box<dyn Datastore>) -> Self {
462 self.hooks.datastore = Some(ds);
463 self
464 }
465
466 /// Plug a custom [`SeedsProvider`].
467 #[must_use]
468 pub fn seeds_provider(mut self, sp: Box<dyn SeedsProvider>) -> Self {
469 self.hooks.seeds = Some(sp);
470 self
471 }
472
473 /// Plug a custom [`CryptoProvider`].
474 #[must_use]
475 pub fn crypto_provider(mut self, cp: Box<dyn CryptoProvider>) -> Self {
476 self.hooks.crypto = Some(cp);
477 self
478 }
479
480 /// Plug a custom [`MetricsSink`].
481 #[must_use]
482 pub fn metrics_sink(mut self, ms: Box<dyn MetricsSink>) -> Self {
483 self.hooks.metrics = Some(ms);
484 self
485 }
486
487 /// Plug a [`crate::embed::CommandExtension`].
488 ///
489 /// The extension is consulted by the dispatcher in the
490 /// hot path: parsed FT.* requests and HSET requests are
491 /// offered to it before the routing planner runs. The
492 /// trait is part of the engine; the standard RediSearch
493 /// implementation lives in the `dynomite-search` crate.
494 /// Without this setter the dispatcher's behaviour is
495 /// unchanged from the substrate's stock behaviour.
496 ///
497 /// # Examples
498 ///
499 /// ```
500 /// use std::sync::Arc;
501 /// use dynomite::embed::{CommandExtension, HsetOutcome, ServerBuilder};
502 /// use dynomite::msg::MsgType;
503 /// #[derive(Debug)]
504 /// struct NoOp;
505 /// impl CommandExtension for NoOp {
506 /// fn handles_msg_type(&self, _: MsgType) -> bool { false }
507 /// fn try_dispatch(&self, _: &[&[u8]]) -> Option<Vec<u8>> { None }
508 /// }
509 /// let _b = ServerBuilder::new("p").with_command_extension(Arc::new(NoOp));
510 /// ```
511 #[must_use]
512 pub fn with_command_extension(
513 mut self,
514 ext: std::sync::Arc<dyn crate::embed::CommandExtension>,
515 ) -> Self {
516 self.command_extension = Some(ext);
517 self
518 }
519
520 /// Mutating equivalent of [`Self::with_command_extension`]
521 /// for callers (notably `dynomite-search::install`) that
522 /// want to attach an extension to a builder they hold by
523 /// `&mut`.
524 pub fn set_command_extension(
525 &mut self,
526 ext: std::sync::Arc<dyn crate::embed::CommandExtension>,
527 ) -> &mut Self {
528 self.command_extension = Some(ext);
529 self
530 }
531
532 /// Borrow the configured [`crate::embed::CommandExtension`],
533 /// if one was supplied.
534 #[must_use]
535 pub fn command_extension(&self) -> Option<&std::sync::Arc<dyn crate::embed::CommandExtension>> {
536 self.command_extension.as_ref()
537 }
538
539 // `conf_pool_mut` removed: the escape hatch leaked the
540 // entire internal `ConfPool` shape onto the public API.
541 // Targeted setters land as the missing fields are
542 // identified.
543
544 /// Build the [`Server`].
545 ///
546 /// Applies defaults to any unset field, runs the validation
547 /// pass, fills in any missing hooks with their in-crate
548 /// defaults, and constructs the runtime data structures
549 /// (cluster pool, dispatcher, stats, event bus). The returned
550 /// `Server` is not running yet; call
551 /// [`Server::start`](crate::embed::Server::start) to spawn
552 /// background tasks.
553 pub fn build(mut self) -> Result<Server, EmbedError> {
554 if self.pool.servers.is_none() {
555 // Provide a sentinel so validation passes for the
556 // common in-process case where the embedder plugs a
557 // custom Datastore.
558 self.pool.servers = Some(Servers::from_vec(vec![ConfServer::parse(
559 "127.0.0.1:1:1 stub",
560 )
561 .map_err(EmbedError::Conf)?]));
562 }
563 if self.pool.tokens.is_none() {
564 self.pool.tokens = Some(TokenList::parse("0").map_err(EmbedError::Conf)?);
565 }
566 // Apply defaults + validate directly on the pool.
567 let mut finalized = self.pool.clone();
568 finalized.apply_defaults();
569 finalized.validate(&self.pool_name)?;
570
571 let datastore = self
572 .hooks
573 .datastore
574 .unwrap_or_else(|| Box::new(MemoryDatastore::new()));
575 let seeds = self.hooks.seeds.unwrap_or_else(|| {
576 let raw = finalized
577 .dyn_seeds
578 .as_deref()
579 .map(<[_]>::to_vec)
580 .unwrap_or_default();
581 Box::new(SimpleSeedsProvider::new(raw))
582 });
583 let crypto = self.hooks.crypto;
584 let metrics = self.hooks.metrics;
585 let command_extension = self.command_extension;
586
587 Ok(Server::from_pool(
588 self.pool_name,
589 finalized,
590 ServerHooks {
591 datastore: Some(datastore),
592 seeds: Some(seeds),
593 crypto,
594 metrics,
595 },
596 command_extension,
597 ))
598 }
599}
600
601// `socket_to_listen` was the YAML-validating helper for the
602// `listen:` / `dyn_listen:` / `stats_listen:` setters. The
603// builder now calls `ConfListen::from_socket_addr` directly so
604// that port `0` (kernel-assigned ephemeral) round-trips
605// unchanged through the embed API.
606
607// (synthetic_config helper removed; ConfPool drives validation directly.)
608
609/// Internal convenience: container for hook overrides.
610impl Default for ServerHooks {
611 fn default() -> Self {
612 Self {
613 datastore: None,
614 seeds: None,
615 crypto: None,
616 metrics: None,
617 }
618 }
619}
620
621// `SharedDatastore` removed: it was a `#[doc(hidden)]` `pub`
622// alias with no in-tree consumer. Embedders that need a shared
623// handle build their own `Arc<dyn Datastore>` from a
624// `Box<dyn Datastore>` they construct.