bonsaidb_files/
direct.rs

1#[cfg(feature = "async")]
2use std::collections::BTreeMap;
3use std::collections::VecDeque;
4use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
5use std::marker::PhantomData;
6#[cfg(feature = "async")]
7use std::task::Poll;
8
9use bonsaidb_core::connection::Connection;
10use bonsaidb_core::document::{CollectionDocument, CollectionHeader};
11use bonsaidb_core::key::time::TimestampAsNanoseconds;
12use bonsaidb_core::schema::SerializedCollection;
13#[cfg(feature = "async")]
14use bonsaidb_core::{circulate::flume, connection::AsyncConnection};
15use derive_where::derive_where;
16#[cfg(feature = "async")]
17use futures::{future::BoxFuture, ready, FutureExt};
18#[cfg(feature = "async")]
19use tokio::io::AsyncWriteExt;
20
21use crate::schema::block::BlockAppendInfo;
22use crate::schema::{self};
23use crate::{BonsaiFiles, Error, FileConfig, Statistics, Truncate};
24
25/// A handle to a file stored in a database.
26#[derive_where(Debug, Clone)]
27pub struct File<Database: Clone, Config: FileConfig = BonsaiFiles> {
28    doc: CollectionDocument<schema::file::File<Config>>,
29    #[derive_where(skip(Debug))]
30    database: Database,
31}
32
33impl<Database, Config> PartialEq for File<Database, Config>
34where
35    Database: Clone,
36    Config: FileConfig,
37{
38    fn eq(&self, other: &Self) -> bool {
39        self.doc.header == other.doc.header
40    }
41}
42
43/// A blocking database connection.
44#[derive(Clone)]
45pub struct Blocking<Database: Connection>(Database);
46
47/// An async database connection.
48#[cfg(feature = "async")]
49#[derive(Clone)]
50pub struct Async<Database: AsyncConnection>(Database);
51
52impl<Database, Config> File<Blocking<Database>, Config>
53where
54    Database: Connection + Clone,
55    Config: FileConfig,
56{
57    fn new_file(
58        path: Option<String>,
59        name: String,
60        contents: &[u8],
61        metadata: Config::Metadata,
62        database: Database,
63    ) -> Result<Self, Error> {
64        Ok(Self {
65            doc: schema::file::File::create_file(path, name, contents, metadata, &database)?,
66            database: Blocking(database),
67        })
68    }
69
70    pub(crate) fn get(id: u32, database: &Database) -> Result<Option<Self>, bonsaidb_core::Error> {
71        schema::file::File::<Config>::get(&id, database).map(|doc| {
72            doc.map(|doc| Self {
73                doc,
74                database: Blocking(database.clone()),
75            })
76        })
77    }
78
79    pub(crate) fn load(path: &str, database: &Database) -> Result<Option<Self>, Error> {
80        schema::file::File::<Config>::find(path, database).map(|opt| {
81            opt.map(|doc| Self {
82                doc,
83                database: Blocking(database.clone()),
84            })
85        })
86    }
87
88    pub(crate) fn list(path: &str, database: &Database) -> Result<Vec<Self>, bonsaidb_core::Error> {
89        schema::file::File::<Config>::list_path_contents(path, database).map(|vec| {
90            vec.into_iter()
91                .map(|doc| Self {
92                    doc,
93                    database: Blocking(database.clone()),
94                })
95                .collect()
96        })
97    }
98
99    pub(crate) fn list_recursive(
100        path: &str,
101        database: &Database,
102    ) -> Result<Vec<Self>, bonsaidb_core::Error> {
103        schema::file::File::<Config>::list_recursive_path_contents(path, database).map(|vec| {
104            vec.into_iter()
105                .map(|doc| Self {
106                    doc,
107                    database: Blocking(database.clone()),
108                })
109                .collect()
110        })
111    }
112
113    pub(crate) fn stats_for_path(
114        path: &str,
115        database: &Database,
116    ) -> Result<Statistics, bonsaidb_core::Error> {
117        schema::file::File::<Config>::summarize_recursive_path_contents(path, database)
118    }
119
120    /// Return all direct descendents of this file. For example, consider this
121    /// list of files:
122    ///
123    /// - /top-level
124    /// - /top-level/sub-level
125    /// - /top-level/sub-level/file.txt
126    ///
127    /// If this instance were `/top-level`, this function would return
128    /// `sub-level` but not `sub-level/file.txt`.
129    pub fn children(&self) -> Result<Vec<Self>, bonsaidb_core::Error> {
130        schema::file::File::<Config>::list_path_contents(&self.path(), &self.database.0).map(
131            |docs| {
132                docs.into_iter()
133                    .map(|doc| Self {
134                        doc,
135                        database: self.database.clone(),
136                    })
137                    .collect()
138            },
139        )
140    }
141
142    /// Moves this file to a new location. If `new_path` ends with a `/`, the
143    /// file will be moved to that path with its name preserved. Otherwise, the
144    /// file will be renamed as part of the move.
145    ///
146    /// For example, moving `/a/file.txt` to `/b/` will result in the full path
147    /// being `/b/file.txt`. Moving `/a/file.txt` to `/b/new-name.txt` will
148    /// result in the full path being `/b/new-name.txt`.
149    pub fn move_to(&mut self, new_path: &str) -> Result<(), Error> {
150        if !new_path.as_bytes().starts_with(b"/") {
151            return Err(Error::InvalidPath);
152        }
153
154        let mut doc = self.update_document_for_move(new_path);
155        doc.update(&self.database.0)?;
156        self.doc = doc;
157        Ok(())
158    }
159
160    /// Renames this file to the new name.
161    pub fn rename(&mut self, new_name: String) -> Result<(), Error> {
162        if new_name.as_bytes().contains(&b'/') {
163            return Err(Error::InvalidName);
164        }
165
166        // Prevent mutating self until after the database is updated.
167        let mut doc = self.doc.clone();
168        doc.contents.name = new_name;
169        doc.update(&self.database.0)?;
170        self.doc = doc;
171        Ok(())
172    }
173
174    /// Deletes the file.
175    pub fn delete(&self) -> Result<(), Error> {
176        schema::block::Block::<Config>::delete_for_file(self.doc.header.id, &self.database.0)?;
177        self.doc.delete(&self.database.0)?;
178        Ok(())
179    }
180
181    fn map_block_metadata<F: FnOnce(BlockAppendInfo) -> T, T>(
182        &mut self,
183        callback: F,
184    ) -> Result<T, bonsaidb_core::Error> {
185        let metadata =
186            schema::block::Block::<Config>::summary_for_file(self.doc.header.id, &self.database.0)?;
187
188        Ok(callback(metadata))
189    }
190
191    /// Returns the length of the file.
192    #[allow(clippy::missing_panics_doc)]
193    pub fn len(&mut self) -> Result<u64, bonsaidb_core::Error> {
194        self.map_block_metadata(|metadata| metadata.length)
195    }
196
197    /// Returns true if this file contains no data.
198    #[allow(clippy::missing_panics_doc)]
199    pub fn is_empty(&mut self) -> Result<bool, bonsaidb_core::Error> {
200        Ok(self.len()? == 0)
201    }
202
203    /// Returns the timestamp of the last append to the file. This function
204    /// returns 0 when the file is empty, even if the file was previously
205    /// written to.
206    #[allow(clippy::missing_panics_doc)]
207    pub fn last_appended_at(
208        &mut self,
209    ) -> Result<Option<TimestampAsNanoseconds>, bonsaidb_core::Error> {
210        self.map_block_metadata(|metadata| metadata.timestamp)
211    }
212
213    /// Returns the contents of the file, which allows random and buffered
214    /// access to the file stored in the database.
215    ///
216    /// The default buffer size is ten times
217    /// [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE).
218    pub fn contents(&self) -> Result<Contents<Blocking<Database>, Config>, bonsaidb_core::Error> {
219        let blocks = schema::block::Block::<Config>::for_file(self.id(), &self.database.0)?;
220        Ok(Contents {
221            database: self.database.clone(),
222            blocks,
223            loaded: VecDeque::default(),
224            current_block: 0,
225            offset: 0,
226            buffer_size: Config::BLOCK_SIZE * 10,
227            #[cfg(feature = "async")]
228            async_blocks: None,
229            _config: PhantomData,
230        })
231    }
232
233    /// Truncates the file, removing data from either the start or end of the
234    /// file until the file is within
235    /// [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE) of `new_length`.
236    /// Truncating currently will not split a block, causing the resulting
237    /// length to not always match the length requested.
238    ///
239    /// If `new_length` is 0 and this call succeeds, the file's length is
240    /// guaranteed to be 0.
241    pub fn truncate(&self, new_length: u64, from: Truncate) -> Result<(), bonsaidb_core::Error> {
242        schema::file::File::<Config>::truncate(&self.doc, new_length, from, &self.database.0)
243    }
244
245    /// Appends `data` to the end of the file. The data will be split into
246    /// chunks no larger than [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE)
247    /// when stored in the database.
248    pub fn append(&self, data: &[u8]) -> Result<(), bonsaidb_core::Error> {
249        schema::block::Block::<Config>::append(data, self.doc.header.id, &self.database.0)
250    }
251
252    /// Returns a writer that will buffer writes to the end of the file.
253    pub fn append_buffered(&mut self) -> BufferedAppend<'_, Config, Database> {
254        BufferedAppend {
255            file: self,
256            buffer: Vec::new(),
257            _config: PhantomData,
258        }
259    }
260
261    /// Stores changes to the metadata of this document.
262    pub fn update_metadata(&mut self) -> Result<(), bonsaidb_core::Error> {
263        self.doc.update(&self.database.0)
264    }
265}
266
267#[cfg(feature = "async")]
268impl<Database, Config> File<Async<Database>, Config>
269where
270    Database: AsyncConnection + Clone,
271    Config: FileConfig,
272{
273    async fn new_file_async(
274        path: Option<String>,
275        name: String,
276        contents: &[u8],
277        metadata: Config::Metadata,
278        database: Database,
279    ) -> Result<Self, Error> {
280        Ok(Self {
281            doc: schema::file::File::create_file_async(path, name, contents, metadata, &database)
282                .await?,
283            database: Async(database),
284        })
285    }
286
287    pub(crate) async fn get_async(
288        id: u32,
289        database: &Database,
290    ) -> Result<Option<Self>, bonsaidb_core::Error> {
291        schema::file::File::<Config>::get_async(&id, database)
292            .await
293            .map(|doc| {
294                doc.map(|doc| Self {
295                    doc,
296                    database: Async(database.clone()),
297                })
298            })
299    }
300
301    pub(crate) async fn load_async(path: &str, database: &Database) -> Result<Option<Self>, Error> {
302        schema::file::File::<Config>::find_async(path, database)
303            .await
304            .map(|opt| {
305                opt.map(|doc| Self {
306                    doc,
307                    database: Async(database.clone()),
308                })
309            })
310    }
311
312    pub(crate) async fn list_async(
313        path: &str,
314        database: &Database,
315    ) -> Result<Vec<Self>, bonsaidb_core::Error> {
316        schema::file::File::<Config>::list_path_contents_async(path, database)
317            .await
318            .map(|vec| {
319                vec.into_iter()
320                    .map(|doc| Self {
321                        doc,
322                        database: Async(database.clone()),
323                    })
324                    .collect()
325            })
326    }
327
328    pub(crate) async fn list_recursive_async(
329        path: &str,
330        database: &Database,
331    ) -> Result<Vec<Self>, bonsaidb_core::Error> {
332        schema::file::File::<Config>::list_recursive_path_contents_async(path, database)
333            .await
334            .map(|vec| {
335                vec.into_iter()
336                    .map(|doc| Self {
337                        doc,
338                        database: Async(database.clone()),
339                    })
340                    .collect()
341            })
342    }
343
344    pub(crate) async fn stats_for_path_async(
345        path: &str,
346        database: &Database,
347    ) -> Result<Statistics, bonsaidb_core::Error> {
348        schema::file::File::<Config>::summarize_recursive_path_contents_async(path, database).await
349    }
350
351    /// Return all direct descendents of this file. For example, consider this
352    /// list of files:
353    ///
354    /// - /top-level
355    /// - /top-level/sub-level
356    /// - /top-level/sub-level/file.txt
357    ///
358    /// If this instance were `/top-level`, this function would return
359    /// `sub-level` but not `sub-level/file.txt`.
360    pub async fn children(&self) -> Result<Vec<Self>, bonsaidb_core::Error> {
361        schema::file::File::<Config>::list_path_contents_async(&self.path(), &self.database.0)
362            .await
363            .map(|docs| {
364                docs.into_iter()
365                    .map(|doc| Self {
366                        doc,
367                        database: self.database.clone(),
368                    })
369                    .collect()
370            })
371    }
372
373    /// Moves this file to a new location. If `new_path` ends with a `/`, the
374    /// file will be moved to that path with its name preserved. Otherwise, the
375    /// file will be renamed as part of the move.
376    ///
377    /// For example, moving `/a/file.txt` to `/b/` will result in the full path
378    /// being `/b/file.txt`. Moving `/a/file.txt` to `/b/new-name.txt` will
379    /// result in the full path being `/b/new-name.txt`.
380    pub async fn move_to(&mut self, new_path: &str) -> Result<(), Error> {
381        if !new_path.as_bytes().starts_with(b"/") {
382            return Err(Error::InvalidPath);
383        }
384
385        let mut doc = self.update_document_for_move(new_path);
386        doc.update_async(&self.database.0).await?;
387        self.doc = doc;
388        Ok(())
389    }
390
391    /// Renames this file to the new name.
392    pub async fn rename(&mut self, new_name: String) -> Result<(), Error> {
393        if new_name.as_bytes().contains(&b'/') {
394            return Err(Error::InvalidName);
395        }
396
397        // Prevent mutating self until after the database is updated.
398        let mut doc = self.doc.clone();
399        doc.contents.name = new_name;
400        doc.update_async(&self.database.0).await?;
401        self.doc = doc;
402        Ok(())
403    }
404
405    /// Deletes the file.
406    pub async fn delete(&self) -> Result<(), Error> {
407        schema::block::Block::<Config>::delete_for_file_async(self.doc.header.id, &self.database.0)
408            .await?;
409        self.doc.delete_async(&self.database.0).await?;
410        Ok(())
411    }
412
413    async fn map_block_metadata<F: FnOnce(BlockAppendInfo) -> T, T>(
414        &mut self,
415        callback: F,
416    ) -> Result<T, bonsaidb_core::Error> {
417        let metadata = schema::block::Block::<Config>::summary_for_file_async(
418            self.doc.header.id,
419            &self.database.0,
420        )
421        .await?;
422
423        Ok(callback(metadata))
424    }
425
426    /// Returns the length of the file.
427    #[allow(clippy::missing_panics_doc)]
428    pub async fn len(&mut self) -> Result<u64, bonsaidb_core::Error> {
429        self.map_block_metadata(|metadata| metadata.length).await
430    }
431
432    /// Returns true if this file contains no data.
433    #[allow(clippy::missing_panics_doc)]
434    pub async fn is_empty(&mut self) -> Result<bool, bonsaidb_core::Error> {
435        Ok(self.len().await? == 0)
436    }
437
438    /// Returns the timestamp of the last append to the file. This function
439    /// returns 0 when the file is empty, even if the file was previously
440    /// written to.
441    #[allow(clippy::missing_panics_doc)]
442    pub async fn last_appended_at(
443        &mut self,
444    ) -> Result<Option<TimestampAsNanoseconds>, bonsaidb_core::Error> {
445        self.map_block_metadata(|metadata| metadata.timestamp).await
446    }
447
448    /// Returns the contents of the file, which allows random and buffered
449    /// access to the file stored in the database.
450    ///
451    /// The default buffer size is ten times
452    /// [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE).
453    pub async fn contents(
454        &self,
455    ) -> Result<Contents<Async<Database>, Config>, bonsaidb_core::Error> {
456        let blocks =
457            schema::block::Block::<Config>::for_file_async(self.id(), &self.database.0).await?;
458        Ok(Contents {
459            database: self.database.clone(),
460            blocks,
461            loaded: VecDeque::default(),
462            current_block: 0,
463            offset: 0,
464            buffer_size: Config::BLOCK_SIZE * 10,
465            #[cfg(feature = "async")]
466            async_blocks: None,
467            _config: PhantomData,
468        })
469    }
470
471    /// Truncates the file, removing data from either the start or end of the
472    /// file until the file is within
473    /// [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE) of `new_length`.
474    /// Truncating currently will not split a block, causing the resulting
475    /// length to not always match the length requested.
476    ///
477    /// If `new_length` is 0 and this call succeeds, the file's length is
478    /// guaranteed to be 0.
479    pub async fn truncate(
480        &self,
481        new_length: u64,
482        from: Truncate,
483    ) -> Result<(), bonsaidb_core::Error> {
484        schema::file::File::<Config>::truncate_async(&self.doc, new_length, from, &self.database.0)
485            .await
486    }
487
488    /// Appends `data` to the end of the file. The data will be split into
489    /// chunks no larger than [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE)
490    /// when stored in the database.
491    pub async fn append(&self, data: &[u8]) -> Result<(), bonsaidb_core::Error> {
492        schema::block::Block::<Config>::append_async(data, self.doc.header.id, &self.database.0)
493            .await
494    }
495
496    /// Returns a writer that will buffer writes to the end of the file.
497    pub fn append_buffered(&mut self) -> AsyncBufferedAppend<'_, Config, Database> {
498        AsyncBufferedAppend {
499            file: self,
500            buffer: Vec::new(),
501            flush_future: None,
502            _config: PhantomData,
503        }
504    }
505
506    /// Stores changes to the metadata of this document.
507    pub async fn update_metadata(&mut self) -> Result<(), bonsaidb_core::Error> {
508        self.doc.update_async(&self.database.0).await
509    }
510}
511
512impl<Database, Config> File<Database, Config>
513where
514    Database: Clone,
515    Config: FileConfig,
516{
517    /// Returns the unique id of this file. The file id is only unique within a
518    /// single database and [`FileConfig`].
519    pub fn id(&self) -> u32 {
520        self.doc.header.id
521    }
522
523    /// Returns the path containing this file. For example, if the full path to
524    /// the file is `/some-path/file.txt`, this function will return
525    /// `/some-path/`.
526    pub fn containing_path(&self) -> &str {
527        self.doc.contents.path.as_deref().unwrap_or("/")
528    }
529
530    /// Returns the name of this file.
531    pub fn name(&self) -> &str {
532        &self.doc.contents.name
533    }
534
535    /// Returns the absolute path of this file.
536    pub fn path(&self) -> String {
537        let containing_path = self.containing_path();
538        let ends_in_slash = self.containing_path().ends_with('/');
539        let mut full_path = String::with_capacity(
540            containing_path.len() + usize::from(!ends_in_slash) + self.name().len(),
541        );
542        full_path.push_str(containing_path);
543        if !ends_in_slash {
544            full_path.push('/');
545        }
546        full_path.push_str(self.name());
547
548        full_path
549    }
550
551    /// Returns the timestamp the file was created at.
552    pub fn created_at(&self) -> TimestampAsNanoseconds {
553        self.doc.contents.created_at
554    }
555
556    /// Returns the metadata for this file.
557    pub fn metadata(&self) -> &Config::Metadata {
558        &self.doc.contents.metadata
559    }
560
561    /// Returns mutable access metadata for this file. Modifying the metadata
562    /// will not update it in the database. Be sure to call `update_metadata()`
563    /// or another operation that persists the file.
564    pub fn metadata_mut(&mut self) -> &mut Config::Metadata {
565        &mut self.doc.contents.metadata
566    }
567
568    fn update_document_for_move(
569        &self,
570        new_path: &str,
571    ) -> CollectionDocument<schema::file::File<Config>> {
572        let mut doc = self.doc.clone();
573        if new_path.as_bytes().ends_with(b"/") {
574            if new_path.len() > 1 {
575                doc.contents.path = Some(new_path.to_string());
576            } else {
577                doc.contents.path = None;
578            }
579        } else {
580            let (path, name) = new_path.rsplit_once('/').unwrap();
581            doc.contents.path = (!path.is_empty()).then(|| path.to_string());
582            doc.contents.name = name.to_string();
583        }
584
585        // Force path to end in a slash
586        if let Some(path) = doc.contents.path.as_mut() {
587            if path.bytes().last() != Some(b'/') {
588                path.push('/');
589            }
590        }
591
592        doc
593    }
594}
595
596/// A builder to create a [`File`].
597#[derive(Debug, Clone)]
598#[must_use]
599pub struct FileBuilder<'a, Config>
600where
601    Config: FileConfig,
602{
603    path: Option<String>,
604    name: String,
605    contents: &'a [u8],
606    metadata: Config::Metadata,
607    _config: PhantomData<Config>,
608}
609
610impl<'a, Config: FileConfig> FileBuilder<'a, Config> {
611    pub(crate) fn new<NameOrPath: AsRef<str>>(
612        name_or_path: NameOrPath,
613        metadata: Config::Metadata,
614    ) -> Self {
615        let mut name_or_path = name_or_path.as_ref();
616        let (path, name) = if name_or_path.starts_with('/') {
617            // Trim the trailing / if there is one.
618            if name_or_path.ends_with('/') && name_or_path.len() > 1 {
619                name_or_path = &name_or_path[..name_or_path.len() - 1];
620            }
621            let (path, name) = name_or_path.rsplit_once('/').unwrap();
622            let path = match path {
623                "" | "/" => None,
624                other => Some(other.to_string()),
625            };
626            (path, name.to_string())
627        } else {
628            (None, name_or_path.to_string())
629        };
630        Self {
631            path,
632            name,
633            contents: b"",
634            metadata,
635            _config: PhantomData,
636        }
637    }
638
639    /// Creates this file at `path`. This does not change the file's name
640    /// specified when creating the builder.
641    pub fn at_path<Path: Into<String>>(mut self, path: Path) -> Self {
642        self.path = Some(path.into());
643        self
644    }
645
646    /// Sets the file's initial contents.
647    pub fn contents(mut self, contents: &'a [u8]) -> Self {
648        self.contents = contents;
649        self
650    }
651
652    /// Sets the file's initial metadata.
653    pub fn metadata(mut self, metadata: Config::Metadata) -> Self {
654        self.metadata = metadata;
655        self
656    }
657
658    /// Creates the file and returns a handle to the created file.
659    pub fn create<Database: Connection + Clone>(
660        self,
661        database: &Database,
662    ) -> Result<File<Blocking<Database>, Config>, Error> {
663        File::new_file(
664            self.path,
665            self.name,
666            self.contents,
667            self.metadata,
668            database.clone(),
669        )
670    }
671
672    /// Creates the file and returns a handle to the created file.
673    #[cfg(feature = "async")]
674    pub async fn create_async<Database: bonsaidb_core::connection::AsyncConnection + Clone>(
675        self,
676        database: &Database,
677    ) -> Result<File<Async<Database>, Config>, Error> {
678        File::new_file_async(
679            self.path,
680            self.name,
681            self.contents,
682            self.metadata,
683            database.clone(),
684        )
685        .await
686    }
687}
688
689/// Buffered access to the contents of a [`File`].
690#[must_use]
691pub struct Contents<Database: Clone, Config: FileConfig> {
692    database: Database,
693    blocks: Vec<BlockInfo>,
694    loaded: VecDeque<LoadedBlock>,
695    current_block: usize,
696    offset: usize,
697    buffer_size: usize,
698    #[cfg(feature = "async")]
699    async_blocks: Option<AsyncBlockTask>,
700    _config: PhantomData<Config>,
701}
702
703#[cfg(feature = "async")]
704struct AsyncBlockTask {
705    block_receiver:
706        flume::r#async::RecvFut<'static, Result<BTreeMap<u64, Vec<u8>>, std::io::Error>>,
707    requested: bool,
708    request_sender: flume::Sender<Vec<u64>>,
709}
710
711impl<Database: Clone, Config: FileConfig> Clone for Contents<Database, Config> {
712    fn clone(&self) -> Self {
713        Self {
714            database: self.database.clone(),
715            blocks: self.blocks.clone(),
716            loaded: VecDeque::new(),
717            current_block: self.current_block,
718            offset: self.offset,
719            buffer_size: self.buffer_size,
720            #[cfg(feature = "async")]
721            async_blocks: None,
722            _config: PhantomData,
723        }
724    }
725}
726
727#[derive(Clone)]
728struct LoadedBlock {
729    index: usize,
730    contents: Vec<u8>,
731}
732
733impl<Database: Connection + Clone, Config: FileConfig> Contents<Blocking<Database>, Config> {
734    /// Returns the remaining contents as a `Vec<u8>`. If no bytes have been
735    /// read, this returns the entire contents.
736    pub fn to_vec(&self) -> std::io::Result<Vec<u8>> {
737        self.clone().into_vec()
738    }
739
740    /// Returns the remaining contents as a string. If no bytes have been read,
741    /// this returns the entire contents.
742    pub fn to_string(&self) -> std::io::Result<String> {
743        String::from_utf8(self.to_vec()?)
744            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
745    }
746
747    /// Returns the remaining contents as a `Vec<u8>`. If no bytes have been
748    /// read, this returns the entire contents.
749    #[allow(clippy::missing_panics_doc)] // Not reachable
750    pub fn into_vec(mut self) -> std::io::Result<Vec<u8>> {
751        let mut contents = Vec::with_capacity(usize::try_from(self.len()).unwrap());
752        self.read_to_end(&mut contents)?;
753        Ok(contents)
754    }
755
756    /// Returns the remaining contents as a string. If no bytes have been read,
757    /// this returns the entire contents.
758    pub fn into_string(self) -> std::io::Result<String> {
759        String::from_utf8(self.into_vec()?)
760            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
761    }
762
763    fn load_blocks(&mut self) -> std::io::Result<()> {
764        self.loaded.clear();
765        for (index, (_, contents)) in
766            schema::block::Block::<Config>::load(&self.next_blocks(), &self.database.0)
767                .map_err(|err| std::io::Error::new(ErrorKind::Other, err))?
768                .into_iter()
769                .enumerate()
770        {
771            self.loaded.push_back(LoadedBlock {
772                index: self.current_block + index,
773                contents,
774            });
775        }
776
777        Ok(())
778    }
779}
780
781#[cfg(feature = "async")]
782impl<
783        Database: bonsaidb_core::connection::AsyncConnection + Clone + Unpin + 'static,
784        Config: FileConfig,
785    > Contents<Async<Database>, Config>
786{
787    /// Returns the remaining contents as a `Vec<u8>`. If no bytes have been
788    /// read, this returns the entire contents.
789    pub async fn to_vec(&self) -> std::io::Result<Vec<u8>> {
790        self.clone().into_vec().await
791    }
792
793    /// Returns the remaining contents as a `Vec<u8>`. If no bytes have been
794    /// read, this returns the entire contents.
795    #[allow(clippy::missing_panics_doc)] // Not reachable
796    pub async fn into_vec(mut self) -> std::io::Result<Vec<u8>> {
797        let mut contents = vec![0; usize::try_from(self.len()).unwrap()];
798        <Self as tokio::io::AsyncReadExt>::read_exact(&mut self, &mut contents).await?;
799        Ok(contents)
800    }
801
802    /// Returns the remaining contents as a string. If no bytes have been read,
803    /// this returns the entire contents.
804    pub async fn to_string(&self) -> std::io::Result<String> {
805        String::from_utf8(self.to_vec().await?)
806            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
807    }
808
809    /// Returns the remaining contents as a string. If no bytes have been read,
810    /// this returns the entire contents.
811    pub async fn into_string(self) -> std::io::Result<String> {
812        String::from_utf8(self.into_vec().await?)
813            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
814    }
815
816    fn spawn_block_fetching_task(&mut self) {
817        if self.async_blocks.is_none() {
818            // Spawn the task
819            let (block_sender, block_receiver) = flume::unbounded();
820            let (request_sender, request_receiver) = flume::unbounded();
821
822            let task_database = self.database.0.clone();
823            tokio::task::spawn(async move {
824                while let Ok(doc_ids) = request_receiver.recv_async().await {
825                    let blocks =
826                        schema::block::Block::<Config>::load_async(&doc_ids, &task_database)
827                            .await
828                            .map_err(|err| std::io::Error::new(ErrorKind::Other, err));
829                    if block_sender.send(blocks).is_err() {
830                        break;
831                    }
832                }
833            });
834
835            self.async_blocks = Some(AsyncBlockTask {
836                block_receiver: block_receiver.into_recv_async(),
837                request_sender,
838                requested: false,
839            });
840        }
841    }
842
843    fn fetch_blocks(
844        &mut self,
845        cx: &mut std::task::Context<'_>,
846    ) -> Poll<Result<bool, std::io::Error>> {
847        if self.async_blocks.as_mut().unwrap().requested {
848            match ready!(self
849                .async_blocks
850                .as_mut()
851                .unwrap()
852                .block_receiver
853                .poll_unpin(cx))
854            {
855                Ok(Ok(blocks)) => {
856                    self.async_blocks.as_mut().unwrap().requested = false;
857                    for (index, (_, contents)) in blocks.into_iter().enumerate() {
858                        let loaded_block = LoadedBlock {
859                            index: self.current_block + index,
860                            contents,
861                        };
862                        self.loaded.push_back(loaded_block);
863                    }
864                    Poll::Ready(Ok(true))
865                }
866                Ok(Err(db_err)) => Poll::Ready(Err(std::io::Error::new(ErrorKind::Other, db_err))),
867                Err(flume_error) => {
868                    Poll::Ready(Err(std::io::Error::new(ErrorKind::BrokenPipe, flume_error)))
869                }
870            }
871        } else {
872            let blocks = self.next_blocks();
873            if blocks.is_empty() {
874                return Poll::Ready(Ok(false));
875            }
876            self.loaded.clear();
877            self.async_blocks.as_mut().unwrap().requested = true;
878            if let Err(err) = self
879                .async_blocks
880                .as_mut()
881                .unwrap()
882                .request_sender
883                .send(blocks)
884            {
885                return Poll::Ready(Err(std::io::Error::new(ErrorKind::BrokenPipe, err)));
886            }
887
888            Poll::Ready(Ok(true))
889        }
890    }
891}
892
893impl<Database: Clone, Config: FileConfig> Contents<Database, Config> {
894    fn next_blocks(&self) -> Vec<u64> {
895        let mut last_block = self.current_block;
896        let mut requesting_size = 0;
897        for index in self.current_block..self.blocks.len() {
898            let size_if_requested = self.blocks[index].length.saturating_add(requesting_size);
899            if size_if_requested > self.buffer_size {
900                break;
901            }
902
903            requesting_size = size_if_requested;
904            last_block = index;
905        }
906
907        self.blocks[self.current_block..=last_block]
908            .iter()
909            .map(|info| info.header.id)
910            .collect()
911    }
912
913    /// Sets the maximum buffer size in bytes and returns `self`. When buffering
914    /// reads from the database, requests will be made to fill at-most
915    /// `size_in_bytes` of memory.
916    pub fn with_buffer_size(mut self, size_in_bytes: usize) -> Self {
917        self.buffer_size = size_in_bytes;
918        self
919    }
920
921    /// Returns the total length of the file.
922    #[allow(clippy::missing_panics_doc)] // Not reachable
923    #[must_use]
924    pub fn len(&self) -> u64 {
925        self.blocks
926            .last()
927            .map(|b| b.offset + u64::try_from(b.length).unwrap())
928            .unwrap_or_default()
929    }
930
931    /// Returns true if the file's length is 0.
932    #[must_use]
933    pub fn is_empty(&self) -> bool {
934        self.blocks.is_empty() || (self.blocks.len() == 1 && self.blocks[0].length == 0)
935    }
936
937    /// Returns the timestamp that the last data was written to the file.
938    /// Returns None if the file is empty.
939    #[must_use]
940    pub fn last_appended_at(&self) -> Option<TimestampAsNanoseconds> {
941        self.blocks.last().map(|b| b.timestamp)
942    }
943
944    fn non_blocking_read_block(&mut self) -> NonBlockingBlockReadResult {
945        let block = self.loaded.pop_front();
946
947        if let Some(mut block) = block {
948            if block.index == self.current_block {
949                self.current_block += 1;
950                if self.offset > 0 {
951                    block.contents.splice(..self.offset, []);
952                    self.offset = 0;
953                }
954                return NonBlockingBlockReadResult::ReadBlock(block.contents);
955            }
956        }
957
958        // We need to load blocks. We need to ensure we aren't in an EOF
959        // position.
960        let is_last_block = self.current_block + 1 == self.blocks.len();
961        if self.current_block < self.blocks.len()
962            || (is_last_block && self.offset < self.blocks.last().unwrap().length)
963        {
964            return NonBlockingBlockReadResult::NeedBlocks;
965        }
966
967        NonBlockingBlockReadResult::Eof
968    }
969
970    fn non_blocking_read<F: FnMut(&[u8]) -> usize>(
971        &mut self,
972        mut read_callback: F,
973    ) -> NonBlockingReadResult {
974        loop {
975            if self.loaded.is_empty() || self.loaded.front().unwrap().index != self.current_block {
976                let is_last_block = self.current_block + 1 == self.blocks.len();
977
978                if self.current_block < self.blocks.len()
979                    || (is_last_block && self.offset < self.blocks.last().unwrap().length)
980                {
981                    return NonBlockingReadResult::NeedBlocks;
982                }
983
984                return NonBlockingReadResult::Eof;
985            }
986            while let Some(block) = self.loaded.front() {
987                let read_length = read_callback(&block.contents[self.offset..]);
988                if read_length > 0 {
989                    self.offset += read_length;
990                    return NonBlockingReadResult::ReadBytes(read_length);
991                }
992
993                self.loaded.pop_front();
994                self.offset = 0;
995                self.current_block += 1;
996            }
997        }
998    }
999}
1000
1001enum NonBlockingBlockReadResult {
1002    NeedBlocks,
1003    ReadBlock(Vec<u8>),
1004    Eof,
1005}
1006
1007enum NonBlockingReadResult {
1008    NeedBlocks,
1009    ReadBytes(usize),
1010    Eof,
1011}
1012
1013impl<Database: Connection + Clone, Config: FileConfig> Read
1014    for Contents<Blocking<Database>, Config>
1015{
1016    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1017        loop {
1018            match self.non_blocking_read(|block| {
1019                let bytes_to_read = buf.len().min(block.len());
1020                buf[..bytes_to_read].copy_from_slice(&block[..bytes_to_read]);
1021                bytes_to_read
1022            }) {
1023                NonBlockingReadResult::ReadBytes(bytes) => return Ok(bytes),
1024                NonBlockingReadResult::Eof => return Ok(0),
1025                NonBlockingReadResult::NeedBlocks => self.load_blocks()?,
1026            }
1027        }
1028    }
1029}
1030
1031impl<Database: Clone, Config: FileConfig> Seek for Contents<Database, Config> {
1032    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
1033        let seek_to = match pos {
1034            SeekFrom::Start(offset) => offset,
1035            SeekFrom::End(from_end) => {
1036                if from_end < 0 {
1037                    self.len() - u64::try_from(from_end.saturating_abs()).unwrap()
1038                } else {
1039                    // Seek to the end
1040                    self.len()
1041                }
1042            }
1043            SeekFrom::Current(from_current) => {
1044                if self.blocks.is_empty() {
1045                    return Ok(0);
1046                }
1047
1048                u64::try_from(
1049                    i64::try_from(
1050                        self.blocks[self.current_block].offset
1051                            + u64::try_from(self.offset).unwrap(),
1052                    )
1053                    .unwrap()
1054                        + from_current,
1055                )
1056                .unwrap()
1057            }
1058        };
1059        if let Some((index, block)) = self
1060            .blocks
1061            .iter()
1062            .enumerate()
1063            .find(|b| b.1.offset + u64::try_from(b.1.length).unwrap() > seek_to)
1064        {
1065            self.current_block = index;
1066            self.offset = usize::try_from(seek_to - block.offset).unwrap();
1067            Ok(seek_to)
1068        } else if let Some(last_block) = self.blocks.last() {
1069            // Set to the end of the file
1070            self.current_block = self.blocks.len() - 1;
1071            self.offset = last_block.length;
1072            Ok(last_block.offset + u64::try_from(last_block.length).unwrap())
1073        } else {
1074            // Empty
1075            self.current_block = 0;
1076            self.offset = 0;
1077            Ok(0)
1078        }
1079    }
1080}
1081
1082#[cfg(feature = "async")]
1083impl<
1084        Database: bonsaidb_core::connection::AsyncConnection + Clone + Unpin + 'static,
1085        Config: FileConfig,
1086    > tokio::io::AsyncSeek for Contents<Async<Database>, Config>
1087{
1088    fn start_seek(mut self: std::pin::Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
1089        self.seek(position).map(|_| ())
1090    }
1091
1092    fn poll_complete(
1093        self: std::pin::Pin<&mut Self>,
1094        _cx: &mut std::task::Context<'_>,
1095    ) -> Poll<std::io::Result<u64>> {
1096        if self.blocks.is_empty() {
1097            Poll::Ready(Ok(0))
1098        } else if self.current_block < self.blocks.len() {
1099            Poll::Ready(Ok(
1100                self.blocks[self.current_block].offset + u64::try_from(self.offset).unwrap()
1101            ))
1102        } else {
1103            Poll::Ready(Ok(self.len()))
1104        }
1105    }
1106}
1107
1108#[cfg(feature = "async")]
1109impl<
1110        Database: bonsaidb_core::connection::AsyncConnection + Clone + Unpin + 'static,
1111        Config: FileConfig,
1112    > tokio::io::AsyncRead for Contents<Async<Database>, Config>
1113{
1114    fn poll_read(
1115        mut self: std::pin::Pin<&mut Self>,
1116        cx: &mut std::task::Context<'_>,
1117        buf: &mut tokio::io::ReadBuf<'_>,
1118    ) -> Poll<std::io::Result<()>> {
1119        self.spawn_block_fetching_task();
1120        loop {
1121            match self.non_blocking_read(|block| {
1122                let bytes_to_read = buf.remaining().min(block.len());
1123                buf.put_slice(&block[..bytes_to_read]);
1124                bytes_to_read
1125            }) {
1126                NonBlockingReadResult::NeedBlocks => match self.fetch_blocks(cx) {
1127                    Poll::Ready(Ok(true)) => continue,
1128                    Poll::Pending => return Poll::Pending,
1129                    Poll::Ready(Ok(false)) => return Poll::Ready(Ok(())),
1130                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
1131                },
1132                NonBlockingReadResult::ReadBytes(bytes) => {
1133                    if bytes == 0 || buf.remaining() == 0 {
1134                        return Poll::Ready(Ok(()));
1135                    }
1136                }
1137                NonBlockingReadResult::Eof => return Poll::Ready(Ok(())),
1138            }
1139        }
1140    }
1141}
1142
1143impl<Database: Connection + Clone, Config: FileConfig> Iterator
1144    for Contents<Blocking<Database>, Config>
1145{
1146    type Item = std::io::Result<Vec<u8>>;
1147
1148    fn next(&mut self) -> Option<Self::Item> {
1149        loop {
1150            match self.non_blocking_read_block() {
1151                NonBlockingBlockReadResult::ReadBlock(bytes) => return Some(Ok(bytes)),
1152                NonBlockingBlockReadResult::Eof => return None,
1153                NonBlockingBlockReadResult::NeedBlocks => match self.load_blocks() {
1154                    Ok(()) => {}
1155                    Err(err) => return Some(Err(err)),
1156                },
1157            }
1158        }
1159    }
1160}
1161#[cfg(feature = "async")]
1162impl<
1163        Database: bonsaidb_core::connection::AsyncConnection + Unpin + Clone + 'static,
1164        Config: FileConfig,
1165    > futures::Stream for Contents<Async<Database>, Config>
1166{
1167    type Item = std::io::Result<Vec<u8>>;
1168
1169    fn poll_next(
1170        mut self: std::pin::Pin<&mut Self>,
1171        cx: &mut std::task::Context<'_>,
1172    ) -> Poll<Option<Self::Item>> {
1173        self.spawn_block_fetching_task();
1174        loop {
1175            match self.non_blocking_read_block() {
1176                NonBlockingBlockReadResult::NeedBlocks => match self.fetch_blocks(cx) {
1177                    Poll::Ready(Ok(true)) => continue,
1178                    Poll::Pending => return Poll::Pending,
1179                    Poll::Ready(Ok(false)) => return Poll::Ready(None),
1180                    Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
1181                },
1182                NonBlockingBlockReadResult::ReadBlock(block) => {
1183                    return Poll::Ready(Some(Ok(block)))
1184                }
1185                NonBlockingBlockReadResult::Eof => return Poll::Ready(None),
1186            }
1187        }
1188    }
1189}
1190
1191#[derive(Clone)]
1192pub(crate) struct BlockInfo {
1193    pub offset: u64,
1194    pub length: usize,
1195    pub timestamp: TimestampAsNanoseconds,
1196    pub header: CollectionHeader<u64>,
1197}
1198
1199/// A buffered [`std::io::Write`] and [`std::io::Seek`] implementor for a
1200/// [`File`].
1201pub struct BufferedAppend<'a, Config: FileConfig, Database: Connection + Clone> {
1202    file: &'a mut File<Blocking<Database>, Config>,
1203    pub(crate) buffer: Vec<u8>,
1204    _config: PhantomData<Config>,
1205}
1206
1207impl<'a, Config: FileConfig, Database: Connection + Clone> BufferedAppend<'a, Config, Database> {
1208    /// Sets the size of the buffer. For optimal use, this should be a multiple
1209    /// of [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE).
1210    ///
1211    /// If any data is already buffered, it will be flushed before the buffer is
1212    /// resized.
1213    pub fn set_buffer_size(&mut self, capacity: usize) -> std::io::Result<()> {
1214        if self.buffer.capacity() > 0 {
1215            self.flush()?;
1216        }
1217        self.buffer = Vec::with_capacity(capacity);
1218        Ok(())
1219    }
1220}
1221
1222impl<'a, Config: FileConfig, Database: Connection + Clone> Write
1223    for BufferedAppend<'a, Config, Database>
1224{
1225    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
1226        if self.buffer.capacity() == 0 {
1227            const ONE_MEGABYTE: usize = 1024 * 1024;
1228            // By default, reserve the largest multiple of BLOCK_SIZE that is
1229            // less than or equal to 1 megabyte.
1230            self.buffer
1231                .reserve_exact(ONE_MEGABYTE / Config::BLOCK_SIZE * Config::BLOCK_SIZE);
1232        } else if self.buffer.capacity() == self.buffer.len() {
1233            self.flush()?;
1234        }
1235
1236        if data.is_empty() {
1237            Ok(0)
1238        } else {
1239            let bytes_to_write = data.len().min(self.buffer.capacity() - self.buffer.len());
1240            self.buffer.extend(&data[..bytes_to_write]);
1241            Ok(bytes_to_write)
1242        }
1243    }
1244
1245    fn flush(&mut self) -> std::io::Result<()> {
1246        self.file
1247            .append(&self.buffer)
1248            .map_err(|err| std::io::Error::new(ErrorKind::Other, err))?;
1249        self.buffer.clear();
1250        Ok(())
1251    }
1252}
1253
1254impl<'a, Config: FileConfig, Database: Connection + Clone> Drop
1255    for BufferedAppend<'a, Config, Database>
1256{
1257    fn drop(&mut self) {
1258        drop(self.flush());
1259    }
1260}
1261
1262/// A buffered [`tokio::io::AsyncWrite`] and [`std::io::Seek`] implementor for a
1263/// [`File`].
1264#[cfg(feature = "async")]
1265pub struct AsyncBufferedAppend<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static>
1266{
1267    file: &'a mut File<Async<Database>, Config>,
1268    pub(crate) buffer: Vec<u8>,
1269    flush_future: Option<BoxFuture<'a, Result<(), std::io::Error>>>,
1270    _config: PhantomData<Config>,
1271}
1272
1273#[cfg(feature = "async")]
1274impl<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static>
1275    AsyncBufferedAppend<'a, Config, Database>
1276{
1277    /// Sets the size of the buffer. For optimal use, this should be a multiple
1278    /// of [`Config::BLOCK_SIZE`](FileConfig::BLOCK_SIZE).
1279    ///
1280    /// If any data is already buffered, it will be flushed before the buffer is
1281    /// resized.
1282    pub async fn set_buffer_size(&mut self, capacity: usize) -> std::io::Result<()> {
1283        if self.buffer.capacity() > 0 {
1284            self.flush().await?;
1285        }
1286        self.buffer = Vec::with_capacity(capacity);
1287        Ok(())
1288    }
1289}
1290
1291#[cfg(feature = "async")]
1292impl<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static> tokio::io::AsyncWrite
1293    for AsyncBufferedAppend<'a, Config, Database>
1294{
1295    fn poll_write(
1296        mut self: std::pin::Pin<&mut Self>,
1297        cx: &mut std::task::Context<'_>,
1298        data: &[u8],
1299    ) -> Poll<Result<usize, std::io::Error>> {
1300        if self.buffer.capacity() == 0 {
1301            const ONE_MEGABYTE: usize = 1024 * 1024;
1302            // By default, reserve the largest multiple of BLOCK_SIZE that is
1303            // less than or equal to 1 megabyte.
1304            self.buffer
1305                .reserve_exact(ONE_MEGABYTE / Config::BLOCK_SIZE * Config::BLOCK_SIZE);
1306        }
1307
1308        if self.flush_future.is_some() {
1309            if let Err(err) = ready!(std::pin::Pin::new(&mut self).poll_flush(cx)) {
1310                return Poll::Ready(Err(err));
1311            }
1312        } else if self.buffer.capacity() == self.buffer.len() {
1313            match ready!(std::pin::Pin::new(&mut self).poll_flush(cx)) {
1314                Ok(_) => {}
1315                Err(err) => {
1316                    return Poll::Ready(Err(err));
1317                }
1318            }
1319        }
1320
1321        if data.is_empty() {
1322            Poll::Ready(Ok(0))
1323        } else {
1324            let bytes_to_write = data.len().min(self.buffer.capacity() - self.buffer.len());
1325            self.buffer.extend(&data[..bytes_to_write]);
1326            Poll::Ready(Ok(bytes_to_write))
1327        }
1328    }
1329
1330    fn poll_flush(
1331        mut self: std::pin::Pin<&mut Self>,
1332        cx: &mut std::task::Context<'_>,
1333    ) -> Poll<Result<(), std::io::Error>> {
1334        if let Some(flush_future) = &mut self.flush_future {
1335            let result = ready!(flush_future.poll_unpin(cx));
1336            self.flush_future = None;
1337            Poll::Ready(result)
1338        } else if self.buffer.is_empty() {
1339            Poll::Ready(Ok(()))
1340        } else {
1341            let file = self.file.clone();
1342
1343            let mut buffer = Vec::with_capacity(self.buffer.capacity());
1344            std::mem::swap(&mut buffer, &mut self.buffer);
1345
1346            let mut flush_task = async move {
1347                file.append(&buffer)
1348                    .await
1349                    .map_err(|err| std::io::Error::new(ErrorKind::Other, err))
1350            }
1351            .boxed();
1352            let poll_result = flush_task.poll_unpin(cx);
1353            self.flush_future = Some(flush_task);
1354            poll_result
1355        }
1356    }
1357
1358    fn poll_shutdown(
1359        self: std::pin::Pin<&mut Self>,
1360        cx: &mut std::task::Context<'_>,
1361    ) -> Poll<Result<(), std::io::Error>> {
1362        self.poll_flush(cx)
1363    }
1364}
1365
1366#[cfg(feature = "async")]
1367impl<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static> Drop
1368    for AsyncBufferedAppend<'a, Config, Database>
1369{
1370    fn drop(&mut self) {
1371        if !self.buffer.is_empty() {
1372            assert!(
1373                self.flush_future.is_none(),
1374                "flush() was started but not completed before dropped"
1375            );
1376            let mut buffer = Vec::new();
1377            std::mem::swap(&mut buffer, &mut self.buffer);
1378            let mut file = self.file.clone();
1379
1380            tokio::runtime::Handle::current().spawn(async move {
1381                drop(
1382                    AsyncBufferedAppend {
1383                        file: &mut file,
1384                        buffer,
1385                        flush_future: None,
1386                        _config: PhantomData,
1387                    }
1388                    .flush()
1389                    .await,
1390                );
1391            });
1392        }
1393    }
1394}