commonware_runtime/tokio/
runtime.rs

1use crate::{utils::Signaler, Clock, Error, Handle, Signal, METRICS_PREFIX};
2use commonware_utils::{from_hex, hex};
3use governor::clock::{Clock as GClock, ReasonablyRealtime};
4use prometheus_client::{
5    encoding::{text::encode, EncodeLabelSet},
6    metrics::{counter::Counter, family::Family, gauge::Gauge},
7    registry::{Metric, Registry},
8};
9use rand::{rngs::OsRng, CryptoRng, RngCore};
10use std::{
11    env,
12    future::Future,
13    io::{self, SeekFrom},
14    net::SocketAddr,
15    path::PathBuf,
16    sync::{Arc, Mutex},
17    time::{Duration, SystemTime},
18};
19use tokio::{
20    fs,
21    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
22    net::{tcp::OwnedReadHalf, tcp::OwnedWriteHalf, TcpListener, TcpStream},
23    runtime::{Builder, Runtime},
24    sync::Mutex as AsyncMutex,
25    time::timeout,
26};
27use tracing::warn;
28
/// Prometheus label set used to partition per-task metrics by the
/// spawning context's label.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Work {
    // Hierarchical label of the context that spawned the task (see
    // `Context::with_label`, which joins segments with `_`).
    label: String,
}
33
/// All metrics exported by the runtime, registered under [`METRICS_PREFIX`].
#[derive(Debug)]
struct Metrics {
    // Task accounting, labeled by spawning context (`Work`).
    tasks_spawned: Family<Work, Counter>,
    tasks_running: Family<Work, Gauge>,
    blocking_tasks_spawned: Family<Work, Counter>,
    blocking_tasks_running: Family<Work, Gauge>,

    // As nice as it would be to track each of these by socket address,
    // it quickly becomes an OOM attack vector.
    inbound_connections: Counter,
    outbound_connections: Counter,
    inbound_bandwidth: Counter,
    outbound_bandwidth: Counter,

    // Storage accounting. `open_blobs` counts live `Blob` handles
    // (incremented in `Blob::new`/`Clone`, decremented in `Drop`).
    open_blobs: Gauge,
    storage_reads: Counter,
    storage_read_bytes: Counter,
    storage_writes: Counter,
    storage_write_bytes: Counter,
}
54
55impl Metrics {
56    pub fn init(registry: &mut Registry) -> Self {
57        let metrics = Self {
58            tasks_spawned: Family::default(),
59            tasks_running: Family::default(),
60            blocking_tasks_spawned: Family::default(),
61            blocking_tasks_running: Family::default(),
62            inbound_connections: Counter::default(),
63            outbound_connections: Counter::default(),
64            inbound_bandwidth: Counter::default(),
65            outbound_bandwidth: Counter::default(),
66            open_blobs: Gauge::default(),
67            storage_reads: Counter::default(),
68            storage_read_bytes: Counter::default(),
69            storage_writes: Counter::default(),
70            storage_write_bytes: Counter::default(),
71        };
72        registry.register(
73            "tasks_spawned",
74            "Total number of tasks spawned",
75            metrics.tasks_spawned.clone(),
76        );
77        registry.register(
78            "tasks_running",
79            "Number of tasks currently running",
80            metrics.tasks_running.clone(),
81        );
82        registry.register(
83            "blocking_tasks_spawned",
84            "Total number of blocking tasks spawned",
85            metrics.blocking_tasks_spawned.clone(),
86        );
87        registry.register(
88            "blocking_tasks_running",
89            "Number of blocking tasks currently running",
90            metrics.blocking_tasks_running.clone(),
91        );
92        registry.register(
93            "inbound_connections",
94            "Number of connections created by dialing us",
95            metrics.inbound_connections.clone(),
96        );
97        registry.register(
98            "outbound_connections",
99            "Number of connections created by dialing others",
100            metrics.outbound_connections.clone(),
101        );
102        registry.register(
103            "inbound_bandwidth",
104            "Bandwidth used by receiving data from others",
105            metrics.inbound_bandwidth.clone(),
106        );
107        registry.register(
108            "outbound_bandwidth",
109            "Bandwidth used by sending data to others",
110            metrics.outbound_bandwidth.clone(),
111        );
112        registry.register(
113            "open_blobs",
114            "Number of open blobs",
115            metrics.open_blobs.clone(),
116        );
117        registry.register(
118            "storage_reads",
119            "Total number of disk reads",
120            metrics.storage_reads.clone(),
121        );
122        registry.register(
123            "storage_read_bytes",
124            "Total amount of data read from disk",
125            metrics.storage_read_bytes.clone(),
126        );
127        registry.register(
128            "storage_writes",
129            "Total number of disk writes",
130            metrics.storage_writes.clone(),
131        );
132        registry.register(
133            "storage_write_bytes",
134            "Total amount of data written to disk",
135            metrics.storage_write_bytes.clone(),
136        );
137        metrics
138    }
139}
140
/// Configuration for the `tokio` runtime.
///
/// Defaults for every field are provided by the [`Default`] implementation.
#[derive(Clone)]
pub struct Config {
    /// Number of threads to use for handling async tasks.
    ///
    /// Worker threads are always active (waiting for work).
    ///
    /// Tokio sets the default value to the number of logical CPUs.
    pub worker_threads: usize,

