dynomite/embed/builder.rs
1//! Fluent builder for [`crate::embed::Server`].
2//!
3//! [`ServerBuilder`] mirrors every YAML-visible field on
4//! [`crate::conf::ConfPool`] with a typed setter and adds typed
5//! setters for the hook traits that have no YAML form.
6//!
7//! The builder is owned and chainable; `build()` validates the
8//! assembled config (the same [`crate::conf::Config::validate`]
9//! used for YAML) and returns a [`crate::embed::Server`].
10
11use std::net::SocketAddr;
12use std::path::Path;
13use std::time::Duration;
14
15use crate::conf::{
16 ConfDynSeed, ConfListen, ConfPool, ConfServer, Config, ConsistencyLevel, DataStore, HashType,
17 SecureServerOption, Servers, TokenList,
18};
19use crate::embed::error::EmbedError;
20use crate::embed::hooks::{
21 CryptoProvider, Datastore, MemoryDatastore, MetricsSink, SeedsProvider, SimpleSeedsProvider,
22};
23use crate::embed::server::{Server, ServerHooks};
24
25/// Fluent builder for [`Server`].
26///
27/// Every YAML-visible field on [`ConfPool`] has a setter on
28/// `ServerBuilder`; in addition, hook setters accept boxed trait
29/// objects to plug custom implementations.
30///
31/// # Examples
32///
33/// ```
34/// use dynomite::embed::ServerBuilder;
35/// use dynomite::conf::DataStore;
36/// let server = ServerBuilder::new("dyn_o_mite")
37/// .listen("127.0.0.1:18102".parse().unwrap())
38/// .dyn_listen("127.0.0.1:18101".parse().unwrap())
39/// .data_store(DataStore::Redis)
40/// .servers(vec![dynomite::conf::ConfServer::parse("127.0.0.1:6379:1").unwrap()])
41/// .tokens_str("0")
42/// .build()
43/// .unwrap();
44/// drop(server);
45/// ```
46pub struct ServerBuilder {
47 pool_name: String,
48 pool: ConfPool,
49 hooks: ServerHooks,
50 vector_registry: Option<std::sync::Arc<crate::vector::registry::VectorRegistry>>,
51}
52
53impl std::fmt::Debug for ServerBuilder {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("ServerBuilder")
56 .field("pool_name", &self.pool_name)
57 .field("pool", &self.pool)
58 .finish_non_exhaustive()
59 }
60}
61
62impl Default for ServerBuilder {
63 /// Build a `ServerBuilder` for the canonical pool name
64 /// `"dyn_o_mite"`. Mirrors the design page's sketch of a
65 /// no-arg `Server::builder()` while keeping the typed
66 /// constructor (`new(pool_name)`) for ergonomics; the C
67 /// reference accepts only one pool per `Config`.
68 fn default() -> Self {
69 Self::new("dyn_o_mite")
70 }
71}
72
73impl ServerBuilder {
74 /// Build an empty `ServerBuilder` for a pool named `pool_name`.
75 ///
76 /// # Examples
77 ///
78 /// ```
79 /// use dynomite::embed::ServerBuilder;
80 /// let b = ServerBuilder::new("dyn_o_mite");
81 /// assert_eq!(b.pool_name(), "dyn_o_mite");
82 /// ```
83 pub fn new(pool_name: impl Into<String>) -> Self {
84 Self {
85 pool_name: pool_name.into(),
86 pool: ConfPool::default(),
87 hooks: ServerHooks::default(),
88 vector_registry: None,
89 }
90 }
91
92 /// Build from an existing parsed [`Config`].
93 ///
94 /// # Examples
95 ///
96 /// ```
97 /// use dynomite::conf::Config;
98 /// use dynomite::embed::ServerBuilder;
99 /// let yaml = "p:\n listen: 127.0.0.1:1\n dyn_listen: 127.0.0.1:2\n tokens: '1'\n servers:\n - 127.0.0.1:3:1\n data_store: 0\n";
100 /// let cfg = Config::parse_str(yaml).unwrap();
101 /// let b = ServerBuilder::from_config(&cfg);
102 /// assert_eq!(b.pool_name(), "p");
103 /// ```
104 pub fn from_config(cfg: &Config) -> Self {
105 let pool_name = cfg.pool_name().to_string();
106 Self {
107 pool_name,
108 pool: cfg.pool().clone(),
109 hooks: ServerHooks::default(),
110 vector_registry: None,
111 }
112 }
113
114 /// Build by parsing a YAML file.
115 ///
116 /// # Examples
117 ///
118 /// ```
119 /// use std::io::Write;
120 /// use dynomite::embed::ServerBuilder;
121 /// let mut f = tempfile::NamedTempFile::new().unwrap();
122 /// writeln!(f, "p:\n listen: 127.0.0.1:1\n dyn_listen: 127.0.0.1:2\n tokens: '1'\n servers:\n - 127.0.0.1:3:1\n data_store: 0").unwrap();
123 /// let b = ServerBuilder::from_yaml_file(f.path()).unwrap();
124 /// assert_eq!(b.pool_name(), "p");
125 /// ```
126 pub fn from_yaml_file(path: impl AsRef<Path>) -> Result<Self, EmbedError> {
127 let cfg = Config::parse_file(path.as_ref())?;
128 Ok(Self::from_config(&cfg))
129 }
130
131 /// Currently-configured pool name.
132 #[must_use]
133 pub fn pool_name(&self) -> &str {
134 &self.pool_name
135 }
136
137 /// Override the pool name supplied to [`Self::new`].
138 ///
139 /// The C reference accepts only one pool per `Config`, so the
140 /// pool name is part of the constructor for ergonomics. This
141 /// setter exists so callers that build a [`ServerBuilder`]
142 /// from a default-named template (`"dyn_o_mite"`) can rename
143 /// the pool without rebuilding the chain. Recorded as
144 /// Deviation in `docs/parity.md` (the design page sketches
145 /// a no-arg `Server::builder()`).
146 #[must_use]
147 pub fn with_pool_name(mut self, name: impl Into<String>) -> Self {
148 self.pool_name = name.into();
149 self
150 }
151
152 // ---- YAML-mirroring setters ------------------------------------------
153
154 /// `listen:` - client-facing listener address.
155 ///
156 /// Accepts port `0` (kernel-assigned ephemeral port). The
157 /// post-bind address is reported via
158 /// [`ServerHandle::listen_addr`](crate::embed::ServerHandle::listen_addr).
159 #[must_use]
160 pub fn listen(mut self, addr: SocketAddr) -> Self {
161 self.pool.listen = Some(ConfListen::from_socket_addr(addr));
162 self
163 }
164
165 /// `dyn_listen:` - peer-facing listener address.
166 ///
167 /// Accepts port `0` (kernel-assigned ephemeral port). The
168 /// post-bind address is reported via
169 /// [`ServerHandle::dyn_listen_addr`](crate::embed::ServerHandle::dyn_listen_addr).
170 #[must_use]
171 pub fn dyn_listen(mut self, addr: SocketAddr) -> Self {
172 self.pool.dyn_listen = Some(ConfListen::from_socket_addr(addr));
173 self
174 }
175
176 /// `stats_listen:` - HTTP stats listener address.
177 ///
178 /// Accepts port `0` for ephemeral-port semantics. Note that
179 /// the embedded server does not currently bind the stats
180 /// listener; the `dynomited` binary does. Embedders that want
181 /// a Prometheus-style scrape endpoint should plug a
182 /// [`MetricsSink`](crate::embed::MetricsSink) implementation
183 /// instead.
184 #[must_use]
185 pub fn stats_listen(mut self, addr: SocketAddr) -> Self {
186 self.pool.stats_listen = Some(ConfListen::from_socket_addr(addr));
187 self
188 }
189
190 /// `hash:` - hash algorithm.
191 #[must_use]
192 pub fn hash(mut self, h: HashType) -> Self {
193 self.pool.hash = Some(h);
194 self
195 }
196
197 /// `data_store:` - protocol selector.
198 #[must_use]
199 pub fn data_store(mut self, d: DataStore) -> Self {
200 self.pool.data_store = Some(d.as_int());
201 self
202 }
203
204 /// `read_consistency:` - quorum policy for reads.
205 #[must_use]
206 pub fn read_consistency(mut self, c: ConsistencyLevel) -> Self {
207 self.pool.read_consistency = Some(c.as_str().to_string());
208 self
209 }
210
211 /// `write_consistency:` - quorum policy for writes.
212 #[must_use]
213 pub fn write_consistency(mut self, c: ConsistencyLevel) -> Self {
214 self.pool.write_consistency = Some(c.as_str().to_string());
215 self
216 }
217
218 /// `secure_server_option:` - inter-node TLS mode.
219 #[must_use]
220 pub fn secure_server_option(mut self, opt: SecureServerOption) -> Self {
221 self.pool.secure_server_option = Some(opt.as_str().to_string());
222 self
223 }
224
225 /// `pem_key_file:` - path to the PEM private key.
226 #[must_use]
227 pub fn pem_key_file(mut self, path: impl AsRef<Path>) -> Self {
228 self.pool.pem_key_file = Some(path.as_ref().to_string_lossy().into_owned());
229 self
230 }
231
232 /// `servers:` - the (single-element) datastore list.
233 #[must_use]
234 pub fn servers(mut self, servers: Vec<ConfServer>) -> Self {
235 self.pool.servers = Some(Servers::from_vec(servers));
236 self
237 }
238
239 /// `dyn_seeds:` - peer dynomite nodes.
240 #[must_use]
241 pub fn dyn_seeds(mut self, seeds: Vec<ConfDynSeed>) -> Self {
242 self.pool.dyn_seeds = Some(seeds);
243 self
244 }
245
246 /// `dyn_seed_provider:` - seeds backend selector.
247 #[must_use]
248 pub fn dyn_seed_provider(mut self, name: impl Into<String>) -> Self {
249 self.pool.dyn_seed_provider = Some(name.into());
250 self
251 }
252
253 /// `timeout:` - request timeout.
254 #[must_use]
255 pub fn timeout(mut self, d: Duration) -> Self {
256 let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
257 self.pool.timeout = Some(ms);
258 self
259 }
260
261 /// `auto_eject_hosts:` - automatically eject failing peers.
262 #[must_use]
263 pub fn auto_eject_hosts(mut self, on: bool) -> Self {
264 self.pool.auto_eject_hosts = Some(on);
265 self
266 }
267
268 /// `server_failure_limit:` - consecutive failures before eject.
269 #[must_use]
270 pub fn server_failure_limit(mut self, n: u32) -> Self {
271 self.pool.server_failure_limit = Some(i64::from(n));
272 self
273 }
274
275 /// `server_retry_timeout:` - retry interval for ejected servers.
276 #[must_use]
277 pub fn server_retry_timeout(mut self, d: Duration) -> Self {
278 let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
279 self.pool.server_retry_timeout = Some(ms);
280 self
281 }
282
283 /// `gos_interval:` - gossip period.
284 #[must_use]
285 pub fn gossip_interval(mut self, d: Duration) -> Self {
286 let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
287 self.pool.gos_interval = Some(ms);
288 self
289 }
290
291 /// `enable_gossip:` - turn the gossip task on or off.
292 #[must_use]
293 pub fn enable_gossip(mut self, on: bool) -> Self {
294 self.pool.enable_gossip = Some(on);
295 self
296 }
297
298 /// `datacenter:` - this node's datacenter.
299 #[must_use]
300 pub fn datacenter(mut self, dc: impl Into<String>) -> Self {
301 self.pool.datacenter = Some(dc.into());
302 self
303 }
304
305 /// `rack:` - this node's rack.
306 #[must_use]
307 pub fn rack(mut self, rack: impl Into<String>) -> Self {
308 self.pool.rack = Some(rack.into());
309 self
310 }
311
312 /// `tokens:` - this node's tokens, parsed from the YAML
313 /// representation.
314 ///
315 /// Returns the builder unchanged when the tokens string fails
316 /// to parse so callers can chain. Parse failures emit a
317 /// `tracing::warn!` event so a typo in the token string is
318 /// observable in logs even though it does not break the
319 /// chain. Use [`ServerBuilder::tokens`] to set a pre-parsed
320 /// [`TokenList`] when you want a hard error on bad input.
321 ///
322 /// The silent-ignore-with-log behaviour is recorded as a
323 /// SemVer-major candidate in `docs/parity.md`; v0.2 may
324 /// switch to a `Result`-returning variant.
325 #[must_use]
326 pub fn tokens_str(mut self, raw: impl AsRef<str>) -> Self {
327 let raw = raw.as_ref();
328 match TokenList::parse(raw) {
329 Ok(t) => {
330 self.pool.tokens = Some(t);
331 }
332 Err(e) => {
333 tracing::warn!(
334 raw = %raw,
335 error = %e,
336 "ServerBuilder::tokens_str: parse failed; leaving tokens unchanged. \
337 Use ServerBuilder::tokens(TokenList) for a hard error on bad input."
338 );
339 }
340 }
341 self
342 }
343
344 /// `tokens:` - this node's tokens.
345 #[must_use]
346 pub fn tokens(mut self, tokens: TokenList) -> Self {
347 self.pool.tokens = Some(tokens);
348 self
349 }
350
351 /// `mbuf_size:` - mbuf chunk size in bytes.
352 #[must_use]
353 pub fn mbuf_size(mut self, n: usize) -> Self {
354 self.pool.mbuf_size = Some(i64::try_from(n).unwrap_or(i64::MAX));
355 self
356 }
357
358 /// `max_msgs:` - maximum allocated messages.
359 #[must_use]
360 pub fn max_msgs(mut self, n: usize) -> Self {
361 self.pool.max_msgs = Some(i64::try_from(n).unwrap_or(i64::MAX));
362 self
363 }
364
365 /// `read_repairs_enabled:` - enable read-repair on quorum
366 /// mismatch.
367 #[must_use]
368 pub fn read_repairs_enabled(mut self, on: bool) -> Self {
369 self.pool.read_repairs_enabled = Some(on);
370 self
371 }
372
373 /// `client_connections:` - client connection cap.
374 #[must_use]
375 pub fn client_connections(mut self, n: u32) -> Self {
376 self.pool.client_connections = Some(i64::from(n));
377 self
378 }
379
380 /// `datastore_connections:` - count of datastore-side
381 /// connections.
382 #[must_use]
383 pub fn datastore_connections(mut self, n: u8) -> Self {
384 self.pool.datastore_connections = Some(n);
385 self
386 }
387
388 /// `local_peer_connections:` - count of local-DC peer
389 /// connections.
390 #[must_use]
391 pub fn local_peer_connections(mut self, n: u8) -> Self {
392 self.pool.local_peer_connections = Some(n);
393 self
394 }
395
396 /// `remote_peer_connections:` - count of remote-DC peer
397 /// connections.
398 #[must_use]
399 pub fn remote_peer_connections(mut self, n: u8) -> Self {
400 self.pool.remote_peer_connections = Some(n);
401 self
402 }
403
404 /// `preconnect:` - eagerly establish connections at startup.
405 #[must_use]
406 pub fn preconnect(mut self, on: bool) -> Self {
407 self.pool.preconnect = Some(on);
408 self
409 }
410
411 /// `stats_interval:` - stats aggregation period.
412 #[must_use]
413 pub fn stats_interval(mut self, d: Duration) -> Self {
414 let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
415 self.pool.stats_interval = Some(ms);
416 self
417 }
418
419 // ---- Hook setters ---------------------------------------------------
420
421 /// Plug a custom [`Datastore`].
422 #[must_use]
423 pub fn datastore(mut self, ds: Box<dyn Datastore>) -> Self {
424 self.hooks.datastore = Some(ds);
425 self
426 }
427
428 /// Plug a custom [`SeedsProvider`].
429 #[must_use]
430 pub fn seeds_provider(mut self, sp: Box<dyn SeedsProvider>) -> Self {
431 self.hooks.seeds = Some(sp);
432 self
433 }
434
435 /// Plug a custom [`CryptoProvider`].
436 #[must_use]
437 pub fn crypto_provider(mut self, cp: Box<dyn CryptoProvider>) -> Self {
438 self.hooks.crypto = Some(cp);
439 self
440 }
441
442 /// Plug a custom [`MetricsSink`].
443 #[must_use]
444 pub fn metrics_sink(mut self, ms: Box<dyn MetricsSink>) -> Self {
445 self.hooks.metrics = Some(ms);
446 self
447 }
448
449 /// Plug a shared [`crate::vector::registry::VectorRegistry`].
450 ///
451 /// Without this setter the builder installs a fresh,
452 /// per-server registry on `build()`. Embedders that want to
453 /// drive multiple servers against a single shared catalog of
454 /// indexes (mirroring, fan-out testing, ...) can pass an
455 /// `Arc` they constructed elsewhere; the same registry will
456 /// then be reachable through
457 /// [`crate::embed::ServerHandle::vector_registry`] on every
458 /// server built with this builder.
459 ///
460 /// # Examples
461 ///
462 /// ```
463 /// use std::sync::Arc;
464 /// use dynomite::embed::ServerBuilder;
465 /// use dynomite::vector::registry::VectorRegistry;
466 /// let registry = Arc::new(VectorRegistry::new());
467 /// let _b = ServerBuilder::new("p").with_vector_registry(registry);
468 /// ```
469 #[must_use]
470 pub fn with_vector_registry(
471 mut self,
472 registry: std::sync::Arc<crate::vector::registry::VectorRegistry>,
473 ) -> Self {
474 self.vector_registry = Some(registry);
475 self
476 }
477
478 /// Borrow the configured [`crate::vector::registry::VectorRegistry`],
479 /// if one was supplied via
480 /// [`Self::with_vector_registry`]. Returns `None` when the
481 /// default-allocated registry will be installed by
482 /// [`Self::build`].
483 #[must_use]
484 pub fn vector_registry(
485 &self,
486 ) -> Option<&std::sync::Arc<crate::vector::registry::VectorRegistry>> {
487 self.vector_registry.as_ref()
488 }
489
490 // `conf_pool_mut` removed: the escape hatch leaked the
491 // entire internal `ConfPool` shape onto the public API.
492 // Targeted setters land as the missing fields are
493 // identified.
494
495 /// Build the [`Server`].
496 ///
497 /// Applies defaults to any unset field, runs the validation
498 /// pass, fills in any missing hooks with their in-crate
499 /// defaults, and constructs the runtime data structures
500 /// (cluster pool, dispatcher, stats, event bus). The returned
501 /// `Server` is not running yet; call
502 /// [`Server::start`](crate::embed::Server::start) to spawn
503 /// background tasks.
504 pub fn build(mut self) -> Result<Server, EmbedError> {
505 if self.pool.servers.is_none() {
506 // Provide a sentinel so validation passes for the
507 // common in-process case where the embedder plugs a
508 // custom Datastore.
509 self.pool.servers = Some(Servers::from_vec(vec![ConfServer::parse(
510 "127.0.0.1:1:1 stub",
511 )
512 .map_err(EmbedError::Conf)?]));
513 }
514 if self.pool.tokens.is_none() {
515 self.pool.tokens = Some(TokenList::parse("0").map_err(EmbedError::Conf)?);
516 }
517 // Apply defaults + validate directly on the pool.
518 let mut finalized = self.pool.clone();
519 finalized.apply_defaults();
520 finalized.validate(&self.pool_name)?;
521
522 let datastore = self
523 .hooks
524 .datastore
525 .unwrap_or_else(|| Box::new(MemoryDatastore::new()));
526 let seeds = self.hooks.seeds.unwrap_or_else(|| {
527 let raw = finalized
528 .dyn_seeds
529 .as_deref()
530 .map(<[_]>::to_vec)
531 .unwrap_or_default();
532 Box::new(SimpleSeedsProvider::new(raw))
533 });
534 let crypto = self.hooks.crypto;
535 let metrics = self.hooks.metrics;
536 let vector_registry = self
537 .vector_registry
538 .unwrap_or_else(|| std::sync::Arc::new(crate::vector::registry::VectorRegistry::new()));
539
540 Ok(Server::from_pool(
541 self.pool_name,
542 finalized,
543 ServerHooks {
544 datastore: Some(datastore),
545 seeds: Some(seeds),
546 crypto,
547 metrics,
548 },
549 vector_registry,
550 ))
551 }
552}
553
554// `socket_to_listen` was the YAML-validating helper for the
555// `listen:` / `dyn_listen:` / `stats_listen:` setters. The
556// builder now calls `ConfListen::from_socket_addr` directly so
557// that port `0` (kernel-assigned ephemeral) round-trips
558// unchanged through the embed API.
559
560// (synthetic_config helper removed; ConfPool drives validation directly.)
561
562/// Internal convenience: container for hook overrides.
563impl Default for ServerHooks {
564 fn default() -> Self {
565 Self {
566 datastore: None,
567 seeds: None,
568 crypto: None,
569 metrics: None,
570 }
571 }
572}
573
574// `SharedDatastore` removed: it was a `#[doc(hidden)]` `pub`
575// alias with no in-tree consumer. Embedders that need a shared
576// handle build their own `Arc<dyn Datastore>` from a
577// `Box<dyn Datastore>` they construct.