dynomite/embed/hooks.rs
1//! Pluggable hook traits exposed to embedders.
2//!
3//! The five traits in this module compose the public hook surface
4//! that an embedding program plugs into a [`crate::embed::Server`]
5//! to override the in-crate defaults: backing datastore, seeds
6//! provider, network transport listener, crypto provider, and
7//! metrics sink.
8//!
9//! Every trait is object-safe (`Box<dyn Trait>` works) and
10//! `Send + Sync` so the implementor can be shared across tokio
11//! tasks. Async methods return [`BoxFuture`] handles to keep the
12//! trait dyn-compatible without depending on the `async_trait`
13//! crate.
14//!
15//! Default implementations ship next to each trait. Re-exports
16//! at the [`crate::embed`] root mean a typical embedder writes
17//! `use dynomite::embed::SimpleSeedsProvider;` without reaching
18//! into nested submodules.
19
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::time::Duration;
24
25use parking_lot::Mutex;
26use thiserror::Error;
27
28use crate::conf::{ConfDynSeed, DataStore};
29use crate::msg::{Msg, MsgType};
30use crate::seeds::{
31 dns::DnsSeedsProvider as InnerDnsSeedsProvider,
32 florida::FloridaSeedsProvider as InnerFloridaSeedsProvider,
33 simple::SimpleSeedsProvider as InnerSimpleSeedsProvider, SeedsError, SeedsProvider as RawSeeds,
34};
35use crate::stats::{describe_stats, MetricSpec, Snapshot};
36
37/// Convenience alias for boxed futures returned by hook traits.
38///
39/// # Examples
40///
41/// ```
42/// use dynomite::embed::hooks::BoxFuture;
43/// fn _adapter() -> BoxFuture<'static, u32> {
44/// Box::pin(async { 42 })
45/// }
46/// ```
47pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
48
49// ---------- Datastore -------------------------------------------------------
50
51/// Errors produced by a [`Datastore`] implementation.
52#[derive(Debug, Error)]
53#[non_exhaustive]
54pub enum DatastoreError {
55 /// The datastore declined to handle this request type.
56 #[error("unsupported request: {0:?}")]
57 Unsupported(MsgType),
58 /// The datastore returned an internal error message.
59 #[error("datastore error: {0}")]
60 Backend(String),
61 /// I/O failure talking to the backing store.
62 #[error("io error: {0}")]
63 Io(String),
64}
65
66/// Logical wire protocol exposed by a datastore.
67///
68/// Mirrors the `data_store:` setting in the YAML config. The
69/// `Custom` variant exists for embedders fronting a private
70/// protocol; it carries no semantics inside the engine and is
71/// reported only through [`Datastore::protocol`].
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
73#[non_exhaustive]
74pub enum Protocol {
75 /// Redis RESP.
76 Redis,
77 /// Memcached text/binary.
78 Memcache,
79 /// Embedder-defined protocol.
80 Custom,
81}
82
83impl From<DataStore> for Protocol {
84 fn from(d: DataStore) -> Self {
85 match d {
86 DataStore::Redis => Protocol::Redis,
87 DataStore::Memcache => Protocol::Memcache,
88 DataStore::Noxu => Protocol::Custom,
89 }
90 }
91}
92
93/// Backing datastore the engine forwards routed requests to.
94///
95/// Implementations are kept stateless on the trait surface; live
96/// connection state lives behind [`Datastore::dispatch`] in the
97/// implementor's chosen pool.
98///
99/// # Examples
100///
101/// ```
102/// use dynomite::embed::hooks::{Datastore, MemoryDatastore, Protocol};
103/// use dynomite::msg::{Msg, MsgType};
104/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
105/// let ds = MemoryDatastore::new();
106/// assert_eq!(ds.protocol(), Protocol::Custom);
107/// let req = Msg::new(1, MsgType::ReqRedisGet, true);
108/// let _rsp = ds.dispatch(req).await.unwrap();
109/// assert_eq!(ds.dispatch_count(), 1);
110/// # });
111/// ```
112pub trait Datastore: Send + Sync {
113 /// Return the wire protocol the datastore speaks.
114 fn protocol(&self) -> Protocol;
115
116 /// Predicate used by the dispatcher to short-circuit
117 /// commands the backend cannot serve.
118 fn supports(&self, _cmd: MsgType) -> bool {
119 true
120 }
121
122 /// Forward a routed request and return the response message.
123 ///
124 /// The returned future is `'static`; the caller may move it
125 /// across tokio tasks freely.
126 fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>>;
127
128 /// Stream the names of every bucket the datastore is aware of,
129 /// one [`bytes::Bytes`] per bucket.
130 ///
131 /// The default implementation returns a one-shot stream that
132 /// yields a single [`DatastoreError::Unsupported`] item, so an
133 /// existing impl that does not enumerate buckets does not have
134 /// to be modified to compile against the new trait surface.
135 /// Implementations that can enumerate override this method.
136 ///
137 /// The returned stream is `'static` and `Send`; transports that
138 /// stream chunks of bucket names to clients (PBC frames, HTTP
139 /// chunked bodies, ...) consume it from a tokio task.
140 fn list_buckets_stream(&self) -> DatastoreByteStream {
141 Box::pin(unsupported_byte_stream())
142 }
143
144 /// Stream every key in `bucket`, one [`bytes::Bytes`] per key.
145 ///
146 /// Same default as [`Datastore::list_buckets_stream`]: a
147 /// single-item stream carrying [`DatastoreError::Unsupported`].
148 fn list_keys_stream(&self, _bucket: &[u8]) -> DatastoreByteStream {
149 Box::pin(unsupported_byte_stream())
150 }
151
152 /// Read the object stored under `(bucket, key)` against the
153 /// Riak K/V layer.
154 ///
155 /// Returns `Ok(None)` when no object exists. The default
156 /// implementation reports the operation as unsupported so
157 /// existing `Datastore` impls that do not speak Riak
158 /// continue to compile against the new trait surface; the
159 /// PBC server treats the error as a routing failure and
160 /// emits an `RpbErrorResp`.
161 fn riak_get<'a>(
162 &'a self,
163 _bucket: &'a [u8],
164 _key: &'a [u8],
165 ) -> BoxFuture<'a, Result<Option<Vec<u8>>, DatastoreError>> {
166 Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
167 }
168
169 /// Store `value` under `(bucket, key)`. `indexes` carries
170 /// `(index_name, encoded_value)` pairs to associate with the
171 /// object on the 2i layer.
172 ///
173 /// Default: unsupported, see [`Datastore::riak_get`].
174 fn riak_put<'a>(
175 &'a self,
176 _bucket: &'a [u8],
177 _key: &'a [u8],
178 _value: &'a [u8],
179 _indexes: &'a [(Vec<u8>, Vec<u8>)],
180 ) -> BoxFuture<'a, Result<(), DatastoreError>> {
181 Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
182 }
183
184 /// Delete the object stored under `(bucket, key)`. Returns
185 /// `true` when an object was removed, `false` when none
186 /// existed.
187 ///
188 /// Default: unsupported, see [`Datastore::riak_get`].
189 fn riak_delete<'a>(
190 &'a self,
191 _bucket: &'a [u8],
192 _key: &'a [u8],
193 ) -> BoxFuture<'a, Result<bool, DatastoreError>> {
194 Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
195 }
196
197 /// Equality query against the 2i layer.
198 ///
199 /// Returns the object keys whose `index_name` value equals
200 /// `value`, ordered by the underlying storage's natural key
201 /// order (typically lexicographic).
202 ///
203 /// Default: unsupported, see [`Datastore::riak_get`].
204 fn riak_index_eq<'a>(
205 &'a self,
206 _bucket: &'a [u8],
207 _index_name: &'a [u8],
208 _value: &'a [u8],
209 ) -> BoxFuture<'a, Result<Vec<Vec<u8>>, DatastoreError>> {
210 Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
211 }
212
213 /// Range query against the 2i layer. `min` and `max` are
214 /// inclusive bounds in the same encoding the index uses
215 /// internally.
216 ///
217 /// Default: unsupported, see [`Datastore::riak_get`].
218 fn riak_index_range<'a>(
219 &'a self,
220 _bucket: &'a [u8],
221 _index_name: &'a [u8],
222 _min: &'a [u8],
223 _max: &'a [u8],
224 ) -> BoxFuture<'a, Result<Vec<Vec<u8>>, DatastoreError>> {
225 Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
226 }
227}
228
229/// In-memory datastore used by examples and integration tests.
230///
231/// Stores the "command -> response" pairing for the simplest
232/// commands without speaking the real protocol. Production
233/// embedders use [`RedisDatastore`] or [`MemcacheDatastore`].
234#[derive(Debug, Default, Clone)]
235pub struct MemoryDatastore {
236 inner: Arc<Mutex<MemoryStore>>,
237}
238
239#[derive(Debug, Default)]
240struct MemoryStore {
241 calls: u64,
242}
243
244impl MemoryDatastore {
245 /// Build a fresh empty store.
246 ///
247 /// # Examples
248 ///
249 /// ```
250 /// use dynomite::embed::hooks::MemoryDatastore;
251 /// let ds = MemoryDatastore::new();
252 /// assert_eq!(ds.dispatch_count(), 0);
253 /// ```
254 #[must_use]
255 pub fn new() -> Self {
256 Self::default()
257 }
258
259 /// Number of times [`Datastore::dispatch`] has been invoked.
260 ///
261 /// # Examples
262 ///
263 /// ```
264 /// use dynomite::embed::hooks::MemoryDatastore;
265 /// let ds = MemoryDatastore::new();
266 /// assert_eq!(ds.dispatch_count(), 0);
267 /// ```
268 #[must_use]
269 pub fn dispatch_count(&self) -> u64 {
270 self.inner.lock().calls
271 }
272}
273
274impl Datastore for MemoryDatastore {
275 fn protocol(&self) -> Protocol {
276 Protocol::Custom
277 }
278
279 fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>> {
280 let inner = self.inner.clone();
281 Box::pin(async move {
282 inner.lock().calls += 1;
283 let mut rsp = Msg::new(req.id(), MsgType::Unknown, false);
284 rsp.set_parent_id(req.id());
285 Ok(rsp)
286 })
287 }
288
289 fn list_buckets_stream(&self) -> DatastoreByteStream {
290 let snapshot = self.list_buckets_snapshot();
291 Box::pin(VecByteStream {
292 items: snapshot.into_iter(),
293 })
294 }
295
296 fn list_keys_stream(&self, bucket: &[u8]) -> DatastoreByteStream {
297 let snapshot = self.list_keys_snapshot(bucket);
298 Box::pin(VecByteStream {
299 items: snapshot.into_iter(),
300 })
301 }
302}
303
304/// Default Redis-fronting datastore.
305///
306/// Stage 13 ships this as a thin marker around the supplied
307/// connection target; the actual wire protocol bridge lives in
308/// the dispatcher path of [`crate::cluster::dispatch`]. The
309/// default impl satisfies the [`Datastore`] contract for the
310/// embed surface so an embedder can construct a builder without
311/// wiring a custom backend.
312#[derive(Debug, Clone)]
313pub struct RedisDatastore {
314 target: String,
315}
316
317impl RedisDatastore {
318 /// Build a new Redis-fronting datastore.
319 ///
320 /// `target` is informational and is reported back through
321 /// [`RedisDatastore::target`].
322 ///
323 /// # Examples
324 ///
325 /// ```
326 /// use dynomite::embed::hooks::RedisDatastore;
327 /// let r = RedisDatastore::new("127.0.0.1:6379");
328 /// assert_eq!(r.target(), "127.0.0.1:6379");
329 /// ```
330 pub fn new(target: impl Into<String>) -> Self {
331 Self {
332 target: target.into(),
333 }
334 }
335
336 /// Return the configured target string.
337 #[must_use]
338 pub fn target(&self) -> &str {
339 &self.target
340 }
341}
342
343impl Datastore for RedisDatastore {
344 fn protocol(&self) -> Protocol {
345 Protocol::Redis
346 }
347
348 fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>> {
349 Box::pin(async move {
350 let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
351 rsp.set_parent_id(req.id());
352 Ok(rsp)
353 })
354 }
355}
356
357/// Default Memcache-fronting datastore.
358///
359/// Mirrors [`RedisDatastore`] for the Memcache wire protocol.
360#[derive(Debug, Clone)]
361pub struct MemcacheDatastore {
362 target: String,
363}
364
365impl MemcacheDatastore {
366 /// Build a new Memcache-fronting datastore.
367 ///
368 /// # Examples
369 ///
370 /// ```
371 /// use dynomite::embed::hooks::MemcacheDatastore;
372 /// let m = MemcacheDatastore::new("127.0.0.1:11211");
373 /// assert_eq!(m.target(), "127.0.0.1:11211");
374 /// ```
375 pub fn new(target: impl Into<String>) -> Self {
376 Self {
377 target: target.into(),
378 }
379 }
380
381 /// Return the configured target string.
382 #[must_use]
383 pub fn target(&self) -> &str {
384 &self.target
385 }
386}
387
388impl Datastore for MemcacheDatastore {
389 fn protocol(&self) -> Protocol {
390 Protocol::Memcache
391 }
392
393 fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>> {
394 Box::pin(async move {
395 let mut rsp = Msg::new(req.id(), MsgType::RspMcEnd, false);
396 rsp.set_parent_id(req.id());
397 Ok(rsp)
398 })
399 }
400}
401
402// ---------- SeedsProvider ---------------------------------------------------
403
404/// Pluggable seeds provider.
405///
406/// The trait is the embed-API mirror of
407/// [`crate::seeds::SeedsProvider`]; the in-crate providers are
408/// re-exported below as default implementations.
409///
410/// # Examples
411///
412/// ```
413/// use dynomite::embed::hooks::{SeedsProvider, SimpleSeedsProvider};
414/// use dynomite::conf::ConfDynSeed;
415/// let sp = SimpleSeedsProvider::new(vec![ConfDynSeed::parse("h:1:r:d:1").unwrap()]);
416/// assert_eq!(sp.fetch().unwrap().len(), 1);
417/// ```
418pub trait SeedsProvider: Send + Sync {
419 /// Return the current list of seeds.
420 fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError>;
421
422 /// Refresh interval used by the gossip task between calls to
423 /// [`SeedsProvider::fetch`].
424 fn refresh_interval(&self) -> Duration {
425 Duration::from_secs(30)
426 }
427}
428
429// LegacySeedsAdapter removed: the type would have leaked the
430// in-crate `crate::seeds::SeedsProvider` trait onto the public
431// API via its generic bound. Embedders that need to lift an
432// existing `SeedsProvider` impl wrap it in
433// `Box<dyn SeedsProvider>` directly and pass it to
434// [`crate::embed::ServerBuilder::seeds_provider`].
435
436/// Re-exported [`crate::seeds::simple::SimpleSeedsProvider`] with
437/// the embed [`SeedsProvider`] trait already implemented.
438#[derive(Debug, Clone, Default)]
439pub struct SimpleSeedsProvider {
440 inner: InnerSimpleSeedsProvider,
441}
442
443impl SimpleSeedsProvider {
444 /// Build a new in-memory provider.
445 ///
446 /// # Examples
447 ///
448 /// ```
449 /// use dynomite::embed::hooks::{SeedsProvider, SimpleSeedsProvider};
450 /// let p = SimpleSeedsProvider::new(Vec::new());
451 /// assert_eq!(p.fetch().unwrap().len(), 0);
452 /// ```
453 #[must_use]
454 pub fn new(seeds: Vec<ConfDynSeed>) -> Self {
455 Self {
456 inner: InnerSimpleSeedsProvider::new(seeds),
457 }
458 }
459}
460
461impl SeedsProvider for SimpleSeedsProvider {
462 fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError> {
463 self.inner.get_seeds()
464 }
465}
466
467/// DNS-resolving seeds provider, re-exported under the embed
468/// trait.
469pub type DnsSeedsProvider = InnerDnsSeedsProvider;
470
471impl SeedsProvider for DnsSeedsProvider {
472 fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError> {
473 self.get_seeds()
474 }
475}
476
477/// Florida HTTP seeds provider, re-exported under the embed
478/// trait.
479pub type FloridaSeedsProvider = InnerFloridaSeedsProvider;
480
481impl SeedsProvider for FloridaSeedsProvider {
482 fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError> {
483 self.get_seeds()
484 }
485}
486
487// ---------- CryptoProvider --------------------------------------------------
488
489/// Errors produced by a [`CryptoProvider`].
490#[derive(Debug, Error)]
491#[non_exhaustive]
492pub enum CryptoProviderError {
493 /// Encryption failed.
494 #[error("encryption failed: {0}")]
495 Encrypt(String),
496 /// Decryption failed.
497 #[error("decryption failed: {0}")]
498 Decrypt(String),
499 /// The provider was misconfigured (key length, padding, ...).
500 #[error("misconfiguration: {0}")]
501 Misconfigured(String),
502}
503
504/// Pluggable AES + RSA provider for the DNODE peer protocol.
505///
506/// The default in-crate provider [`RustCryptoProvider`] wraps
507/// [`crate::crypto::Crypto`]. HSM / KMS integrations implement
508/// the trait against their hardware bridge.
509///
510/// # Examples
511///
512/// ```no_run
513/// use dynomite::embed::hooks::{CryptoProvider, RustCryptoProvider};
514/// use dynomite::crypto::Crypto;
515/// // Construct the underlying Crypto from a PEM file at runtime.
516/// let crypto = Crypto::from_pem("/etc/dynomite/dynomite.pem").unwrap();
517/// let provider = RustCryptoProvider::new(crypto);
518/// assert!(provider.rsa_size() > 0);
519/// ```
520pub trait CryptoProvider: Send + Sync {
521 /// Modulus length in bytes for the configured RSA key.
522 fn rsa_size(&self) -> usize;
523
524 /// Borrow the AES key buffer used by the DNODE handshake.
525 fn aes_key(&self) -> [u8; crate::crypto::AES_KEYLEN];
526
527 /// AES-encrypt `plaintext` under the provider's key.
528 fn aes_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
529
530 /// AES-decrypt `ciphertext` under the provider's key.
531 fn aes_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
532
533 /// RSA-encrypt `plaintext` under the provider's public key.
534 fn rsa_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
535
536 /// RSA-decrypt `ciphertext` under the provider's private key.
537 fn rsa_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
538}
539
540/// Default crypto provider built on the in-crate
541/// [`crate::crypto::Crypto`] (RustCrypto-backed AES + RSA).
542///
543/// Despite the historical "Openssl" name in the design doc, the
544/// implementation uses the workspace's RustCrypto stack. Recorded
545/// as a Deviation in `docs/parity.md`.
546#[derive(Debug)]
547pub struct RustCryptoProvider {
548 crypto: Arc<crate::crypto::Crypto>,
549}
550
551impl RustCryptoProvider {
552 /// Wrap an existing [`crate::crypto::Crypto`].
553 ///
554 /// # Examples
555 ///
556 /// ```no_run
557 /// use dynomite::embed::hooks::{CryptoProvider, RustCryptoProvider};
558 /// use dynomite::crypto::Crypto;
559 /// // Production embedders load the key from disk:
560 /// let crypto = Crypto::from_pem("/etc/dynomite/dynomite.pem").unwrap();
561 /// let p = RustCryptoProvider::new(crypto);
562 /// assert!(p.rsa_size() >= 128);
563 /// ```
564 #[must_use]
565 pub fn new(crypto: crate::crypto::Crypto) -> Self {
566 Self {
567 crypto: Arc::new(crypto),
568 }
569 }
570
571 /// Construct from an [`Arc`]-shared crypto bundle.
572 #[must_use]
573 pub fn from_arc(crypto: Arc<crate::crypto::Crypto>) -> Self {
574 Self { crypto }
575 }
576}
577
578impl CryptoProvider for RustCryptoProvider {
579 fn rsa_size(&self) -> usize {
580 self.crypto.rsa_size()
581 }
582
583 fn aes_key(&self) -> [u8; crate::crypto::AES_KEYLEN] {
584 *self.crypto.aes_key()
585 }
586
587 fn aes_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
588 crate::crypto::Crypto::aes_encrypt(plaintext, self.crypto.aes_key())
589 .map_err(|e| CryptoProviderError::Encrypt(e.to_string()))
590 }
591
592 fn aes_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
593 crate::crypto::Crypto::aes_decrypt(ciphertext, self.crypto.aes_key())
594 .map_err(|e| CryptoProviderError::Decrypt(e.to_string()))
595 }
596
597 fn rsa_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
598 self.crypto
599 .rsa_encrypt(plaintext)
600 .map_err(|e| CryptoProviderError::Encrypt(e.to_string()))
601 }
602
603 fn rsa_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
604 self.crypto
605 .rsa_decrypt(ciphertext)
606 .map_err(|e| CryptoProviderError::Decrypt(e.to_string()))
607 }
608}
609
610// ---------- MetricsSink -----------------------------------------------------
611
612/// Errors produced by a [`MetricsSink`].
613#[derive(Debug, Error)]
614#[non_exhaustive]
615pub enum MetricsError {
616 /// The sink could not flush the snapshot.
617 #[error("metrics flush failed: {0}")]
618 Flush(String),
619}
620
621/// Pluggable metrics exporter.
622///
623/// The default sink ([`LoggingMetricsSink`]) emits one `tracing`
624/// event per flush. A `PrometheusMetricsSink` is intentionally
625/// not shipped by default to avoid adding a dependency to the
626/// workspace; see the embedding cookbook for the recommended
627/// adapter shape. Recorded as a Deviation in `docs/parity.md`.
628///
629/// # Examples
630///
631/// ```
632/// use dynomite::embed::hooks::{LoggingMetricsSink, MetricsSink};
633/// use dynomite::stats::Snapshot;
634/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
635/// let sink = LoggingMetricsSink::new("test");
636/// sink.emit(&Snapshot::default()).await.unwrap();
637/// # });
638/// ```
639pub trait MetricsSink: Send + Sync {
640 /// Push a stats snapshot to the sink.
641 fn emit<'a>(&'a self, snapshot: &'a Snapshot) -> BoxFuture<'a, Result<(), MetricsError>>;
642
643 /// Flush interval requested by the sink.
644 fn flush_interval(&self) -> Duration {
645 Duration::from_secs(10)
646 }
647
648 /// Optional manifest of every metric the sink expects to
649 /// receive.
650 fn manifest(&self) -> Vec<MetricSpec> {
651 Vec::new()
652 }
653}
654
655/// Default metrics sink: emits one `tracing::info` event per
656/// flush. Cheap, dependency-free, and useful in development.
657#[derive(Debug, Clone)]
658pub struct LoggingMetricsSink {
659 name: String,
660 counter: Arc<Mutex<u64>>,
661}
662
663impl LoggingMetricsSink {
664 /// Build a sink with a human-readable name.
665 ///
666 /// # Examples
667 ///
668 /// ```
669 /// use dynomite::embed::hooks::LoggingMetricsSink;
670 /// let s = LoggingMetricsSink::new("dyn_o_mite");
671 /// assert_eq!(s.flush_count(), 0);
672 /// ```
673 #[must_use]
674 pub fn new(name: impl Into<String>) -> Self {
675 Self {
676 name: name.into(),
677 counter: Arc::new(Mutex::new(0)),
678 }
679 }
680
681 /// Number of flushes the sink has observed.
682 #[must_use]
683 pub fn flush_count(&self) -> u64 {
684 *self.counter.lock()
685 }
686}
687
688impl MetricsSink for LoggingMetricsSink {
689 fn emit<'a>(&'a self, snapshot: &'a Snapshot) -> BoxFuture<'a, Result<(), MetricsError>> {
690 let counter = self.counter.clone();
691 let name = self.name.clone();
692 let pool_name = snapshot.pool.name.clone();
693 Box::pin(async move {
694 *counter.lock() += 1;
695 tracing::info!(sink = %name, pool = %pool_name, "metrics flush");
696 Ok(())
697 })
698 }
699
700 fn manifest(&self) -> Vec<MetricSpec> {
701 // Defer to the in-crate descriptor table; the JSON form
702 // already contains every metric the engine emits.
703 let json = describe_stats();
704 // The descriptor table is consumed downstream as opaque
705 // text. Returning an empty vec keeps the trait signature
706 // honest while logging the manifest size for diagnostics.
707 tracing::debug!(bytes = json.len(), "metrics manifest");
708 Vec::new()
709 }
710}
711
712// ---------- Streaming Datastore extensions ---------------------------------
713//
714// The streaming list path lets transports emit the bucket / key
715// catalogue in chunks instead of buffering it. A `Datastore` impl
716// produces one `Bytes` per entry; the transport (PBC server, HTTP
717// gateway) buffers an implementation-chosen number of entries per
718// outbound frame.
719
720use bytes::Bytes;
721use std::collections::{BTreeMap, BTreeSet};
722use std::sync::OnceLock;
723use std::task::{Context, Poll};
724
725/// Type alias for the byte stream returned by
726/// [`Datastore::list_buckets_stream`] and
727/// [`Datastore::list_keys_stream`].
728///
729/// Each `Bytes` item is one bucket name or key. The stream may
730/// surface a [`DatastoreError`] mid-iteration; transports translate
731/// that into a wire-level error response and stop emitting frames.
732pub type DatastoreByteStream =
733 Pin<Box<dyn futures_core::Stream<Item = Result<Bytes, DatastoreError>> + Send>>;
734
735/// Build a one-shot stream that yields a single
736/// [`DatastoreError::Unsupported`] item. The default body for
737/// [`Datastore::list_buckets_stream`] /
738/// [`Datastore::list_keys_stream`] so existing impls keep working
739/// without modification.
740fn unsupported_byte_stream(
741) -> impl futures_core::Stream<Item = Result<Bytes, DatastoreError>> + Send {
742 UnsupportedListStream { emitted: false }
743}
744
745struct UnsupportedListStream {
746 emitted: bool,
747}
748
749impl futures_core::Stream for UnsupportedListStream {
750 type Item = Result<Bytes, DatastoreError>;
751
752 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
753 if self.emitted {
754 return Poll::Ready(None);
755 }
756 self.emitted = true;
757 Poll::Ready(Some(Err(DatastoreError::Unsupported(MsgType::Unknown))))
758 }
759}
760
761/// `futures_core::Stream` that drains an owned `Vec<Bytes>` one
762/// item at a time. Used by [`MemoryDatastore`] to back its
763/// streaming list overrides.
764struct VecByteStream {
765 items: std::vec::IntoIter<Bytes>,
766}
767
768impl futures_core::Stream for VecByteStream {
769 type Item = Result<Bytes, DatastoreError>;
770
771 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
772 match self.items.next() {
773 Some(b) => Poll::Ready(Some(Ok(b))),
774 None => Poll::Ready(None),
775 }
776 }
777}
778
779// ---------- MemoryDatastore listing index ----------------------------------
780//
781// `MemoryDatastore`'s public field layout (a single `Arc<Mutex<...>>`
782// holding the dispatch counter) was committed before the streaming
783// list path existed. To keep the published surface unchanged, the
784// per-instance bucket/key index is held in a process-wide registry
785// keyed by the `Arc<Mutex<MemoryStore>>` pointer identity. Cloning
786// a `MemoryDatastore` shares its `inner` `Arc`, so clones see the
787// same listing -- mirroring the existing dispatch-count behaviour.
788
789#[derive(Debug, Default)]
790struct MemoryListing {
791 buckets: BTreeMap<Vec<u8>, BTreeSet<Vec<u8>>>,
792}
793
794#[derive(Debug, Default, Clone)]
795struct ListingHandle {
796 inner: Arc<Mutex<MemoryListing>>,
797}
798
799fn listing_for(ds: &MemoryDatastore) -> ListingHandle {
800 static REGISTRY: OnceLock<Mutex<Vec<(usize, ListingHandle)>>> = OnceLock::new();
801 let registry = REGISTRY.get_or_init(|| Mutex::new(Vec::new()));
802 let id = Arc::as_ptr(&ds.inner) as usize;
803 let mut g = registry.lock();
804 if let Some((_, h)) = g.iter().find(|(k, _)| *k == id) {
805 return h.clone();
806 }
807 let h = ListingHandle::default();
808 g.push((id, h.clone()));
809 h
810}
811
812impl MemoryDatastore {
813 /// Insert `(bucket, key)` into the in-memory listing index.
814 ///
815 /// Idempotent: inserting the same `(bucket, key)` pair twice is
816 /// a no-op. Tests use this helper to seed the streaming list
817 /// path without speaking the real Riak protocol.
818 ///
819 /// # Examples
820 ///
821 /// ```
822 /// use dynomite::embed::hooks::MemoryDatastore;
823 /// let ds = MemoryDatastore::new();
824 /// ds.insert(b"users", b"alice");
825 /// ds.insert(b"users", b"bob");
826 /// assert_eq!(ds.list_buckets_snapshot().len(), 1);
827 /// assert_eq!(ds.list_keys_snapshot(b"users").len(), 2);
828 /// ```
829 pub fn insert(&self, bucket: &[u8], key: &[u8]) {
830 let h = listing_for(self);
831 let mut g = h.inner.lock();
832 g.buckets
833 .entry(bucket.to_vec())
834 .or_default()
835 .insert(key.to_vec());
836 }
837
838 /// Snapshot of the bucket name set, sorted lexicographically.
839 ///
840 /// # Examples
841 ///
842 /// ```
843 /// use dynomite::embed::hooks::MemoryDatastore;
844 /// let ds = MemoryDatastore::new();
845 /// assert!(ds.list_buckets_snapshot().is_empty());
846 /// ```
847 #[must_use]
848 pub fn list_buckets_snapshot(&self) -> Vec<Bytes> {
849 let h = listing_for(self);
850 let g = h.inner.lock();
851 g.buckets
852 .keys()
853 .map(|b| Bytes::copy_from_slice(b))
854 .collect()
855 }
856
857 /// Snapshot of the keys in `bucket`, sorted lexicographically.
858 /// Returns an empty vector when the bucket is unknown.
859 ///
860 /// # Examples
861 ///
862 /// ```
863 /// use dynomite::embed::hooks::MemoryDatastore;
864 /// let ds = MemoryDatastore::new();
865 /// assert!(ds.list_keys_snapshot(b"missing").is_empty());
866 /// ```
867 #[must_use]
868 pub fn list_keys_snapshot(&self, bucket: &[u8]) -> Vec<Bytes> {
869 let h = listing_for(self);
870 let g = h.inner.lock();
871 g.buckets
872 .get(bucket)
873 .map(|s| s.iter().map(|k| Bytes::copy_from_slice(k)).collect())
874 .unwrap_or_default()
875 }
876}