commonware_runtime/
tokio.rs

1//! A production-focused runtime based on [Tokio](https://tokio.rs) with
2//! secure randomness and storage backed by the local filesystem.
3//!
4//! # Panics
5//!
6//! By default, the runtime will catch any panic and log the error. It is
7//! possible to override this behavior in the configuration.
8//!
9//! # Example
10//!
11//! ```rust
12//! use commonware_runtime::{Spawner, Runner, tokio::Executor, Metrics};
13//!
14//! let (executor, runtime) = Executor::default();
15//! executor.start(async move {
16//!     println!("Parent started");
17//!     let result = runtime.with_label("child").spawn(|_| async move {
18//!         println!("Child started");
19//!         "hello"
20//!     });
21//!     println!("Child result: {:?}", result.await);
22//!     println!("Parent exited");
23//! });
24//! ```
25
26use crate::{utils::Signaler, Clock, Error, Handle, Signal, METRICS_PREFIX};
27use commonware_utils::{from_hex, hex};
28use governor::clock::{Clock as GClock, ReasonablyRealtime};
29use prometheus_client::{
30    encoding::{text::encode, EncodeLabelSet},
31    metrics::{counter::Counter, family::Family, gauge::Gauge},
32    registry::{Metric, Registry},
33};
34use rand::{rngs::OsRng, CryptoRng, RngCore};
35use std::{
36    env,
37    future::Future,
38    io::SeekFrom,
39    net::SocketAddr,
40    path::PathBuf,
41    sync::{Arc, Mutex},
42    time::{Duration, SystemTime},
43};
44use tokio::{
45    fs,
46    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
47    net::{tcp::OwnedReadHalf, tcp::OwnedWriteHalf, TcpListener, TcpStream},
48    runtime::{Builder, Runtime},
49    sync::Mutex as AsyncMutex,
50    time::timeout,
51};
52use tracing::warn;
53
54#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
55struct Work {
56    label: String,
57}
58
59#[derive(Debug)]
60struct Metrics {
61    tasks_spawned: Family<Work, Counter>,
62    tasks_running: Family<Work, Gauge>,
63
64    // As nice as it would be to track each of these by socket address,
65    // it quickly becomes an OOM attack vector.
66    inbound_connections: Counter,
67    outbound_connections: Counter,
68    inbound_bandwidth: Counter,
69    outbound_bandwidth: Counter,
70
71    open_blobs: Gauge,
72    storage_reads: Counter,
73    storage_read_bytes: Counter,
74    storage_writes: Counter,
75    storage_write_bytes: Counter,
76}
77
78impl Metrics {
79    pub fn init(registry: &mut Registry) -> Self {
80        let metrics = Self {
81            tasks_spawned: Family::default(),
82            tasks_running: Family::default(),
83            inbound_connections: Counter::default(),
84            outbound_connections: Counter::default(),
85            inbound_bandwidth: Counter::default(),
86            outbound_bandwidth: Counter::default(),
87            open_blobs: Gauge::default(),
88            storage_reads: Counter::default(),
89            storage_read_bytes: Counter::default(),
90            storage_writes: Counter::default(),
91            storage_write_bytes: Counter::default(),
92        };
93        registry.register(
94            "tasks_spawned",
95            "Total number of tasks spawned",
96            metrics.tasks_spawned.clone(),
97        );
98        registry.register(
99            "tasks_running",
100            "Number of tasks currently running",
101            metrics.tasks_running.clone(),
102        );
103        registry.register(
104            "inbound_connections",
105            "Number of connections created by dialing us",
106            metrics.inbound_connections.clone(),
107        );
108        registry.register(
109            "outbound_connections",
110            "Number of connections created by dialing others",
111            metrics.outbound_connections.clone(),
112        );
113        registry.register(
114            "inbound_bandwidth",
115            "Bandwidth used by receiving data from others",
116            metrics.inbound_bandwidth.clone(),
117        );
118        registry.register(
119            "outbound_bandwidth",
120            "Bandwidth used by sending data to others",
121            metrics.outbound_bandwidth.clone(),
122        );
123        registry.register(
124            "open_blobs",
125            "Number of open blobs",
126            metrics.open_blobs.clone(),
127        );
128        registry.register(
129            "storage_reads",
130            "Total number of disk reads",
131            metrics.storage_reads.clone(),
132        );
133        registry.register(
134            "storage_read_bytes",
135            "Total amount of data read from disk",
136            metrics.storage_read_bytes.clone(),
137        );
138        registry.register(
139            "storage_writes",
140            "Total number of disk writes",
141            metrics.storage_writes.clone(),
142        );
143        registry.register(
144            "storage_write_bytes",
145            "Total amount of data written to disk",
146            metrics.storage_write_bytes.clone(),
147        );
148        metrics
149    }
150}
151
152/// Configuration for the `tokio` runtime.
153#[derive(Clone)]
154pub struct Config {
155    /// Number of threads to use for the runtime.
156    pub threads: usize,
157
158    /// Whether or not to catch panics.
159    pub catch_panics: bool,
160
161    /// Duration after which to close the connection if no message is read.
162    pub read_timeout: Duration,
163
164    /// Duration after which to close the connection if a message cannot be written.
165    pub write_timeout: Duration,
166
167    /// Whether or not to disable Nagle's algorithm.
168    ///
169    /// The algorithm combines a series of small network packets into a single packet
170    /// before sending to reduce overhead of sending multiple small packets which might not
171    /// be efficient on slow, congested networks. However, to do so the algorithm introduces
172    /// a slight delay as it waits to accumulate more data. Latency-sensitive networks should
173    /// consider disabling it to send the packets as soon as possible to reduce latency.
174    ///
175    /// Note: Make sure that your compile target has and allows this configuration otherwise
176    /// panics or unexpected behaviours are possible.
177    pub tcp_nodelay: Option<bool>,
178
179    /// Base directory for all storage operations.
180    pub storage_directory: PathBuf,
181
182    /// Maximum buffer size for operations on blobs.
183    ///
184    /// `tokio` defaults this value to 2MB.
185    pub maximum_buffer_size: usize,
186}
187
188impl Default for Config {
189    fn default() -> Self {
190        // Generate a random directory name to avoid conflicts (used in tests, so we shouldn't need to reload)
191        let rng = OsRng.next_u64();
192        let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{}", rng));
193
194        // Return the configuration
195        Self {
196            threads: 2,
197            catch_panics: true,
198            read_timeout: Duration::from_secs(60),
199            write_timeout: Duration::from_secs(30),
200            tcp_nodelay: None,
201            storage_directory,
202            maximum_buffer_size: 2 * 1024 * 1024, // 2 MB
203        }
204    }
205}
206
207/// Runtime based on [Tokio](https://tokio.rs).
208pub struct Executor {
209    cfg: Config,
210    registry: Mutex<Registry>,
211    metrics: Arc<Metrics>,
212    runtime: Runtime,
213    fs: AsyncMutex<()>,
214    signaler: Mutex<Signaler>,
215    signal: Signal,
216}
217
218impl Executor {
219    /// Initialize a new `tokio` runtime with the given number of threads.
220    pub fn init(cfg: Config) -> (Runner, Context) {
221        // Create a new registry
222        let mut registry = Registry::default();
223        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
224
225        // Initialize runtime
226        let metrics = Arc::new(Metrics::init(runtime_registry));
227        let runtime = Builder::new_multi_thread()
228            .worker_threads(cfg.threads)
229            .enable_all()
230            .build()
231            .expect("failed to create Tokio runtime");
232        let (signaler, signal) = Signaler::new();
233        let executor = Arc::new(Self {
234            cfg,
235            registry: Mutex::new(registry),
236            metrics,
237            runtime,
238            fs: AsyncMutex::new(()),
239            signaler: Mutex::new(signaler),
240            signal,
241        });
242        (
243            Runner {
244                executor: executor.clone(),
245            },
246            Context {
247                label: String::new(),
248                spawned: false,
249                executor,
250            },
251        )
252    }
253
254    /// Initialize a new `tokio` runtime with default configuration.
255    // We'd love to implement the trait but we can't because of the return type.
256    #[allow(clippy::should_implement_trait)]
257    pub fn default() -> (Runner, Context) {
258        Self::init(Config::default())
259    }
260}
261
262/// Implementation of [`crate::Runner`] for the `tokio` runtime.
263pub struct Runner {
264    executor: Arc<Executor>,
265}
266
267impl crate::Runner for Runner {
268    fn start<F>(self, f: F) -> F::Output
269    where
270        F: Future + Send + 'static,
271        F::Output: Send + 'static,
272    {
273        self.executor.runtime.block_on(f)
274    }
275}
276
277/// Implementation of [`crate::Spawner`], [`crate::Clock`],
278/// [`crate::Network`], and [`crate::Storage`] for the `tokio`
279/// runtime.
280pub struct Context {
281    label: String,
282    spawned: bool,
283    executor: Arc<Executor>,
284}
285
286impl Clone for Context {
287    fn clone(&self) -> Self {
288        Self {
289            label: self.label.clone(),
290            spawned: false,
291            executor: self.executor.clone(),
292        }
293    }
294}
295
296impl crate::Spawner for Context {
297    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
298    where
299        F: FnOnce(Self) -> Fut + Send + 'static,
300        Fut: Future<Output = T> + Send + 'static,
301        T: Send + 'static,
302    {
303        // Ensure a context only spawns one task
304        assert!(!self.spawned, "already spawned");
305
306        // Get metrics
307        let work = Work {
308            label: self.label.clone(),
309        };
310        self.executor
311            .metrics
312            .tasks_spawned
313            .get_or_create(&work)
314            .inc();
315        let gauge = self
316            .executor
317            .metrics
318            .tasks_running
319            .get_or_create(&work)
320            .clone();
321
322        // Set up the task
323        let catch_panics = self.executor.cfg.catch_panics;
324        let executor = self.executor.clone();
325        let future = f(self);
326        let (f, handle) = Handle::init(future, gauge, catch_panics);
327
328        // Spawn the task
329        executor.runtime.spawn(f);
330        handle
331    }
332
333    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
334    where
335        F: Future<Output = T> + Send + 'static,
336        T: Send + 'static,
337    {
338        // Ensure a context only spawns one task
339        assert!(!self.spawned, "already spawned");
340        self.spawned = true;
341
342        // Get metrics
343        let work = Work {
344            label: self.label.clone(),
345        };
346        self.executor
347            .metrics
348            .tasks_spawned
349            .get_or_create(&work)
350            .inc();
351        let gauge = self
352            .executor
353            .metrics
354            .tasks_running
355            .get_or_create(&work)
356            .clone();
357
358        // Set up the task
359        let executor = self.executor.clone();
360        move |f: F| {
361            let (f, handle) = Handle::init(f, gauge, executor.cfg.catch_panics);
362
363            // Spawn the task
364            executor.runtime.spawn(f);
365            handle
366        }
367    }
368
369    fn stop(&self, value: i32) {
370        self.executor.signaler.lock().unwrap().signal(value);
371    }
372
373    fn stopped(&self) -> Signal {
374        self.executor.signal.clone()
375    }
376}
377
378impl crate::Metrics for Context {
379    fn with_label(&self, label: &str) -> Self {
380        let label = {
381            let prefix = self.label.clone();
382            if prefix.is_empty() {
383                label.to_string()
384            } else {
385                format!("{}_{}", prefix, label)
386            }
387        };
388        assert!(
389            !label.starts_with(METRICS_PREFIX),
390            "using runtime label is not allowed"
391        );
392        Self {
393            label,
394            spawned: false,
395            executor: self.executor.clone(),
396        }
397    }
398
399    fn label(&self) -> String {
400        self.label.clone()
401    }
402
403    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
404        let name = name.into();
405        let prefixed_name = {
406            let prefix = &self.label;
407            if prefix.is_empty() {
408                name
409            } else {
410                format!("{}_{}", *prefix, name)
411            }
412        };
413        self.executor
414            .registry
415            .lock()
416            .unwrap()
417            .register(prefixed_name, help, metric)
418    }
419
420    fn encode(&self) -> String {
421        let mut buffer = String::new();
422        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
423        buffer
424    }
425}
426
427impl Clock for Context {
428    fn current(&self) -> SystemTime {
429        SystemTime::now()
430    }
431
432    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
433        tokio::time::sleep(duration)
434    }
435
436    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
437        let now = SystemTime::now();
438        let duration_until_deadline = match deadline.duration_since(now) {
439            Ok(duration) => duration,
440            Err(_) => Duration::from_secs(0), // Deadline is in the past
441        };
442        let target_instant = tokio::time::Instant::now() + duration_until_deadline;
443        tokio::time::sleep_until(target_instant)
444    }
445}
446
447impl GClock for Context {
448    type Instant = SystemTime;
449
450    fn now(&self) -> Self::Instant {
451        self.current()
452    }
453}
454
455impl ReasonablyRealtime for Context {}
456
457impl crate::Network<Listener, Sink, Stream> for Context {
458    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
459        TcpListener::bind(socket)
460            .await
461            .map_err(|_| Error::BindFailed)
462            .map(|listener| Listener {
463                context: self.clone(),
464                listener,
465            })
466    }
467
468    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
469        // Create a new TCP stream
470        let stream = TcpStream::connect(socket)
471            .await
472            .map_err(|_| Error::ConnectionFailed)?;
473        self.executor.metrics.outbound_connections.inc();
474
475        // Set TCP_NODELAY if configured
476        if let Some(tcp_nodelay) = self.executor.cfg.tcp_nodelay {
477            if let Err(err) = stream.set_nodelay(tcp_nodelay) {
478                warn!(?err, "failed to set TCP_NODELAY");
479            }
480        }
481
482        // Return the sink and stream
483        let context = self.clone();
484        let (stream, sink) = stream.into_split();
485        Ok((
486            Sink {
487                context: context.clone(),
488                sink,
489            },
490            Stream { context, stream },
491        ))
492    }
493}
494
495/// Implementation of [`crate::Listener`] for the `tokio` runtime.
496pub struct Listener {
497    context: Context,
498    listener: TcpListener,
499}
500
501impl crate::Listener<Sink, Stream> for Listener {
502    async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
503        // Accept a new TCP stream
504        let (stream, addr) = self.listener.accept().await.map_err(|_| Error::Closed)?;
505        self.context.executor.metrics.inbound_connections.inc();
506
507        // Set TCP_NODELAY if configured
508        if let Some(tcp_nodelay) = self.context.executor.cfg.tcp_nodelay {
509            if let Err(err) = stream.set_nodelay(tcp_nodelay) {
510                warn!(?err, "failed to set TCP_NODELAY");
511            }
512        }
513
514        // Return the sink and stream
515        let context = self.context.clone();
516        let (stream, sink) = stream.into_split();
517        Ok((
518            addr,
519            Sink {
520                context: context.clone(),
521                sink,
522            },
523            Stream { context, stream },
524        ))
525    }
526}
527
528/// Implementation of [`crate::Sink`] for the `tokio` runtime.
529pub struct Sink {
530    context: Context,
531    sink: OwnedWriteHalf,
532}
533
534impl crate::Sink for Sink {
535    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
536        let len = msg.len();
537        timeout(
538            self.context.executor.cfg.write_timeout,
539            self.sink.write_all(msg),
540        )
541        .await
542        .map_err(|_| Error::Timeout)?
543        .map_err(|_| Error::SendFailed)?;
544        self.context
545            .executor
546            .metrics
547            .outbound_bandwidth
548            .inc_by(len as u64);
549        Ok(())
550    }
551}
552
553/// Implementation of [`crate::Stream`] for the `tokio` runtime.
554pub struct Stream {
555    context: Context,
556    stream: OwnedReadHalf,
557}
558
559impl crate::Stream for Stream {
560    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
561        // Wait for the stream to be readable
562        timeout(
563            self.context.executor.cfg.read_timeout,
564            self.stream.read_exact(buf),
565        )
566        .await
567        .map_err(|_| Error::Timeout)?
568        .map_err(|_| Error::RecvFailed)?;
569
570        // Record metrics
571        self.context
572            .executor
573            .metrics
574            .inbound_bandwidth
575            .inc_by(buf.len() as u64);
576
577        Ok(())
578    }
579}
580
581impl RngCore for Context {
582    fn next_u32(&mut self) -> u32 {
583        OsRng.next_u32()
584    }
585
586    fn next_u64(&mut self) -> u64 {
587        OsRng.next_u64()
588    }
589
590    fn fill_bytes(&mut self, dest: &mut [u8]) {
591        OsRng.fill_bytes(dest);
592    }
593
594    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
595        OsRng.try_fill_bytes(dest)
596    }
597}
598
599impl CryptoRng for Context {}
600
601/// Implementation of [`crate::Blob`] for the `tokio` runtime.
602pub struct Blob {
603    metrics: Arc<Metrics>,
604
605    partition: String,
606    name: Vec<u8>,
607
608    // Files must be seeked prior to any read or write operation and are thus
609    // not safe to concurrently interact with. If we switched to mapping files
610    // we could remove this lock.
611    //
612    // We also track the virtual file size because metadata isn't updated until
613    // the file is synced (not to mention it is a lot less fs calls).
614    file: Arc<AsyncMutex<(fs::File, u64)>>,
615}
616
617impl Blob {
618    fn new(
619        metrics: Arc<Metrics>,
620        partition: String,
621        name: &[u8],
622        file: fs::File,
623        len: u64,
624    ) -> Self {
625        metrics.open_blobs.inc();
626        Self {
627            metrics,
628            partition,
629            name: name.into(),
630            file: Arc::new(AsyncMutex::new((file, len))),
631        }
632    }
633}
634
635impl Clone for Blob {
636    fn clone(&self) -> Self {
637        // We implement `Clone` manually to ensure the `open_blobs` gauge is updated.
638        self.metrics.open_blobs.inc();
639        Self {
640            metrics: self.metrics.clone(),
641            partition: self.partition.clone(),
642            name: self.name.clone(),
643            file: self.file.clone(),
644        }
645    }
646}
647
648impl crate::Storage<Blob> for Context {
649    async fn open(&self, partition: &str, name: &[u8]) -> Result<Blob, Error> {
650        // Acquire the filesystem lock
651        let _guard = self.executor.fs.lock().await;
652
653        // Construct the full path
654        let path = self
655            .executor
656            .cfg
657            .storage_directory
658            .join(partition)
659            .join(hex(name));
660        let parent = match path.parent() {
661            Some(parent) => parent,
662            None => return Err(Error::PartitionCreationFailed(partition.into())),
663        };
664
665        // Create the partition directory if it does not exist
666        fs::create_dir_all(parent)
667            .await
668            .map_err(|_| Error::PartitionCreationFailed(partition.into()))?;
669
670        // Open the file in read-write mode, create if it does not exist
671        let mut file = fs::OpenOptions::new()
672            .read(true)
673            .write(true)
674            .create(true)
675            .truncate(false)
676            .open(&path)
677            .await
678            .map_err(|_| Error::BlobOpenFailed(partition.into(), hex(name)))?;
679
680        // Set the maximum buffer size
681        file.set_max_buf_size(self.executor.cfg.maximum_buffer_size);
682
683        // Get the file length
684        let len = file.metadata().await.map_err(|_| Error::ReadFailed)?.len();
685
686        // Construct the blob
687        Ok(Blob::new(
688            self.executor.metrics.clone(),
689            partition.into(),
690            name,
691            file,
692            len,
693        ))
694    }
695
696    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
697        // Acquire the filesystem lock
698        let _guard = self.executor.fs.lock().await;
699
700        // Remove all related files
701        let path = self.executor.cfg.storage_directory.join(partition);
702        if let Some(name) = name {
703            let blob_path = path.join(hex(name));
704            fs::remove_file(blob_path)
705                .await
706                .map_err(|_| Error::BlobMissing(partition.into(), hex(name)))?;
707        } else {
708            fs::remove_dir_all(path)
709                .await
710                .map_err(|_| Error::PartitionMissing(partition.into()))?;
711        }
712        Ok(())
713    }
714
715    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
716        // Acquire the filesystem lock
717        let _guard = self.executor.fs.lock().await;
718
719        // Scan the partition directory
720        let path = self.executor.cfg.storage_directory.join(partition);
721        let mut entries = fs::read_dir(path)
722            .await
723            .map_err(|_| Error::PartitionMissing(partition.into()))?;
724        let mut blobs = Vec::new();
725        while let Some(entry) = entries.next_entry().await.map_err(|_| Error::ReadFailed)? {
726            let file_type = entry.file_type().await.map_err(|_| Error::ReadFailed)?;
727            if !file_type.is_file() {
728                return Err(Error::PartitionCorrupt(partition.into()));
729            }
730            if let Some(name) = entry.file_name().to_str() {
731                let name = from_hex(name).ok_or(Error::PartitionCorrupt(partition.into()))?;
732                blobs.push(name);
733            }
734        }
735        Ok(blobs)
736    }
737}
738
739impl crate::Blob for Blob {
740    async fn len(&self) -> Result<u64, Error> {
741        let (_, len) = *self.file.lock().await;
742        Ok(len)
743    }
744
745    async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
746        // Ensure the read is within bounds
747        let mut file = self.file.lock().await;
748        if offset + buf.len() as u64 > file.1 {
749            return Err(Error::BlobInsufficientLength);
750        }
751
752        // Perform the read
753        file.0
754            .seek(SeekFrom::Start(offset))
755            .await
756            .map_err(|_| Error::ReadFailed)?;
757        file.0
758            .read_exact(buf)
759            .await
760            .map_err(|_| Error::ReadFailed)?;
761        self.metrics.storage_reads.inc();
762        self.metrics.storage_read_bytes.inc_by(buf.len() as u64);
763        Ok(())
764    }
765
766    async fn write_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
767        // Perform the write
768        let mut file = self.file.lock().await;
769        file.0
770            .seek(SeekFrom::Start(offset))
771            .await
772            .map_err(|_| Error::WriteFailed)?;
773        file.0
774            .write_all(buf)
775            .await
776            .map_err(|_| Error::WriteFailed)?;
777
778        // Update the virtual file size
779        let max_len = offset + buf.len() as u64;
780        if max_len > file.1 {
781            file.1 = max_len;
782        }
783        self.metrics.storage_writes.inc();
784        self.metrics.storage_write_bytes.inc_by(buf.len() as u64);
785        Ok(())
786    }
787
788    async fn truncate(&self, len: u64) -> Result<(), Error> {
789        // Perform the truncate
790        let mut file = self.file.lock().await;
791        file.0
792            .set_len(len)
793            .await
794            .map_err(|_| Error::BlobTruncateFailed(self.partition.clone(), hex(&self.name)))?;
795
796        // Update the virtual file size
797        file.1 = len;
798        Ok(())
799    }
800
801    async fn sync(&self) -> Result<(), Error> {
802        let file = self.file.lock().await;
803        file.0
804            .sync_all()
805            .await
806            .map_err(|_| Error::BlobSyncFailed(self.partition.clone(), hex(&self.name)))
807    }
808
809    async fn close(self) -> Result<(), Error> {
810        let mut file = self.file.lock().await;
811        file.0
812            .sync_all()
813            .await
814            .map_err(|_| Error::BlobSyncFailed(self.partition.clone(), hex(&self.name)))?;
815        file.0
816            .shutdown()
817            .await
818            .map_err(|_| Error::BlobCloseFailed(self.partition.clone(), hex(&self.name)))
819    }
820}
821
822impl Drop for Blob {
823    fn drop(&mut self) {
824        self.metrics.open_blobs.dec();
825    }
826}