    /// Maximum number of threads to use for blocking tasks.
    ///
    /// Unlike worker threads, blocking threads are created as needed and
    /// exit if left idle for too long.
    ///
    /// Tokio sets the default value to 512 to avoid hanging on lower-level
    /// operations that require blocking (like `fs` and writing to `Stdout`).
    pub max_blocking_threads: usize,

    /// Whether or not to catch panics.
    pub catch_panics: bool,

    /// Duration after which to close the connection if no message is read.
    pub read_timeout: Duration,

    /// Duration after which to close the connection if a message cannot be written.
    pub write_timeout: Duration,

    /// Whether or not to disable Nagle's algorithm.
    ///
    /// The algorithm combines a series of small network packets into a single packet
    /// before sending to reduce overhead of sending multiple small packets which might not
    /// be efficient on slow, congested networks. However, to do so the algorithm introduces
    /// a slight delay as it waits to accumulate more data. Latency-sensitive networks should
    /// consider disabling it to send the packets as soon as possible to reduce latency.
    ///
    /// `None` leaves the OS default untouched.
    ///
    /// Note: Make sure that your compile target has and allows this configuration otherwise
    /// panics or unexpected behaviours are possible.
    pub tcp_nodelay: Option<bool>,

    /// Base directory for all storage operations.
    pub storage_directory: PathBuf,

