Skip to main content

dynomite/embed/
hooks.rs

1//! Pluggable hook traits exposed to embedders.
2//!
3//! The five traits in this module compose the public hook surface
4//! that an embedding program plugs into a [`crate::embed::Server`]
5//! to override the in-crate defaults: backing datastore, seeds
6//! provider, network transport listener, crypto provider, and
7//! metrics sink.
8//!
9//! Every trait is object-safe (`Box<dyn Trait>` works) and
10//! `Send + Sync` so the implementor can be shared across tokio
11//! tasks. Async methods return [`BoxFuture`] handles to keep the
12//! trait dyn-compatible without depending on the `async_trait`
13//! crate.
14//!
15//! Default implementations ship next to each trait. Re-exports
16//! at the [`crate::embed`] root mean a typical embedder writes
17//! `use dynomite::embed::SimpleSeedsProvider;` without reaching
18//! into nested submodules.
19
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::time::Duration;
24
25use parking_lot::Mutex;
26use thiserror::Error;
27
28use crate::conf::{ConfDynSeed, DataStore};
29use crate::msg::{Msg, MsgType};
30use crate::seeds::{
31    dns::DnsSeedsProvider as InnerDnsSeedsProvider,
32    florida::FloridaSeedsProvider as InnerFloridaSeedsProvider,
33    simple::SimpleSeedsProvider as InnerSimpleSeedsProvider, SeedsError, SeedsProvider as RawSeeds,
34};
35use crate::stats::{describe_stats, MetricSpec, Snapshot};
36
37/// Convenience alias for boxed futures returned by hook traits.
38///
39/// # Examples
40///
41/// ```
42/// use dynomite::embed::hooks::BoxFuture;
43/// fn _adapter() -> BoxFuture<'static, u32> {
44///     Box::pin(async { 42 })
45/// }
46/// ```
47pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
48
49// ---------- Datastore -------------------------------------------------------
50
51/// Errors produced by a [`Datastore`] implementation.
52#[derive(Debug, Error)]
53#[non_exhaustive]
54pub enum DatastoreError {
55    /// The datastore declined to handle this request type.
56    #[error("unsupported request: {0:?}")]
57    Unsupported(MsgType),
58    /// The datastore returned an internal error message.
59    #[error("datastore error: {0}")]
60    Backend(String),
61    /// I/O failure talking to the backing store.
62    #[error("io error: {0}")]
63    Io(String),
64}
65
66/// Logical wire protocol exposed by a datastore.
67///
68/// Mirrors the `data_store:` setting in the YAML config. The
69/// `Custom` variant exists for embedders fronting a private
70/// protocol; it carries no semantics inside the engine and is
71/// reported only through [`Datastore::protocol`].
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
73#[non_exhaustive]
74pub enum Protocol {
75    /// Redis RESP.
76    Redis,
77    /// Memcached text/binary.
78    Memcache,
79    /// Embedder-defined protocol.
80    Custom,
81}
82
83impl From<DataStore> for Protocol {
84    fn from(d: DataStore) -> Self {
85        match d {
86            DataStore::Redis => Protocol::Redis,
87            DataStore::Memcache => Protocol::Memcache,
88            DataStore::Noxu => Protocol::Custom,
89        }
90    }
91}
92
93/// Backing datastore the engine forwards routed requests to.
94///
95/// Implementations are kept stateless on the trait surface; live
96/// connection state lives behind [`Datastore::dispatch`] in the
97/// implementor's chosen pool.
98///
99/// # Examples
100///
101/// ```
102/// use dynomite::embed::hooks::{Datastore, MemoryDatastore, Protocol};
103/// use dynomite::msg::{Msg, MsgType};
104/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
105/// let ds = MemoryDatastore::new();
106/// assert_eq!(ds.protocol(), Protocol::Custom);
107/// let req = Msg::new(1, MsgType::ReqRedisGet, true);
108/// let _rsp = ds.dispatch(req).await.unwrap();
109/// assert_eq!(ds.dispatch_count(), 1);
110/// # });
111/// ```
112pub trait Datastore: Send + Sync {
113    /// Return the wire protocol the datastore speaks.
114    fn protocol(&self) -> Protocol;
115
116    /// Predicate used by the dispatcher to short-circuit
117    /// commands the backend cannot serve.
118    fn supports(&self, _cmd: MsgType) -> bool {
119        true
120    }
121
122    /// Forward a routed request and return the response message.
123    ///
124    /// The returned future is `'static`; the caller may move it
125    /// across tokio tasks freely.
126    fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>>;
127
128    /// Stream the names of every bucket the datastore is aware of,
129    /// one [`bytes::Bytes`] per bucket.
130    ///
131    /// The default implementation returns a one-shot stream that
132    /// yields a single [`DatastoreError::Unsupported`] item, so an
133    /// existing impl that does not enumerate buckets does not have
134    /// to be modified to compile against the new trait surface.
135    /// Implementations that can enumerate override this method.
136    ///
137    /// The returned stream is `'static` and `Send`; transports that
138    /// stream chunks of bucket names to clients (PBC frames, HTTP
139    /// chunked bodies, ...) consume it from a tokio task.
140    fn list_buckets_stream(&self) -> DatastoreByteStream {
141        Box::pin(unsupported_byte_stream())
142    }
143
144    /// Stream every key in `bucket`, one [`bytes::Bytes`] per key.
145    ///
146    /// Same default as [`Datastore::list_buckets_stream`]: a
147    /// single-item stream carrying [`DatastoreError::Unsupported`].
148    fn list_keys_stream(&self, _bucket: &[u8]) -> DatastoreByteStream {
149        Box::pin(unsupported_byte_stream())
150    }
151
152    /// Read the object stored under `(bucket, key)` against the
153    /// Riak K/V layer.
154    ///
155    /// Returns `Ok(None)` when no object exists. The default
156    /// implementation reports the operation as unsupported so
157    /// existing `Datastore` impls that do not speak Riak
158    /// continue to compile against the new trait surface; the
159    /// PBC server treats the error as a routing failure and
160    /// emits an `RpbErrorResp`.
161    fn riak_get<'a>(
162        &'a self,
163        _bucket: &'a [u8],
164        _key: &'a [u8],
165    ) -> BoxFuture<'a, Result<Option<Vec<u8>>, DatastoreError>> {
166        Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
167    }
168
169    /// Store `value` under `(bucket, key)`. `indexes` carries
170    /// `(index_name, encoded_value)` pairs to associate with the
171    /// object on the 2i layer.
172    ///
173    /// Default: unsupported, see [`Datastore::riak_get`].
174    fn riak_put<'a>(
175        &'a self,
176        _bucket: &'a [u8],
177        _key: &'a [u8],
178        _value: &'a [u8],
179        _indexes: &'a [(Vec<u8>, Vec<u8>)],
180    ) -> BoxFuture<'a, Result<(), DatastoreError>> {
181        Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
182    }
183
184    /// Delete the object stored under `(bucket, key)`. Returns
185    /// `true` when an object was removed, `false` when none
186    /// existed.
187    ///
188    /// Default: unsupported, see [`Datastore::riak_get`].
189    fn riak_delete<'a>(
190        &'a self,
191        _bucket: &'a [u8],
192        _key: &'a [u8],
193    ) -> BoxFuture<'a, Result<bool, DatastoreError>> {
194        Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
195    }
196
197    /// Equality query against the 2i layer.
198    ///
199    /// Returns the object keys whose `index_name` value equals
200    /// `value`, ordered by the underlying storage's natural key
201    /// order (typically lexicographic).
202    ///
203    /// Default: unsupported, see [`Datastore::riak_get`].
204    fn riak_index_eq<'a>(
205        &'a self,
206        _bucket: &'a [u8],
207        _index_name: &'a [u8],
208        _value: &'a [u8],
209    ) -> BoxFuture<'a, Result<Vec<Vec<u8>>, DatastoreError>> {
210        Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
211    }
212
213    /// Range query against the 2i layer. `min` and `max` are
214    /// inclusive bounds in the same encoding the index uses
215    /// internally.
216    ///
217    /// Default: unsupported, see [`Datastore::riak_get`].
218    fn riak_index_range<'a>(
219        &'a self,
220        _bucket: &'a [u8],
221        _index_name: &'a [u8],
222        _min: &'a [u8],
223        _max: &'a [u8],
224    ) -> BoxFuture<'a, Result<Vec<Vec<u8>>, DatastoreError>> {
225        Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
226    }
227}
228
229/// In-memory datastore used by examples and integration tests.
230///
231/// Stores the "command -> response" pairing for the simplest
232/// commands without speaking the real protocol. Production
233/// embedders use [`RedisDatastore`] or [`MemcacheDatastore`].
234#[derive(Debug, Default, Clone)]
235pub struct MemoryDatastore {
236    inner: Arc<Mutex<MemoryStore>>,
237}
238
239#[derive(Debug, Default)]
240struct MemoryStore {
241    calls: u64,
242}
243
244impl MemoryDatastore {
245    /// Build a fresh empty store.
246    ///
247    /// # Examples
248    ///
249    /// ```
250    /// use dynomite::embed::hooks::MemoryDatastore;
251    /// let ds = MemoryDatastore::new();
252    /// assert_eq!(ds.dispatch_count(), 0);
253    /// ```
254    #[must_use]
255    pub fn new() -> Self {
256        Self::default()
257    }
258
259    /// Number of times [`Datastore::dispatch`] has been invoked.
260    ///
261    /// # Examples
262    ///
263    /// ```
264    /// use dynomite::embed::hooks::MemoryDatastore;
265    /// let ds = MemoryDatastore::new();
266    /// assert_eq!(ds.dispatch_count(), 0);
267    /// ```
268    #[must_use]
269    pub fn dispatch_count(&self) -> u64 {
270        self.inner.lock().calls
271    }
272}
273
274impl Datastore for MemoryDatastore {
275    fn protocol(&self) -> Protocol {
276        Protocol::Custom
277    }
278
279    fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>> {
280        let inner = self.inner.clone();
281        Box::pin(async move {
282            inner.lock().calls += 1;
283            let mut rsp = Msg::new(req.id(), MsgType::Unknown, false);
284            rsp.set_parent_id(req.id());
285            Ok(rsp)
286        })
287    }
288
289    fn list_buckets_stream(&self) -> DatastoreByteStream {
290        let snapshot = self.list_buckets_snapshot();
291        Box::pin(VecByteStream {
292            items: snapshot.into_iter(),
293        })
294    }
295
296    fn list_keys_stream(&self, bucket: &[u8]) -> DatastoreByteStream {
297        let snapshot = self.list_keys_snapshot(bucket);
298        Box::pin(VecByteStream {
299            items: snapshot.into_iter(),
300        })
301    }
302}
303
304/// Default Redis-fronting datastore.
305///
306/// Stage 13 ships this as a thin marker around the supplied
307/// connection target; the actual wire protocol bridge lives in
308/// the dispatcher path of [`crate::cluster::dispatch`]. The
309/// default impl satisfies the [`Datastore`] contract for the
310/// embed surface so an embedder can construct a builder without
311/// wiring a custom backend.
312#[derive(Debug, Clone)]
313pub struct RedisDatastore {
314    target: String,
315}
316
317impl RedisDatastore {
318    /// Build a new Redis-fronting datastore.
319    ///
320    /// `target` is informational and is reported back through
321    /// [`RedisDatastore::target`].
322    ///
323    /// # Examples
324    ///
325    /// ```
326    /// use dynomite::embed::hooks::RedisDatastore;
327    /// let r = RedisDatastore::new("127.0.0.1:6379");
328    /// assert_eq!(r.target(), "127.0.0.1:6379");
329    /// ```
330    pub fn new(target: impl Into<String>) -> Self {
331        Self {
332            target: target.into(),
333        }
334    }
335
336    /// Return the configured target string.
337    #[must_use]
338    pub fn target(&self) -> &str {
339        &self.target
340    }
341}
342
343impl Datastore for RedisDatastore {
344    fn protocol(&self) -> Protocol {
345        Protocol::Redis
346    }
347
348    fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>> {
349        Box::pin(async move {
350            let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
351            rsp.set_parent_id(req.id());
352            Ok(rsp)
353        })
354    }
355}
356
357/// Default Memcache-fronting datastore.
358///
359/// Mirrors [`RedisDatastore`] for the Memcache wire protocol.
360#[derive(Debug, Clone)]
361pub struct MemcacheDatastore {
362    target: String,
363}
364
365impl MemcacheDatastore {
366    /// Build a new Memcache-fronting datastore.
367    ///
368    /// # Examples
369    ///
370    /// ```
371    /// use dynomite::embed::hooks::MemcacheDatastore;
372    /// let m = MemcacheDatastore::new("127.0.0.1:11211");
373    /// assert_eq!(m.target(), "127.0.0.1:11211");
374    /// ```
375    pub fn new(target: impl Into<String>) -> Self {
376        Self {
377            target: target.into(),
378        }
379    }
380
381    /// Return the configured target string.
382    #[must_use]
383    pub fn target(&self) -> &str {
384        &self.target
385    }
386}
387
388impl Datastore for MemcacheDatastore {
389    fn protocol(&self) -> Protocol {
390        Protocol::Memcache
391    }
392
393    fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>> {
394        Box::pin(async move {
395            let mut rsp = Msg::new(req.id(), MsgType::RspMcEnd, false);
396            rsp.set_parent_id(req.id());
397            Ok(rsp)
398        })
399    }
400}
401
402// ---------- SeedsProvider ---------------------------------------------------
403
404/// Pluggable seeds provider.
405///
406/// The trait is the embed-API mirror of
407/// [`crate::seeds::SeedsProvider`]; the in-crate providers are
408/// re-exported below as default implementations.
409///
410/// # Examples
411///
412/// ```
413/// use dynomite::embed::hooks::{SeedsProvider, SimpleSeedsProvider};
414/// use dynomite::conf::ConfDynSeed;
415/// let sp = SimpleSeedsProvider::new(vec![ConfDynSeed::parse("h:1:r:d:1").unwrap()]);
416/// assert_eq!(sp.fetch().unwrap().len(), 1);
417/// ```
418pub trait SeedsProvider: Send + Sync {
419    /// Return the current list of seeds.
420    fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError>;
421
422    /// Refresh interval used by the gossip task between calls to
423    /// [`SeedsProvider::fetch`].
424    fn refresh_interval(&self) -> Duration {
425        Duration::from_secs(30)
426    }
427}
428
429// LegacySeedsAdapter removed: the type would have leaked the
430// in-crate `crate::seeds::SeedsProvider` trait onto the public
431// API via its generic bound. Embedders that need to lift an
432// existing `SeedsProvider` impl wrap it in
433// `Box<dyn SeedsProvider>` directly and pass it to
434// [`crate::embed::ServerBuilder::seeds_provider`].
435
436/// Re-exported [`crate::seeds::simple::SimpleSeedsProvider`] with
437/// the embed [`SeedsProvider`] trait already implemented.
438#[derive(Debug, Clone, Default)]
439pub struct SimpleSeedsProvider {
440    inner: InnerSimpleSeedsProvider,
441}
442
443impl SimpleSeedsProvider {
444    /// Build a new in-memory provider.
445    ///
446    /// # Examples
447    ///
448    /// ```
449    /// use dynomite::embed::hooks::{SeedsProvider, SimpleSeedsProvider};
450    /// let p = SimpleSeedsProvider::new(Vec::new());
451    /// assert_eq!(p.fetch().unwrap().len(), 0);
452    /// ```
453    #[must_use]
454    pub fn new(seeds: Vec<ConfDynSeed>) -> Self {
455        Self {
456            inner: InnerSimpleSeedsProvider::new(seeds),
457        }
458    }
459}
460
461impl SeedsProvider for SimpleSeedsProvider {
462    fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError> {
463        self.inner.get_seeds()
464    }
465}
466
467/// DNS-resolving seeds provider, re-exported under the embed
468/// trait.
469pub type DnsSeedsProvider = InnerDnsSeedsProvider;
470
471impl SeedsProvider for DnsSeedsProvider {
472    fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError> {
473        self.get_seeds()
474    }
475}
476
477/// Florida HTTP seeds provider, re-exported under the embed
478/// trait.
479pub type FloridaSeedsProvider = InnerFloridaSeedsProvider;
480
481impl SeedsProvider for FloridaSeedsProvider {
482    fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError> {
483        self.get_seeds()
484    }
485}
486
487// ---------- CryptoProvider --------------------------------------------------
488
489/// Errors produced by a [`CryptoProvider`].
490#[derive(Debug, Error)]
491#[non_exhaustive]
492pub enum CryptoProviderError {
493    /// Encryption failed.
494    #[error("encryption failed: {0}")]
495    Encrypt(String),
496    /// Decryption failed.
497    #[error("decryption failed: {0}")]
498    Decrypt(String),
499    /// The provider was misconfigured (key length, padding, ...).
500    #[error("misconfiguration: {0}")]
501    Misconfigured(String),
502}
503
504/// Pluggable AES + RSA provider for the DNODE peer protocol.
505///
506/// The default in-crate provider [`RustCryptoProvider`] wraps
507/// [`crate::crypto::Crypto`]. HSM / KMS integrations implement
508/// the trait against their hardware bridge.
509///
510/// # Examples
511///
512/// ```no_run
513/// use dynomite::embed::hooks::{CryptoProvider, RustCryptoProvider};
514/// use dynomite::crypto::Crypto;
515/// // Construct the underlying Crypto from a PEM file at runtime.
516/// let crypto = Crypto::from_pem("/etc/dynomite/dynomite.pem").unwrap();
517/// let provider = RustCryptoProvider::new(crypto);
518/// assert!(provider.rsa_size() > 0);
519/// ```
520pub trait CryptoProvider: Send + Sync {
521    /// Modulus length in bytes for the configured RSA key.
522    fn rsa_size(&self) -> usize;
523
524    /// Borrow the AES key buffer used by the DNODE handshake.
525    fn aes_key(&self) -> [u8; crate::crypto::AES_KEYLEN];
526
527    /// AES-encrypt `plaintext` under the provider's key.
528    fn aes_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
529
530    /// AES-decrypt `ciphertext` under the provider's key.
531    fn aes_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
532
533    /// RSA-encrypt `plaintext` under the provider's public key.
534    fn rsa_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
535
536    /// RSA-decrypt `ciphertext` under the provider's private key.
537    fn rsa_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
538}
539
540/// Default crypto provider built on the in-crate
541/// [`crate::crypto::Crypto`] (RustCrypto-backed AES + RSA).
542///
543/// Despite the historical "Openssl" name in the design doc, the
544/// implementation uses the workspace's RustCrypto stack. Recorded
545/// as a Deviation in `docs/parity.md`.
546#[derive(Debug)]
547pub struct RustCryptoProvider {
548    crypto: Arc<crate::crypto::Crypto>,
549}
550
551impl RustCryptoProvider {
552    /// Wrap an existing [`crate::crypto::Crypto`].
553    ///
554    /// # Examples
555    ///
556    /// ```no_run
557    /// use dynomite::embed::hooks::{CryptoProvider, RustCryptoProvider};
558    /// use dynomite::crypto::Crypto;
559    /// // Production embedders load the key from disk:
560    /// let crypto = Crypto::from_pem("/etc/dynomite/dynomite.pem").unwrap();
561    /// let p = RustCryptoProvider::new(crypto);
562    /// assert!(p.rsa_size() >= 128);
563    /// ```
564    #[must_use]
565    pub fn new(crypto: crate::crypto::Crypto) -> Self {
566        Self {
567            crypto: Arc::new(crypto),
568        }
569    }
570
571    /// Construct from an [`Arc`]-shared crypto bundle.
572    #[must_use]
573    pub fn from_arc(crypto: Arc<crate::crypto::Crypto>) -> Self {
574        Self { crypto }
575    }
576}
577
578impl CryptoProvider for RustCryptoProvider {
579    fn rsa_size(&self) -> usize {
580        self.crypto.rsa_size()
581    }
582
583    fn aes_key(&self) -> [u8; crate::crypto::AES_KEYLEN] {
584        *self.crypto.aes_key()
585    }
586
587    fn aes_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
588        crate::crypto::Crypto::aes_encrypt(plaintext, self.crypto.aes_key())
589            .map_err(|e| CryptoProviderError::Encrypt(e.to_string()))
590    }
591
592    fn aes_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
593        crate::crypto::Crypto::aes_decrypt(ciphertext, self.crypto.aes_key())
594            .map_err(|e| CryptoProviderError::Decrypt(e.to_string()))
595    }
596
597    fn rsa_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
598        self.crypto
599            .rsa_encrypt(plaintext)
600            .map_err(|e| CryptoProviderError::Encrypt(e.to_string()))
601    }
602
603    fn rsa_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
604        self.crypto
605            .rsa_decrypt(ciphertext)
606            .map_err(|e| CryptoProviderError::Decrypt(e.to_string()))
607    }
608}
609
610// ---------- MetricsSink -----------------------------------------------------
611
612/// Errors produced by a [`MetricsSink`].
613#[derive(Debug, Error)]
614#[non_exhaustive]
615pub enum MetricsError {
616    /// The sink could not flush the snapshot.
617    #[error("metrics flush failed: {0}")]
618    Flush(String),
619}
620
621/// Pluggable metrics exporter.
622///
623/// The default sink ([`LoggingMetricsSink`]) emits one `tracing`
624/// event per flush. A `PrometheusMetricsSink` is intentionally
625/// not shipped by default to avoid adding a dependency to the
626/// workspace; see the embedding cookbook for the recommended
627/// adapter shape. Recorded as a Deviation in `docs/parity.md`.
628///
629/// # Examples
630///
631/// ```
632/// use dynomite::embed::hooks::{LoggingMetricsSink, MetricsSink};
633/// use dynomite::stats::Snapshot;
634/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
635/// let sink = LoggingMetricsSink::new("test");
636/// sink.emit(&Snapshot::default()).await.unwrap();
637/// # });
638/// ```
639pub trait MetricsSink: Send + Sync {
640    /// Push a stats snapshot to the sink.
641    fn emit<'a>(&'a self, snapshot: &'a Snapshot) -> BoxFuture<'a, Result<(), MetricsError>>;
642
643    /// Flush interval requested by the sink.
644    fn flush_interval(&self) -> Duration {
645        Duration::from_secs(10)
646    }
647
648    /// Optional manifest of every metric the sink expects to
649    /// receive.
650    fn manifest(&self) -> Vec<MetricSpec> {
651        Vec::new()
652    }
653}
654
655/// Default metrics sink: emits one `tracing::info` event per
656/// flush. Cheap, dependency-free, and useful in development.
657#[derive(Debug, Clone)]
658pub struct LoggingMetricsSink {
659    name: String,
660    counter: Arc<Mutex<u64>>,
661}
662
663impl LoggingMetricsSink {
664    /// Build a sink with a human-readable name.
665    ///
666    /// # Examples
667    ///
668    /// ```
669    /// use dynomite::embed::hooks::LoggingMetricsSink;
670    /// let s = LoggingMetricsSink::new("dyn_o_mite");
671    /// assert_eq!(s.flush_count(), 0);
672    /// ```
673    #[must_use]
674    pub fn new(name: impl Into<String>) -> Self {
675        Self {
676            name: name.into(),
677            counter: Arc::new(Mutex::new(0)),
678        }
679    }
680
681    /// Number of flushes the sink has observed.
682    #[must_use]
683    pub fn flush_count(&self) -> u64 {
684        *self.counter.lock()
685    }
686}
687
688impl MetricsSink for LoggingMetricsSink {
689    fn emit<'a>(&'a self, snapshot: &'a Snapshot) -> BoxFuture<'a, Result<(), MetricsError>> {
690        let counter = self.counter.clone();
691        let name = self.name.clone();
692        let pool_name = snapshot.pool.name.clone();
693        Box::pin(async move {
694            *counter.lock() += 1;
695            tracing::info!(sink = %name, pool = %pool_name, "metrics flush");
696            Ok(())
697        })
698    }
699
700    fn manifest(&self) -> Vec<MetricSpec> {
701        // Defer to the in-crate descriptor table; the JSON form
702        // already contains every metric the engine emits.
703        let json = describe_stats();
704        // The descriptor table is consumed downstream as opaque
705        // text. Returning an empty vec keeps the trait signature
706        // honest while logging the manifest size for diagnostics.
707        tracing::debug!(bytes = json.len(), "metrics manifest");
708        Vec::new()
709    }
710}
711
712// ---------- Streaming Datastore extensions ---------------------------------
713//
714// The streaming list path lets transports emit the bucket / key
715// catalogue in chunks instead of buffering it. A `Datastore` impl
716// produces one `Bytes` per entry; the transport (PBC server, HTTP
717// gateway) buffers an implementation-chosen number of entries per
718// outbound frame.
719
720use bytes::Bytes;
721use std::collections::{BTreeMap, BTreeSet};
722use std::sync::OnceLock;
723use std::task::{Context, Poll};
724
725/// Type alias for the byte stream returned by
726/// [`Datastore::list_buckets_stream`] and
727/// [`Datastore::list_keys_stream`].
728///
729/// Each `Bytes` item is one bucket name or key. The stream may
730/// surface a [`DatastoreError`] mid-iteration; transports translate
731/// that into a wire-level error response and stop emitting frames.
732pub type DatastoreByteStream =
733    Pin<Box<dyn futures_core::Stream<Item = Result<Bytes, DatastoreError>> + Send>>;
734
735/// Build a one-shot stream that yields a single
736/// [`DatastoreError::Unsupported`] item. The default body for
737/// [`Datastore::list_buckets_stream`] /
738/// [`Datastore::list_keys_stream`] so existing impls keep working
739/// without modification.
740fn unsupported_byte_stream(
741) -> impl futures_core::Stream<Item = Result<Bytes, DatastoreError>> + Send {
742    UnsupportedListStream { emitted: false }
743}
744
745struct UnsupportedListStream {
746    emitted: bool,
747}
748
749impl futures_core::Stream for UnsupportedListStream {
750    type Item = Result<Bytes, DatastoreError>;
751
752    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
753        if self.emitted {
754            return Poll::Ready(None);
755        }
756        self.emitted = true;
757        Poll::Ready(Some(Err(DatastoreError::Unsupported(MsgType::Unknown))))
758    }
759}
760
761/// `futures_core::Stream` that drains an owned `Vec<Bytes>` one
762/// item at a time. Used by [`MemoryDatastore`] to back its
763/// streaming list overrides.
764struct VecByteStream {
765    items: std::vec::IntoIter<Bytes>,
766}
767
768impl futures_core::Stream for VecByteStream {
769    type Item = Result<Bytes, DatastoreError>;
770
771    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
772        match self.items.next() {
773            Some(b) => Poll::Ready(Some(Ok(b))),
774            None => Poll::Ready(None),
775        }
776    }
777}
778
779// ---------- MemoryDatastore listing index ----------------------------------
780//
781// `MemoryDatastore`'s public field layout (a single `Arc<Mutex<...>>`
782// holding the dispatch counter) was committed before the streaming
783// list path existed. To keep the published surface unchanged, the
784// per-instance bucket/key index is held in a process-wide registry
785// keyed by the `Arc<Mutex<MemoryStore>>` pointer identity. Cloning
786// a `MemoryDatastore` shares its `inner` `Arc`, so clones see the
787// same listing -- mirroring the existing dispatch-count behaviour.
788
789#[derive(Debug, Default)]
790struct MemoryListing {
791    buckets: BTreeMap<Vec<u8>, BTreeSet<Vec<u8>>>,
792}
793
794#[derive(Debug, Default, Clone)]
795struct ListingHandle {
796    inner: Arc<Mutex<MemoryListing>>,
797}
798
799fn listing_for(ds: &MemoryDatastore) -> ListingHandle {
800    static REGISTRY: OnceLock<Mutex<Vec<(usize, ListingHandle)>>> = OnceLock::new();
801    let registry = REGISTRY.get_or_init(|| Mutex::new(Vec::new()));
802    let id = Arc::as_ptr(&ds.inner) as usize;
803    let mut g = registry.lock();
804    if let Some((_, h)) = g.iter().find(|(k, _)| *k == id) {
805        return h.clone();
806    }
807    let h = ListingHandle::default();
808    g.push((id, h.clone()));
809    h
810}
811
812impl MemoryDatastore {
813    /// Insert `(bucket, key)` into the in-memory listing index.
814    ///
815    /// Idempotent: inserting the same `(bucket, key)` pair twice is
816    /// a no-op. Tests use this helper to seed the streaming list
817    /// path without speaking the real Riak protocol.
818    ///
819    /// # Examples
820    ///
821    /// ```
822    /// use dynomite::embed::hooks::MemoryDatastore;
823    /// let ds = MemoryDatastore::new();
824    /// ds.insert(b"users", b"alice");
825    /// ds.insert(b"users", b"bob");
826    /// assert_eq!(ds.list_buckets_snapshot().len(), 1);
827    /// assert_eq!(ds.list_keys_snapshot(b"users").len(), 2);
828    /// ```
829    pub fn insert(&self, bucket: &[u8], key: &[u8]) {
830        let h = listing_for(self);
831        let mut g = h.inner.lock();
832        g.buckets
833            .entry(bucket.to_vec())
834            .or_default()
835            .insert(key.to_vec());
836    }
837
838    /// Snapshot of the bucket name set, sorted lexicographically.
839    ///
840    /// # Examples
841    ///
842    /// ```
843    /// use dynomite::embed::hooks::MemoryDatastore;
844    /// let ds = MemoryDatastore::new();
845    /// assert!(ds.list_buckets_snapshot().is_empty());
846    /// ```
847    #[must_use]
848    pub fn list_buckets_snapshot(&self) -> Vec<Bytes> {
849        let h = listing_for(self);
850        let g = h.inner.lock();
851        g.buckets
852            .keys()
853            .map(|b| Bytes::copy_from_slice(b))
854            .collect()
855    }
856
857    /// Snapshot of the keys in `bucket`, sorted lexicographically.
858    /// Returns an empty vector when the bucket is unknown.
859    ///
860    /// # Examples
861    ///
862    /// ```
863    /// use dynomite::embed::hooks::MemoryDatastore;
864    /// let ds = MemoryDatastore::new();
865    /// assert!(ds.list_keys_snapshot(b"missing").is_empty());
866    /// ```
867    #[must_use]
868    pub fn list_keys_snapshot(&self, bucket: &[u8]) -> Vec<Bytes> {
869        let h = listing_for(self);
870        let g = h.inner.lock();
871        g.buckets
872            .get(bucket)
873            .map(|s| s.iter().map(|k| Bytes::copy_from_slice(k)).collect())
874            .unwrap_or_default()
875    }
876}