    /// Maximum buffer size for operations on blobs.
    ///
    /// Tokio sets the default value to 2MB.
    pub maximum_buffer_size: usize,
}
189
190impl Default for Config {
191    fn default() -> Self {
192        // Generate a random directory name to avoid conflicts (used in tests, so we shouldn't need to reload)
193        let rng = OsRng.next_u64();
194        let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{}", rng));
195
196        // Return the configuration
197        Self {
198            worker_threads: 2,
199            max_blocking_threads: 512,
200            catch_panics: true,
201            read_timeout: Duration::from_secs(60),
202            write_timeout: Duration::from_secs(30),
203            tcp_nodelay: None,
204            storage_directory,
205            maximum_buffer_size: 2 * 1024 * 1024, // 2 MB
206        }
207    }
208}
209
/// Runtime based on [Tokio](https://tokio.rs).
pub struct Executor {
    // Immutable runtime configuration.
    cfg: Config,
    // Root metrics registry (runtime metrics live under METRICS_PREFIX).
    registry: Mutex<Registry>,
    // Shared handles to the runtime's own metrics.
    metrics: Arc<Metrics>,
    // The underlying tokio multi-thread runtime.
    runtime: Runtime,
    // Serializes filesystem operations in the `Storage` impl.
    fs: AsyncMutex<()>,
    // Producer side of the shutdown signal (fired by `Context::stop`).
    signaler: Mutex<Signaler>,
    // Consumer side, cloned out by `Context::stopped`.
    signal: Signal,
}
220
221impl Executor {
222    /// Initialize a new `tokio` runtime with the given number of threads.
223    pub fn init(cfg: Config) -> (Runner, Context) {
224        // Create a new registry
225        let mut registry = Registry::default();
226        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
227
228        // Initialize runtime
229        let metrics = Arc::new(Metrics::init(runtime_registry));
230        let runtime = Builder::new_multi_thread()
231            .worker_threads(cfg.worker_threads)
232            .max_blocking_threads(cfg.max_blocking_threads)
233            .enable_all()
234            .build()
235            .expect("failed to create Tokio runtime");
236        let (signaler, signal) = Signaler::new();
237        let executor = Arc::new(Self {
238            cfg,
239            registry: Mutex::new(registry),
240            metrics,
241            runtime,
242            fs: AsyncMutex::new(()),
243            signaler: Mutex::new(signaler),
244            signal,
245        });
246        (
247            Runner {
248                executor: executor.clone(),
249            },
250            Context {
251                label: String::new(),
252                spawned: false,
253                executor,
254            },
255        )
256    }
257
258    /// Initialize a new `tokio` runtime with default configuration.
259    // We'd love to implement the trait but we can't because of the return type.
260    #[allow(clippy::should_implement_trait)]
261    pub fn default() -> (Runner, Context) {
262        Self::init(Config::default())
263    }
264}
265
/// Implementation of [`crate::Runner`] for the `tokio` runtime.
pub struct Runner {
    // Shared executor whose tokio runtime drives the root future.
    executor: Arc<Executor>,
}
270
impl crate::Runner for Runner {
    /// Run `f` to completion on the tokio runtime, blocking the calling
    /// thread until it resolves and returning its output.
    fn start<F>(self, f: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.executor.runtime.block_on(f)
    }
}
280
/// Implementation of [`crate::Spawner`], [`crate::Clock`],
/// [`crate::Network`], and [`crate::Storage`] for the `tokio`
/// runtime.
pub struct Context {
    // Hierarchical metrics label ("" at the root; extended by `with_label`).
    label: String,
    // Whether this exact context has already been used to spawn a task.
    spawned: bool,
    // Shared runtime state.
    executor: Arc<Executor>,
}
289
impl Clone for Context {
    fn clone(&self) -> Self {
        // Implemented manually (rather than derived) so `spawned` resets to
        // `false`: each clone is a fresh context that may spawn its own task.
        Self {
            label: self.label.clone(),
            spawned: false,
            executor: self.executor.clone(),
        }
    }
}
299
300impl crate::Spawner for Context {
301    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
302    where
303        F: FnOnce(Self) -> Fut + Send + 'static,
304        Fut: Future<Output = T> + Send + 'static,
305        T: Send + 'static,
306    {
307        // Ensure a context only spawns one task
308        assert!(!self.spawned, "already spawned");
309
310        // Get metrics
311        let work = Work {
312            label: self.label.clone(),
313        };
314        self.executor
315            .metrics
316            .tasks_spawned
317            .get_or_create(&work)
318            .inc();
319        let gauge = self
320            .executor
321            .metrics
322            .tasks_running
323            .get_or_create(&work)
324            .clone();
325
326        // Set up the task
327        let catch_panics = self.executor.cfg.catch_panics;
328        let executor = self.executor.clone();
329        let future = f(self);
330        let (f, handle) = Handle::init(future, gauge, catch_panics);
331
332        // Spawn the task
333        executor.runtime.spawn(f);
334        handle
335    }
336
337    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
338    where
339        F: Future<Output = T> + Send + 'static,
340        T: Send + 'static,
341    {
342        // Ensure a context only spawns one task
343        assert!(!self.spawned, "already spawned");
344        self.spawned = true;
345
346        // Get metrics
347        let work = Work {
348            label: self.label.clone(),
349        };
350        self.executor
351            .metrics
352            .tasks_spawned
353            .get_or_create(&work)
354            .inc();
355        let gauge = self
356            .executor
357            .metrics
358            .tasks_running
359            .get_or_create(&work)
360            .clone();
361
362        // Set up the task
363        let executor = self.executor.clone();
364        move |f: F| {
365            let (f, handle) = Handle::init(f, gauge, executor.cfg.catch_panics);
366
367            // Spawn the task
368            executor.runtime.spawn(f);
369            handle
370        }
371    }
372
373    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
374    where
375        F: FnOnce() -> T + Send + 'static,
376        T: Send + 'static,
377    {
378        // Ensure a context only spawns one task
379        assert!(!self.spawned, "already spawned");
380
381        // Get metrics
382        let work = Work {
383            label: self.label.clone(),
384        };
385        self.executor
386            .metrics
387            .blocking_tasks_spawned
388            .get_or_create(&work)
389            .inc();
390        let gauge = self
391            .executor
392            .metrics
393            .blocking_tasks_running
394            .get_or_create(&work)
395            .clone();
396
397        // Initialize the blocking task using the new function
398        let (f, handle) = Handle::init_blocking(f, gauge, self.executor.cfg.catch_panics);
399
400        // Spawn the blocking task
401        self.executor.runtime.spawn_blocking(f);
402        handle
403    }
404
405    fn stop(&self, value: i32) {
406        self.executor.signaler.lock().unwrap().signal(value);
407    }
408
409    fn stopped(&self) -> Signal {
410        self.executor.signal.clone()
411    }
412}
413
414impl crate::Metrics for Context {
415    fn with_label(&self, label: &str) -> Self {
416        let label = {
417            let prefix = self.label.clone();
418            if prefix.is_empty() {
419                label.to_string()
420            } else {
421                format!("{}_{}", prefix, label)
422            }
423        };
424        assert!(
425            !label.starts_with(METRICS_PREFIX),
426            "using runtime label is not allowed"
427        );
428        Self {
429            label,
430            spawned: false,
431            executor: self.executor.clone(),
432        }
433    }
434
435    fn label(&self) -> String {
436        self.label.clone()
437    }
438
439    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
440        let name = name.into();
441        let prefixed_name = {
442            let prefix = &self.label;
443            if prefix.is_empty() {
444                name
445            } else {
446                format!("{}_{}", *prefix, name)
447            }
448        };
449        self.executor
450            .registry
451            .lock()
452            .unwrap()
453            .register(prefixed_name, help, metric)
454    }
455
456    fn encode(&self) -> String {
457        let mut buffer = String::new();
458        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
459        buffer
460    }
461}
462
463impl Clock for Context {
464    fn current(&self) -> SystemTime {
465        SystemTime::now()
466    }
467
468    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
469        tokio::time::sleep(duration)
470    }
471
472    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
473        let now = SystemTime::now();
474        let duration_until_deadline = match deadline.duration_since(now) {
475            Ok(duration) => duration,
476            Err(_) => Duration::from_secs(0), // Deadline is in the past
477        };
478        let target_instant = tokio::time::Instant::now() + duration_until_deadline;
479        tokio::time::sleep_until(target_instant)
480    }
481}
482
impl GClock for Context {
    type Instant = SystemTime;

    /// Clock source for `governor` rate limiting; delegates to [`Clock::current`].
    fn now(&self) -> Self::Instant {
        self.current()
    }
}
490
491impl ReasonablyRealtime for Context {}
492
493impl crate::Network<Listener, Sink, Stream> for Context {
494    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
495        TcpListener::bind(socket)
496            .await
497            .map_err(|_| Error::BindFailed)
498            .map(|listener| Listener {
499                context: self.clone(),
500                listener,
501            })
502    }
503
504    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
505        // Create a new TCP stream
506        let stream = TcpStream::connect(socket)
507            .await
508            .map_err(|_| Error::ConnectionFailed)?;
509        self.executor.metrics.outbound_connections.inc();
510
511        // Set TCP_NODELAY if configured
512        if let Some(tcp_nodelay) = self.executor.cfg.tcp_nodelay {
513            if let Err(err) = stream.set_nodelay(tcp_nodelay) {
514                warn!(?err, "failed to set TCP_NODELAY");
515            }
516        }
517
518        // Return the sink and stream
519        let context = self.clone();
520        let (stream, sink) = stream.into_split();
521        Ok((
522            Sink {
523                context: context.clone(),
524                sink,
525            },
526            Stream { context, stream },
527        ))
528    }
529}
530
/// Implementation of [`crate::Listener`] for the `tokio` runtime.
pub struct Listener {
    // Context retained for config (TCP_NODELAY) and metrics on accept.
    context: Context,
    listener: TcpListener,
}
536
537impl crate::Listener<Sink, Stream> for Listener {
538    async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
539        // Accept a new TCP stream
540        let (stream, addr) = self.listener.accept().await.map_err(|_| Error::Closed)?;
541        self.context.executor.metrics.inbound_connections.inc();
542
543        // Set TCP_NODELAY if configured
544        if let Some(tcp_nodelay) = self.context.executor.cfg.tcp_nodelay {
545            if let Err(err) = stream.set_nodelay(tcp_nodelay) {
546                warn!(?err, "failed to set TCP_NODELAY");
547            }
548        }
549
550        // Return the sink and stream
551        let context = self.context.clone();
552        let (stream, sink) = stream.into_split();
553        Ok((
554            addr,
555            Sink {
556                context: context.clone(),
557                sink,
558            },
559            Stream { context, stream },
560        ))
561    }
562}
563
564impl axum::serve::Listener for Listener {
565    type Io = TcpStream;
566    type Addr = SocketAddr;
567
568    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
569        let (stream, addr) = self.listener.accept().await.unwrap();
570        (stream, addr)
571    }
572
573    fn local_addr(&self) -> io::Result<Self::Addr> {
574        self.listener.local_addr()
575    }
576}
577
/// Implementation of [`crate::Sink`] for the `tokio` runtime.
pub struct Sink {
    // Context retained for the write timeout config and bandwidth metrics.
    context: Context,
    sink: OwnedWriteHalf,
}
583
584impl crate::Sink for Sink {
585    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
586        let len = msg.len();
587        timeout(
588            self.context.executor.cfg.write_timeout,
589            self.sink.write_all(msg),
590        )
591        .await
592        .map_err(|_| Error::Timeout)?
593        .map_err(|_| Error::SendFailed)?;
594        self.context
595            .executor
596            .metrics
597            .outbound_bandwidth
598            .inc_by(len as u64);
599        Ok(())
600    }
601}
602
/// Implementation of [`crate::Stream`] for the `tokio` runtime.
pub struct Stream {
    // Context retained for the read timeout config and bandwidth metrics.
    context: Context,
    stream: OwnedReadHalf,
}
608
609impl crate::Stream for Stream {
610    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
611        // Wait for the stream to be readable
612        timeout(
613            self.context.executor.cfg.read_timeout,
614            self.stream.read_exact(buf),
615        )
616        .await
617        .map_err(|_| Error::Timeout)?
618        .map_err(|_| Error::RecvFailed)?;
619
620        // Record metrics
621        self.context
622            .executor
623            .metrics
624            .inbound_bandwidth
625            .inc_by(buf.len() as u64);
626
627        Ok(())
628    }
629}
630
// Randomness is sourced from the operating system (`OsRng`) on every call, so
// values are not reproducible across runs (unlike the deterministic runtime).
impl RngCore for Context {
    fn next_u32(&mut self) -> u32 {
        OsRng.next_u32()
    }

    fn next_u64(&mut self) -> u64 {
        OsRng.next_u64()
    }

    fn fill_bytes(&mut self, dest: &mut [u8]) {
        OsRng.fill_bytes(dest);
    }

    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
        OsRng.try_fill_bytes(dest)
    }
}
648
649impl CryptoRng for Context {}
650
/// Implementation of [`crate::Blob`] for the `tokio` runtime.
pub struct Blob {
    // Used to update storage metrics and the `open_blobs` gauge.
    metrics: Arc<Metrics>,

    // Partition and (raw, un-hexed) name identifying this blob; used to
    // construct error values.
    partition: String,
    name: Vec<u8>,

    // Files must be seeked prior to any read or write operation and are thus
    // not safe to concurrently interact with. If we switched to mapping files
    // we could remove this lock.
    //
    // We also track the virtual file size because metadata isn't updated until
    // the file is synced (not to mention it is a lot less fs calls).
    file: Arc<AsyncMutex<(fs::File, u64)>>,
}
666
667impl Blob {
668    fn new(
669        metrics: Arc<Metrics>,
670        partition: String,
671        name: &[u8],
672        file: fs::File,
673        len: u64,
674    ) -> Self {
675        metrics.open_blobs.inc();
676        Self {
677            metrics,
678            partition,
679            name: name.into(),
680            file: Arc::new(AsyncMutex::new((file, len))),
681        }
682    }
683}
684
impl Clone for Blob {
    fn clone(&self) -> Self {
        // We implement `Clone` manually to ensure the `open_blobs` gauge is updated.
        // (The matching `dec()` happens in `Drop`.)
        self.metrics.open_blobs.inc();
        Self {
            metrics: self.metrics.clone(),
            partition: self.partition.clone(),
            name: self.name.clone(),
            // Clones share the same underlying file handle and virtual length.
            file: self.file.clone(),
        }
    }
}
697
698impl crate::Storage<Blob> for Context {
699    async fn open(&self, partition: &str, name: &[u8]) -> Result<Blob, Error> {
700        // Acquire the filesystem lock
701        let _guard = self.executor.fs.lock().await;
702
703        // Construct the full path
704        let path = self
705            .executor
706            .cfg
707            .storage_directory
708            .join(partition)
709            .join(hex(name));
710        let parent = match path.parent() {
711            Some(parent) => parent,
712            None => return Err(Error::PartitionCreationFailed(partition.into())),
713        };
714
715        // Create the partition directory if it does not exist
716        fs::create_dir_all(parent)
717            .await
718            .map_err(|_| Error::PartitionCreationFailed(partition.into()))?;
719
720        // Open the file in read-write mode, create if it does not exist
721        let mut file = fs::OpenOptions::new()
722            .read(true)
723            .write(true)
724            .create(true)
725            .truncate(false)
726            .open(&path)
727            .await
728            .map_err(|_| Error::BlobOpenFailed(partition.into(), hex(name)))?;
729
730        // Set the maximum buffer size
731        file.set_max_buf_size(self.executor.cfg.maximum_buffer_size);
732
733        // Get the file length
734        let len = file.metadata().await.map_err(|_| Error::ReadFailed)?.len();
735
736        // Construct the blob
737        Ok(Blob::new(
738            self.executor.metrics.clone(),
739            partition.into(),
740            name,
741            file,
742            len,
743        ))
744    }
745
746    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
747        // Acquire the filesystem lock
748        let _guard = self.executor.fs.lock().await;
749
750        // Remove all related files
751        let path = self.executor.cfg.storage_directory.join(partition);
752        if let Some(name) = name {
753            let blob_path = path.join(hex(name));
754            fs::remove_file(blob_path)
755                .await
756                .map_err(|_| Error::BlobMissing(partition.into(), hex(name)))?;
757        } else {
758            fs::remove_dir_all(path)
759                .await
760                .map_err(|_| Error::PartitionMissing(partition.into()))?;
761        }
762        Ok(())
763    }
764
765    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
766        // Acquire the filesystem lock
767        let _guard = self.executor.fs.lock().await;
768
769        // Scan the partition directory
770        let path = self.executor.cfg.storage_directory.join(partition);
771        let mut entries = fs::read_dir(path)
772            .await
773            .map_err(|_| Error::PartitionMissing(partition.into()))?;
774        let mut blobs = Vec::new();
775        while let Some(entry) = entries.next_entry().await.map_err(|_| Error::ReadFailed)? {
776            let file_type = entry.file_type().await.map_err(|_| Error::ReadFailed)?;
777            if !file_type.is_file() {
778                return Err(Error::PartitionCorrupt(partition.into()));
779            }
780            if let Some(name) = entry.file_name().to_str() {
781                let name = from_hex(name).ok_or(Error::PartitionCorrupt(partition.into()))?;
782                blobs.push(name);
783            }
784        }
785        Ok(blobs)
786    }
787}
788
789impl crate::Blob for Blob {
790    async fn len(&self) -> Result<u64, Error> {
791        let (_, len) = *self.file.lock().await;
792        Ok(len)
793    }
794
795    async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
796        // Ensure the read is within bounds
797        let mut file = self.file.lock().await;
798        if offset + buf.len() as u64 > file.1 {
799            return Err(Error::BlobInsufficientLength);
800        }
801
802        // Perform the read
803        file.0
804            .seek(SeekFrom::Start(offset))
805            .await
806            .map_err(|_| Error::ReadFailed)?;
807        file.0
808            .read_exact(buf)
809            .await
810            .map_err(|_| Error::ReadFailed)?;
811        self.metrics.storage_reads.inc();
812        self.metrics.storage_read_bytes.inc_by(buf.len() as u64);
813        Ok(())
814    }
815
816    async fn write_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
817        // Perform the write
818        let mut file = self.file.lock().await;
819        file.0
820            .seek(SeekFrom::Start(offset))
821            .await
822            .map_err(|_| Error::WriteFailed)?;
823        file.0
824            .write_all(buf)
825            .await
826            .map_err(|_| Error::WriteFailed)?;
827
828        // Update the virtual file size
829        let max_len = offset + buf.len() as u64;
830        if max_len > file.1 {
831            file.1 = max_len;
832        }
833        self.metrics.storage_writes.inc();
834        self.metrics.storage_write_bytes.inc_by(buf.len() as u64);
835        Ok(())
836    }
837
838    async fn truncate(&self, len: u64) -> Result<(), Error> {
839        // Perform the truncate
840        let mut file = self.file.lock().await;
841        file.0
842            .set_len(len)
843            .await
844            .map_err(|_| Error::BlobTruncateFailed(self.partition.clone(), hex(&self.name)))?;
845
846        // Update the virtual file size
847        file.1 = len;
848        Ok(())
849    }
850
851    async fn sync(&self) -> Result<(), Error> {
852        let file = self.file.lock().await;
853        file.0
854            .sync_all()
855            .await
856            .map_err(|_| Error::BlobSyncFailed(self.partition.clone(), hex(&self.name)))
857    }
858
859    async fn close(self) -> Result<(), Error> {
860        let mut file = self.file.lock().await;
861        file.0
862            .sync_all()
863            .await
864            .map_err(|_| Error::BlobSyncFailed(self.partition.clone(), hex(&self.name)))?;
865        file.0
866            .shutdown()
867            .await
868            .map_err(|_| Error::BlobCloseFailed(self.partition.clone(), hex(&self.name)))
869    }
870}
871
impl Drop for Blob {
    fn drop(&mut self) {
        // Pairs with the `inc()` in `Blob::new` and `Clone::clone` so the
        // `open_blobs` gauge tracks live handles.
        self.metrics.open_blobs.dec();
    }
}