#[cfg(feature = "async")]
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::marker::PhantomData;
#[cfg(feature = "async")]
use std::task::Poll;

use bonsaidb_core::connection::Connection;
use bonsaidb_core::document::{CollectionDocument, CollectionHeader};
use bonsaidb_core::key::time::TimestampAsNanoseconds;
use bonsaidb_core::schema::SerializedCollection;
#[cfg(feature = "async")]
use bonsaidb_core::{circulate::flume, connection::AsyncConnection};
use derive_where::derive_where;
#[cfg(feature = "async")]
use futures::{future::BoxFuture, ready, FutureExt};
#[cfg(feature = "async")]
use tokio::io::AsyncWriteExt;

use crate::schema::block::BlockAppendInfo;
use crate::schema::{self};
use crate::{BonsaiFiles, Error, FileConfig, Statistics, Truncate};

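/// A handle to a file stored in a database.
///
/// A rough usage sketch, assuming the builder entry point exposed through
/// [`FileConfig`] (names may differ from the actual crate API):
///
/// ```rust,ignore
/// // Create a file with initial contents, then read it back.
/// let file = BonsaiFiles::build("hello.txt")
///     .contents(b"hello, world!")
///     .create(&database)?;
/// let bytes = file.contents()?.into_vec()?;
/// ```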
#[derive_where(Debug, Clone)]
pub struct File<Database: Clone, Config: FileConfig = BonsaiFiles> {
    doc: CollectionDocument<schema::file::File<Config>>,
    #[derive_where(skip(Debug))]
    database: Database,
}

impl<Database, Config> PartialEq for File<Database, Config>
where
    Database: Clone,
    Config: FileConfig,
{
    fn eq(&self, other: &Self) -> bool {
        self.doc.header == other.doc.header
    }
}

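/// A blocking database connection.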
#[derive(Clone)]
pub struct Blocking<Database: Connection>(Database);

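/// An asynchronous database connection.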
#[cfg(feature = "async")]
#[derive(Clone)]
pub struct Async<Database: AsyncConnection>(Database);

impl<Database, Config> File<Blocking<Database>, Config>
where
    Database: Connection + Clone,
    Config: FileConfig,
{
    fn new_file(
        path: Option<String>,
        name: String,
        contents: &[u8],
        metadata: Config::Metadata,
        database: Database,
    ) -> Result<Self, Error> {
        Ok(Self {
            doc: schema::file::File::create_file(path, name, contents, metadata, &database)?,
            database: Blocking(database),
        })
    }

    pub(crate) fn get(id: u32, database: &Database) -> Result<Option<Self>, bonsaidb_core::Error> {
        schema::file::File::<Config>::get(&id, database).map(|doc| {
            doc.map(|doc| Self {
                doc,
                database: Blocking(database.clone()),
            })
        })
    }

    pub(crate) fn load(path: &str, database: &Database) -> Result<Option<Self>, Error> {
        schema::file::File::<Config>::find(path, database).map(|opt| {
            opt.map(|doc| Self {
                doc,
                database: Blocking(database.clone()),
            })
        })
    }

    pub(crate) fn list(path: &str, database: &Database) -> Result<Vec<Self>, bonsaidb_core::Error> {
        schema::file::File::<Config>::list_path_contents(path, database).map(|vec| {
            vec.into_iter()
                .map(|doc| Self {
                    doc,
                    database: Blocking(database.clone()),
                })
                .collect()
        })
    }

    pub(crate) fn list_recursive(
        path: &str,
        database: &Database,
    ) -> Result<Vec<Self>, bonsaidb_core::Error> {
        schema::file::File::<Config>::list_recursive_path_contents(path, database).map(|vec| {
            vec.into_iter()
                .map(|doc| Self {
                    doc,
                    database: Blocking(database.clone()),
                })
                .collect()
        })
    }

    pub(crate) fn stats_for_path(
        path: &str,
        database: &Database,
    ) -> Result<Statistics, bonsaidb_core::Error> {
        schema::file::File::<Config>::summarize_recursive_path_contents(path, database)
    }

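    /// Returns the files contained directly within this file's path (treating
    /// this file as a directory).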
    pub fn children(&self) -> Result<Vec<Self>, bonsaidb_core::Error> {
        schema::file::File::<Config>::list_path_contents(&self.path(), &self.database.0).map(
            |docs| {
                docs.into_iter()
                    .map(|doc| Self {
                        doc,
                        database: self.database.clone(),
                    })
                    .collect()
            },
        )
    }

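    /// Moves this file to `new_path`, which must start with `/`. If `new_path`
    /// ends with `/`, the file is moved into that directory and keeps its
    /// current name; otherwise the final path component becomes the file's new
    /// name.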
    pub fn move_to(&mut self, new_path: &str) -> Result<(), Error> {
        if !new_path.as_bytes().starts_with(b"/") {
            return Err(Error::InvalidPath);
        }

        let mut doc = self.update_document_for_move(new_path);
        doc.update(&self.database.0)?;
        self.doc = doc;
        Ok(())
    }

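    /// Renames this file to `new_name`. Returns an error if `new_name`
    /// contains a `/`.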
    pub fn rename(&mut self, new_name: String) -> Result<(), Error> {
        if new_name.as_bytes().contains(&b'/') {
            return Err(Error::InvalidName);
        }

        let mut doc = self.doc.clone();
        doc.contents.name = new_name;
        doc.update(&self.database.0)?;
        self.doc = doc;
        Ok(())
    }

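    /// Deletes this file and all of its stored blocks.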
    pub fn delete(&self) -> Result<(), Error> {
        schema::block::Block::<Config>::delete_for_file(self.doc.header.id, &self.database.0)?;
        self.doc.delete(&self.database.0)?;
        Ok(())
    }

    fn map_block_metadata<F: FnOnce(BlockAppendInfo) -> T, T>(
        &mut self,
        callback: F,
    ) -> Result<T, bonsaidb_core::Error> {
        let metadata =
            schema::block::Block::<Config>::summary_for_file(self.doc.header.id, &self.database.0)?;

        Ok(callback(metadata))
    }

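    /// Returns the length of this file's contents in bytes.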
    #[allow(clippy::missing_panics_doc)]
    pub fn len(&mut self) -> Result<u64, bonsaidb_core::Error> {
        self.map_block_metadata(|metadata| metadata.length)
    }

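    /// Returns true if this file contains no data.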
    #[allow(clippy::missing_panics_doc)]
    pub fn is_empty(&mut self) -> Result<bool, bonsaidb_core::Error> {
        Ok(self.len()? == 0)
    }

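    /// Returns the timestamp of the most recent append, or `None` if data has
    /// never been appended to this file.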
    #[allow(clippy::missing_panics_doc)]
    pub fn last_appended_at(
        &mut self,
    ) -> Result<Option<TimestampAsNanoseconds>, bonsaidb_core::Error> {
        self.map_block_metadata(|metadata| metadata.timestamp)
    }

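    /// Returns a [`Contents`] reader over this file's data, which implements
    /// [`Read`], [`Seek`], and [`Iterator`].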
    pub fn contents(&self) -> Result<Contents<Blocking<Database>, Config>, bonsaidb_core::Error> {
        let blocks = schema::block::Block::<Config>::for_file(self.id(), &self.database.0)?;
        Ok(Contents {
            database: self.database.clone(),
            blocks,
            loaded: VecDeque::default(),
            current_block: 0,
            offset: 0,
            buffer_size: Config::BLOCK_SIZE * 10,
            #[cfg(feature = "async")]
            async_blocks: None,
            _config: PhantomData,
        })
    }

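    /// Truncates the file to `new_length`, removing data from the start or the
    /// end of the file depending on `from`.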
    pub fn truncate(&self, new_length: u64, from: Truncate) -> Result<(), bonsaidb_core::Error> {
        schema::file::File::<Config>::truncate(&self.doc, new_length, from, &self.database.0)
    }

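    /// Appends `data` to the end of the file.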
    pub fn append(&self, data: &[u8]) -> Result<(), bonsaidb_core::Error> {
        schema::block::Block::<Config>::append(data, self.doc.header.id, &self.database.0)
    }

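    /// Returns a writer that buffers appended data and writes it to the file
    /// in block-sized chunks. Any remaining data is flushed when the writer is
    /// dropped.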
    pub fn append_buffered(&mut self) -> BufferedAppend<'_, Config, Database> {
        BufferedAppend {
            file: self,
            buffer: Vec::new(),
            _config: PhantomData,
        }
    }

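    /// Persists any changes made to the file's metadata via `metadata_mut()`.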
    pub fn update_metadata(&mut self) -> Result<(), bonsaidb_core::Error> {
        self.doc.update(&self.database.0)
    }
}

#[cfg(feature = "async")]
impl<Database, Config> File<Async<Database>, Config>
where
    Database: AsyncConnection + Clone,
    Config: FileConfig,
{
    async fn new_file_async(
        path: Option<String>,
        name: String,
        contents: &[u8],
        metadata: Config::Metadata,
        database: Database,
    ) -> Result<Self, Error> {
        Ok(Self {
            doc: schema::file::File::create_file_async(path, name, contents, metadata, &database)
                .await?,
            database: Async(database),
        })
    }

    pub(crate) async fn get_async(
        id: u32,
        database: &Database,
    ) -> Result<Option<Self>, bonsaidb_core::Error> {
        schema::file::File::<Config>::get_async(&id, database)
            .await
            .map(|doc| {
                doc.map(|doc| Self {
                    doc,
                    database: Async(database.clone()),
                })
            })
    }

    pub(crate) async fn load_async(path: &str, database: &Database) -> Result<Option<Self>, Error> {
        schema::file::File::<Config>::find_async(path, database)
            .await
            .map(|opt| {
                opt.map(|doc| Self {
                    doc,
                    database: Async(database.clone()),
                })
            })
    }

    pub(crate) async fn list_async(
        path: &str,
        database: &Database,
    ) -> Result<Vec<Self>, bonsaidb_core::Error> {
        schema::file::File::<Config>::list_path_contents_async(path, database)
            .await
            .map(|vec| {
                vec.into_iter()
                    .map(|doc| Self {
                        doc,
                        database: Async(database.clone()),
                    })
                    .collect()
            })
    }

    pub(crate) async fn list_recursive_async(
        path: &str,
        database: &Database,
    ) -> Result<Vec<Self>, bonsaidb_core::Error> {
        schema::file::File::<Config>::list_recursive_path_contents_async(path, database)
            .await
            .map(|vec| {
                vec.into_iter()
                    .map(|doc| Self {
                        doc,
                        database: Async(database.clone()),
                    })
                    .collect()
            })
    }

    pub(crate) async fn stats_for_path_async(
        path: &str,
        database: &Database,
    ) -> Result<Statistics, bonsaidb_core::Error> {
        schema::file::File::<Config>::summarize_recursive_path_contents_async(path, database).await
    }

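    /// Returns the files contained directly within this file's path (treating
    /// this file as a directory).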
    pub async fn children(&self) -> Result<Vec<Self>, bonsaidb_core::Error> {
        schema::file::File::<Config>::list_path_contents_async(&self.path(), &self.database.0)
            .await
            .map(|docs| {
                docs.into_iter()
                    .map(|doc| Self {
                        doc,
                        database: self.database.clone(),
                    })
                    .collect()
            })
    }

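    /// Moves this file to `new_path`, which must start with `/`. If `new_path`
    /// ends with `/`, the file is moved into that directory and keeps its
    /// current name; otherwise the final path component becomes the file's new
    /// name.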
    pub async fn move_to(&mut self, new_path: &str) -> Result<(), Error> {
        if !new_path.as_bytes().starts_with(b"/") {
            return Err(Error::InvalidPath);
        }

        let mut doc = self.update_document_for_move(new_path);
        doc.update_async(&self.database.0).await?;
        self.doc = doc;
        Ok(())
    }

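    /// Renames this file to `new_name`. Returns an error if `new_name`
    /// contains a `/`.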
    pub async fn rename(&mut self, new_name: String) -> Result<(), Error> {
        if new_name.as_bytes().contains(&b'/') {
            return Err(Error::InvalidName);
        }

        let mut doc = self.doc.clone();
        doc.contents.name = new_name;
        doc.update_async(&self.database.0).await?;
        self.doc = doc;
        Ok(())
    }

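    /// Deletes this file and all of its stored blocks.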
    pub async fn delete(&self) -> Result<(), Error> {
        schema::block::Block::<Config>::delete_for_file_async(self.doc.header.id, &self.database.0)
            .await?;
        self.doc.delete_async(&self.database.0).await?;
        Ok(())
    }

    async fn map_block_metadata<F: FnOnce(BlockAppendInfo) -> T, T>(
        &mut self,
        callback: F,
    ) -> Result<T, bonsaidb_core::Error> {
        let metadata = schema::block::Block::<Config>::summary_for_file_async(
            self.doc.header.id,
            &self.database.0,
        )
        .await?;

        Ok(callback(metadata))
    }

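    /// Returns the length of this file's contents in bytes.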
    #[allow(clippy::missing_panics_doc)]
    pub async fn len(&mut self) -> Result<u64, bonsaidb_core::Error> {
        self.map_block_metadata(|metadata| metadata.length).await
    }

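    /// Returns true if this file contains no data.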
    #[allow(clippy::missing_panics_doc)]
    pub async fn is_empty(&mut self) -> Result<bool, bonsaidb_core::Error> {
        Ok(self.len().await? == 0)
    }

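    /// Returns the timestamp of the most recent append, or `None` if data has
    /// never been appended to this file.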
    #[allow(clippy::missing_panics_doc)]
    pub async fn last_appended_at(
        &mut self,
    ) -> Result<Option<TimestampAsNanoseconds>, bonsaidb_core::Error> {
        self.map_block_metadata(|metadata| metadata.timestamp).await
    }

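    /// Returns a [`Contents`] reader over this file's data, which implements
    /// `tokio::io::AsyncRead`, `tokio::io::AsyncSeek`, and `futures::Stream`.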
    pub async fn contents(
        &self,
    ) -> Result<Contents<Async<Database>, Config>, bonsaidb_core::Error> {
        let blocks =
            schema::block::Block::<Config>::for_file_async(self.id(), &self.database.0).await?;
        Ok(Contents {
            database: self.database.clone(),
            blocks,
            loaded: VecDeque::default(),
            current_block: 0,
            offset: 0,
            buffer_size: Config::BLOCK_SIZE * 10,
            #[cfg(feature = "async")]
            async_blocks: None,
            _config: PhantomData,
        })
    }

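    /// Truncates the file to `new_length`, removing data from the start or the
    /// end of the file depending on `from`.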
    pub async fn truncate(
        &self,
        new_length: u64,
        from: Truncate,
    ) -> Result<(), bonsaidb_core::Error> {
        schema::file::File::<Config>::truncate_async(&self.doc, new_length, from, &self.database.0)
            .await
    }

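    /// Appends `data` to the end of the file.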
    pub async fn append(&self, data: &[u8]) -> Result<(), bonsaidb_core::Error> {
        schema::block::Block::<Config>::append_async(data, self.doc.header.id, &self.database.0)
            .await
    }

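    /// Returns a writer that buffers appended data and writes it to the file
    /// in block-sized chunks. Any unflushed data is written from a background
    /// task when the writer is dropped.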
    pub fn append_buffered(&mut self) -> AsyncBufferedAppend<'_, Config, Database> {
        AsyncBufferedAppend {
            file: self,
            buffer: Vec::new(),
            flush_future: None,
            _config: PhantomData,
        }
    }

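    /// Persists any changes made to the file's metadata via `metadata_mut()`.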
    pub async fn update_metadata(&mut self) -> Result<(), bonsaidb_core::Error> {
        self.doc.update_async(&self.database.0).await
    }
}

impl<Database, Config> File<Database, Config>
where
    Database: Clone,
    Config: FileConfig,
{
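    /// Returns the unique id of this file.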
    pub fn id(&self) -> u32 {
        self.doc.header.id
    }

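    /// Returns the path containing this file. Files at the root return `/`.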
    pub fn containing_path(&self) -> &str {
        self.doc.contents.path.as_deref().unwrap_or("/")
    }

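    /// Returns the name of this file.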
    pub fn name(&self) -> &str {
        &self.doc.contents.name
    }

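    /// Returns the full path of this file, joining its containing path and
    /// name.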
    pub fn path(&self) -> String {
        let containing_path = self.containing_path();
        let ends_in_slash = self.containing_path().ends_with('/');
        let mut full_path = String::with_capacity(
            containing_path.len() + usize::from(!ends_in_slash) + self.name().len(),
        );
        full_path.push_str(containing_path);
        if !ends_in_slash {
            full_path.push('/');
        }
        full_path.push_str(self.name());

        full_path
    }

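    /// Returns the timestamp this file was created at.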
    pub fn created_at(&self) -> TimestampAsNanoseconds {
        self.doc.contents.created_at
    }

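    /// Returns this file's metadata.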
    pub fn metadata(&self) -> &Config::Metadata {
        &self.doc.contents.metadata
    }

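    /// Returns mutable access to this file's metadata. Changes are not
    /// persisted until `update_metadata()` is called.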
    pub fn metadata_mut(&mut self) -> &mut Config::Metadata {
        &mut self.doc.contents.metadata
    }

    fn update_document_for_move(
        &self,
        new_path: &str,
    ) -> CollectionDocument<schema::file::File<Config>> {
        let mut doc = self.doc.clone();
        if new_path.as_bytes().ends_with(b"/") {
            if new_path.len() > 1 {
                doc.contents.path = Some(new_path.to_string());
            } else {
                doc.contents.path = None;
            }
        } else {
            let (path, name) = new_path.rsplit_once('/').unwrap();
            doc.contents.path = (!path.is_empty()).then(|| path.to_string());
            doc.contents.name = name.to_string();
        }

        if let Some(path) = doc.contents.path.as_mut() {
            if path.bytes().last() != Some(b'/') {
                path.push('/');
            }
        }

        doc
    }
}

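/// A builder for creating a [`File`].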
#[derive(Debug, Clone)]
#[must_use]
pub struct FileBuilder<'a, Config>
where
    Config: FileConfig,
{
    path: Option<String>,
    name: String,
    contents: &'a [u8],
    metadata: Config::Metadata,
    _config: PhantomData<Config>,
}

impl<'a, Config: FileConfig> FileBuilder<'a, Config> {
    pub(crate) fn new<NameOrPath: AsRef<str>>(
        name_or_path: NameOrPath,
        metadata: Config::Metadata,
    ) -> Self {
        let mut name_or_path = name_or_path.as_ref();
        let (path, name) = if name_or_path.starts_with('/') {
            if name_or_path.ends_with('/') && name_or_path.len() > 1 {
                name_or_path = &name_or_path[..name_or_path.len() - 1];
            }
            let (path, name) = name_or_path.rsplit_once('/').unwrap();
            let path = match path {
                "" | "/" => None,
                other => Some(other.to_string()),
            };
            (path, name.to_string())
        } else {
            (None, name_or_path.to_string())
        };
        Self {
            path,
            name,
            contents: b"",
            metadata,
            _config: PhantomData,
        }
    }

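    /// Sets the path that the file will be created within.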
    pub fn at_path<Path: Into<String>>(mut self, path: Path) -> Self {
        self.path = Some(path.into());
        self
    }

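    /// Sets the file's initial contents.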
    pub fn contents(mut self, contents: &'a [u8]) -> Self {
        self.contents = contents;
        self
    }

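    /// Sets the file's metadata.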
    pub fn metadata(mut self, metadata: Config::Metadata) -> Self {
        self.metadata = metadata;
        self
    }

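    /// Creates the file using the blocking `database` connection and returns a
    /// handle to it.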
    pub fn create<Database: Connection + Clone>(
        self,
        database: &Database,
    ) -> Result<File<Blocking<Database>, Config>, Error> {
        File::new_file(
            self.path,
            self.name,
            self.contents,
            self.metadata,
            database.clone(),
        )
    }

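    /// Creates the file using the asynchronous `database` connection and
    /// returns a handle to it.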
    #[cfg(feature = "async")]
    pub async fn create_async<Database: bonsaidb_core::connection::AsyncConnection + Clone>(
        self,
        database: &Database,
    ) -> Result<File<Async<Database>, Config>, Error> {
        File::new_file_async(
            self.path,
            self.name,
            self.contents,
            self.metadata,
            database.clone(),
        )
        .await
    }
}

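/// Buffered access to a [`File`]'s contents.
///
/// For blocking connections this type implements [`Read`], [`Seek`], and
/// [`Iterator`] (yielding one block at a time). With the `async` feature
/// enabled it also implements `tokio::io::AsyncRead`, `tokio::io::AsyncSeek`,
/// and `futures::Stream`.
///
/// A minimal sketch of blocking usage (hypothetical `file` variable):
///
/// ```rust,ignore
/// use std::io::Read;
///
/// let mut contents = file.contents()?;
/// let mut buffer = Vec::new();
/// contents.read_to_end(&mut buffer)?;
/// ```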
#[must_use]
pub struct Contents<Database: Clone, Config: FileConfig> {
    database: Database,
    blocks: Vec<BlockInfo>,
    loaded: VecDeque<LoadedBlock>,
    current_block: usize,
    offset: usize,
    buffer_size: usize,
    #[cfg(feature = "async")]
    async_blocks: Option<AsyncBlockTask>,
    _config: PhantomData<Config>,
}

#[cfg(feature = "async")]
struct AsyncBlockTask {
    block_receiver:
        flume::r#async::RecvFut<'static, Result<BTreeMap<u64, Vec<u8>>, std::io::Error>>,
    requested: bool,
    request_sender: flume::Sender<Vec<u64>>,
}

impl<Database: Clone, Config: FileConfig> Clone for Contents<Database, Config> {
    fn clone(&self) -> Self {
        Self {
            database: self.database.clone(),
            blocks: self.blocks.clone(),
            loaded: VecDeque::new(),
            current_block: self.current_block,
            offset: self.offset,
            buffer_size: self.buffer_size,
            #[cfg(feature = "async")]
            async_blocks: None,
            _config: PhantomData,
        }
    }
}

#[derive(Clone)]
struct LoadedBlock {
    index: usize,
    contents: Vec<u8>,
}

impl<Database: Connection + Clone, Config: FileConfig> Contents<Blocking<Database>, Config> {
    /// Reads the contents into a `Vec<u8>` without consuming the reader.
    pub fn to_vec(&self) -> std::io::Result<Vec<u8>> {
        self.clone().into_vec()
    }

    /// Reads the contents into a `String` without consuming the reader.
    pub fn to_string(&self) -> std::io::Result<String> {
        String::from_utf8(self.to_vec()?)
            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
    }

    /// Reads the contents into a `Vec<u8>`, consuming the reader.
    #[allow(clippy::missing_panics_doc)]
    pub fn into_vec(mut self) -> std::io::Result<Vec<u8>> {
        let mut contents = Vec::with_capacity(usize::try_from(self.len()).unwrap());
        self.read_to_end(&mut contents)?;
        Ok(contents)
    }

    /// Reads the contents into a `String`, consuming the reader.
    pub fn into_string(self) -> std::io::Result<String> {
        String::from_utf8(self.into_vec()?)
            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
    }

    fn load_blocks(&mut self) -> std::io::Result<()> {
        self.loaded.clear();
        for (index, (_, contents)) in
            schema::block::Block::<Config>::load(&self.next_blocks(), &self.database.0)
                .map_err(|err| std::io::Error::new(ErrorKind::Other, err))?
                .into_iter()
                .enumerate()
        {
            self.loaded.push_back(LoadedBlock {
                index: self.current_block + index,
                contents,
            });
        }

        Ok(())
    }
}

#[cfg(feature = "async")]
impl<
        Database: bonsaidb_core::connection::AsyncConnection + Clone + Unpin + 'static,
        Config: FileConfig,
    > Contents<Async<Database>, Config>
{
    /// Reads the contents into a `Vec<u8>` without consuming the reader.
    pub async fn to_vec(&self) -> std::io::Result<Vec<u8>> {
        self.clone().into_vec().await
    }

    /// Reads the contents into a `Vec<u8>`, consuming the reader.
    #[allow(clippy::missing_panics_doc)]
    pub async fn into_vec(mut self) -> std::io::Result<Vec<u8>> {
        let mut contents = vec![0; usize::try_from(self.len()).unwrap()];
        <Self as tokio::io::AsyncReadExt>::read_exact(&mut self, &mut contents).await?;
        Ok(contents)
    }

    /// Reads the contents into a `String` without consuming the reader.
    pub async fn to_string(&self) -> std::io::Result<String> {
        String::from_utf8(self.to_vec().await?)
            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
    }

    /// Reads the contents into a `String`, consuming the reader.
    pub async fn into_string(self) -> std::io::Result<String> {
        String::from_utf8(self.into_vec().await?)
            .map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))
    }

    fn spawn_block_fetching_task(&mut self) {
        if self.async_blocks.is_none() {
            let (block_sender, block_receiver) = flume::unbounded();
            let (request_sender, request_receiver) = flume::unbounded();

            let task_database = self.database.0.clone();
            tokio::task::spawn(async move {
                while let Ok(doc_ids) = request_receiver.recv_async().await {
                    let blocks =
                        schema::block::Block::<Config>::load_async(&doc_ids, &task_database)
                            .await
                            .map_err(|err| std::io::Error::new(ErrorKind::Other, err));
                    if block_sender.send(blocks).is_err() {
                        break;
                    }
                }
            });

            self.async_blocks = Some(AsyncBlockTask {
                block_receiver: block_receiver.into_recv_async(),
                request_sender,
                requested: false,
            });
        }
    }

    fn fetch_blocks(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<bool, std::io::Error>> {
        if self.async_blocks.as_mut().unwrap().requested {
            match ready!(self
                .async_blocks
                .as_mut()
                .unwrap()
                .block_receiver
                .poll_unpin(cx))
            {
                Ok(Ok(blocks)) => {
                    self.async_blocks.as_mut().unwrap().requested = false;
                    for (index, (_, contents)) in blocks.into_iter().enumerate() {
                        let loaded_block = LoadedBlock {
                            index: self.current_block + index,
                            contents,
                        };
                        self.loaded.push_back(loaded_block);
                    }
                    Poll::Ready(Ok(true))
                }
                Ok(Err(db_err)) => Poll::Ready(Err(std::io::Error::new(ErrorKind::Other, db_err))),
                Err(flume_error) => {
                    Poll::Ready(Err(std::io::Error::new(ErrorKind::BrokenPipe, flume_error)))
                }
            }
        } else {
            let blocks = self.next_blocks();
            if blocks.is_empty() {
                return Poll::Ready(Ok(false));
            }
            self.loaded.clear();
            self.async_blocks.as_mut().unwrap().requested = true;
            if let Err(err) = self
                .async_blocks
                .as_mut()
                .unwrap()
                .request_sender
                .send(blocks)
            {
                return Poll::Ready(Err(std::io::Error::new(ErrorKind::BrokenPipe, err)));
            }

            Poll::Ready(Ok(true))
        }
    }
}

impl<Database: Clone, Config: FileConfig> Contents<Database, Config> {
    fn next_blocks(&self) -> Vec<u64> {
        let mut last_block = self.current_block;
        let mut requesting_size = 0;
        for index in self.current_block..self.blocks.len() {
            let size_if_requested = self.blocks[index].length.saturating_add(requesting_size);
            if size_if_requested > self.buffer_size {
                break;
            }

            requesting_size = size_if_requested;
            last_block = index;
        }

        self.blocks[self.current_block..=last_block]
            .iter()
            .map(|info| info.header.id)
            .collect()
    }

    /// Sets the number of bytes to buffer when reading ahead. The default is
    /// ten blocks (`Config::BLOCK_SIZE * 10`).
    pub fn with_buffer_size(mut self, size_in_bytes: usize) -> Self {
        self.buffer_size = size_in_bytes;
        self
    }

    /// Returns the total length of the contents in bytes.
    #[allow(clippy::missing_panics_doc)]
    #[must_use]
    pub fn len(&self) -> u64 {
        self.blocks
            .last()
            .map(|b| b.offset + u64::try_from(b.length).unwrap())
            .unwrap_or_default()
    }

    /// Returns true if the file contains no data.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.blocks.is_empty() || (self.blocks.len() == 1 && self.blocks[0].length == 0)
    }

    /// Returns the timestamp of the most recent append, or `None` if the file
    /// has no blocks.
    #[must_use]
    pub fn last_appended_at(&self) -> Option<TimestampAsNanoseconds> {
        self.blocks.last().map(|b| b.timestamp)
    }

    fn non_blocking_read_block(&mut self) -> NonBlockingBlockReadResult {
        let block = self.loaded.pop_front();

        if let Some(mut block) = block {
            if block.index == self.current_block {
                self.current_block += 1;
                if self.offset > 0 {
                    block.contents.splice(..self.offset, []);
                    self.offset = 0;
                }
                return NonBlockingBlockReadResult::ReadBlock(block.contents);
            }
        }

        let is_last_block = self.current_block + 1 == self.blocks.len();
        if self.current_block < self.blocks.len()
            || (is_last_block && self.offset < self.blocks.last().unwrap().length)
        {
            return NonBlockingBlockReadResult::NeedBlocks;
        }

        NonBlockingBlockReadResult::Eof
    }

    fn non_blocking_read<F: FnMut(&[u8]) -> usize>(
        &mut self,
        mut read_callback: F,
    ) -> NonBlockingReadResult {
        loop {
            if self.loaded.is_empty() || self.loaded.front().unwrap().index != self.current_block {
                let is_last_block = self.current_block + 1 == self.blocks.len();

                if self.current_block < self.blocks.len()
                    || (is_last_block && self.offset < self.blocks.last().unwrap().length)
                {
                    return NonBlockingReadResult::NeedBlocks;
                }

                return NonBlockingReadResult::Eof;
            }
            while let Some(block) = self.loaded.front() {
                let read_length = read_callback(&block.contents[self.offset..]);
                if read_length > 0 {
                    self.offset += read_length;
                    return NonBlockingReadResult::ReadBytes(read_length);
                }

                self.loaded.pop_front();
                self.offset = 0;
                self.current_block += 1;
            }
        }
    }
}

enum NonBlockingBlockReadResult {
    NeedBlocks,
    ReadBlock(Vec<u8>),
    Eof,
}

enum NonBlockingReadResult {
    NeedBlocks,
    ReadBytes(usize),
    Eof,
}

impl<Database: Connection + Clone, Config: FileConfig> Read
    for Contents<Blocking<Database>, Config>
{
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            match self.non_blocking_read(|block| {
                let bytes_to_read = buf.len().min(block.len());
                buf[..bytes_to_read].copy_from_slice(&block[..bytes_to_read]);
                bytes_to_read
            }) {
                NonBlockingReadResult::ReadBytes(bytes) => return Ok(bytes),
                NonBlockingReadResult::Eof => return Ok(0),
                NonBlockingReadResult::NeedBlocks => self.load_blocks()?,
            }
        }
    }
}

impl<Database: Clone, Config: FileConfig> Seek for Contents<Database, Config> {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let seek_to = match pos {
            SeekFrom::Start(offset) => offset,
            SeekFrom::End(from_end) => {
                if from_end < 0 {
                    self.len() - u64::try_from(from_end.saturating_abs()).unwrap()
                } else {
                    self.len()
                }
            }
            SeekFrom::Current(from_current) => {
                if self.blocks.is_empty() {
                    return Ok(0);
                }

                u64::try_from(
                    i64::try_from(
                        self.blocks[self.current_block].offset
                            + u64::try_from(self.offset).unwrap(),
                    )
                    .unwrap()
                        + from_current,
                )
                .unwrap()
            }
        };
        if let Some((index, block)) = self
            .blocks
            .iter()
            .enumerate()
            .find(|b| b.1.offset + u64::try_from(b.1.length).unwrap() > seek_to)
        {
            self.current_block = index;
            self.offset = usize::try_from(seek_to - block.offset).unwrap();
            Ok(seek_to)
        } else if let Some(last_block) = self.blocks.last() {
            self.current_block = self.blocks.len() - 1;
            self.offset = last_block.length;
            Ok(last_block.offset + u64::try_from(last_block.length).unwrap())
        } else {
            self.current_block = 0;
            self.offset = 0;
            Ok(0)
        }
    }
}

#[cfg(feature = "async")]
impl<
        Database: bonsaidb_core::connection::AsyncConnection + Clone + Unpin + 'static,
        Config: FileConfig,
    > tokio::io::AsyncSeek for Contents<Async<Database>, Config>
{
    fn start_seek(mut self: std::pin::Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
        self.seek(position).map(|_| ())
    }

    fn poll_complete(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> Poll<std::io::Result<u64>> {
        if self.blocks.is_empty() {
            Poll::Ready(Ok(0))
        } else if self.current_block < self.blocks.len() {
            Poll::Ready(Ok(
                self.blocks[self.current_block].offset + u64::try_from(self.offset).unwrap()
            ))
        } else {
            Poll::Ready(Ok(self.len()))
        }
    }
}

#[cfg(feature = "async")]
impl<
        Database: bonsaidb_core::connection::AsyncConnection + Clone + Unpin + 'static,
        Config: FileConfig,
    > tokio::io::AsyncRead for Contents<Async<Database>, Config>
{
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        self.spawn_block_fetching_task();
        loop {
            match self.non_blocking_read(|block| {
                let bytes_to_read = buf.remaining().min(block.len());
                buf.put_slice(&block[..bytes_to_read]);
                bytes_to_read
            }) {
                NonBlockingReadResult::NeedBlocks => match self.fetch_blocks(cx) {
                    Poll::Ready(Ok(true)) => continue,
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(Ok(false)) => return Poll::Ready(Ok(())),
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                },
                NonBlockingReadResult::ReadBytes(bytes) => {
                    if bytes == 0 || buf.remaining() == 0 {
                        return Poll::Ready(Ok(()));
                    }
                }
                NonBlockingReadResult::Eof => return Poll::Ready(Ok(())),
            }
        }
    }
}

impl<Database: Connection + Clone, Config: FileConfig> Iterator
    for Contents<Blocking<Database>, Config>
{
    type Item = std::io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.non_blocking_read_block() {
                NonBlockingBlockReadResult::ReadBlock(bytes) => return Some(Ok(bytes)),
                NonBlockingBlockReadResult::Eof => return None,
                NonBlockingBlockReadResult::NeedBlocks => match self.load_blocks() {
                    Ok(()) => {}
                    Err(err) => return Some(Err(err)),
                },
            }
        }
    }
}

#[cfg(feature = "async")]
impl<
        Database: bonsaidb_core::connection::AsyncConnection + Unpin + Clone + 'static,
        Config: FileConfig,
    > futures::Stream for Contents<Async<Database>, Config>
{
    type Item = std::io::Result<Vec<u8>>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.spawn_block_fetching_task();
        loop {
            match self.non_blocking_read_block() {
                NonBlockingBlockReadResult::NeedBlocks => match self.fetch_blocks(cx) {
                    Poll::Ready(Ok(true)) => continue,
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(Ok(false)) => return Poll::Ready(None),
                    Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
                },
                NonBlockingBlockReadResult::ReadBlock(block) => {
                    return Poll::Ready(Some(Ok(block)))
                }
                NonBlockingBlockReadResult::Eof => return Poll::Ready(None),
            }
        }
    }
}

#[derive(Clone)]
pub(crate) struct BlockInfo {
    pub offset: u64,
    pub length: usize,
    pub timestamp: TimestampAsNanoseconds,
    pub header: CollectionHeader<u64>,
}

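/// A buffered writer that appends data to a [`File`] in block-sized chunks.
///
/// A minimal sketch of how the [`Write`] implementation is intended to be used
/// (hypothetical `file` variable):
///
/// ```rust,ignore
/// use std::io::Write;
///
/// let mut writer = file.append_buffered();
/// writer.write_all(b"appended data")?;
/// writer.flush()?; // flushing also happens when the writer is dropped
/// ```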
pub struct BufferedAppend<'a, Config: FileConfig, Database: Connection + Clone> {
    file: &'a mut File<Blocking<Database>, Config>,
    pub(crate) buffer: Vec<u8>,
    _config: PhantomData<Config>,
}

impl<'a, Config: FileConfig, Database: Connection + Clone> BufferedAppend<'a, Config, Database> {
    /// Sets the size of the write buffer, flushing any currently buffered data
    /// first.
    pub fn set_buffer_size(&mut self, capacity: usize) -> std::io::Result<()> {
        if self.buffer.capacity() > 0 {
            self.flush()?;
        }
        self.buffer = Vec::with_capacity(capacity);
        Ok(())
    }
}

impl<'a, Config: FileConfig, Database: Connection + Clone> Write
    for BufferedAppend<'a, Config, Database>
{
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        if self.buffer.capacity() == 0 {
            const ONE_MEGABYTE: usize = 1024 * 1024;
            self.buffer
                .reserve_exact(ONE_MEGABYTE / Config::BLOCK_SIZE * Config::BLOCK_SIZE);
        } else if self.buffer.capacity() == self.buffer.len() {
            self.flush()?;
        }

        if data.is_empty() {
            Ok(0)
        } else {
            let bytes_to_write = data.len().min(self.buffer.capacity() - self.buffer.len());
            self.buffer.extend(&data[..bytes_to_write]);
            Ok(bytes_to_write)
        }
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.file
            .append(&self.buffer)
            .map_err(|err| std::io::Error::new(ErrorKind::Other, err))?;
        self.buffer.clear();
        Ok(())
    }
}

impl<'a, Config: FileConfig, Database: Connection + Clone> Drop
    for BufferedAppend<'a, Config, Database>
{
    fn drop(&mut self) {
        drop(self.flush());
    }
}

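/// A buffered writer that appends data to a [`File`] over an asynchronous
/// connection, implementing `tokio::io::AsyncWrite`. If the writer still holds
/// unflushed data when dropped, the flush is finished on a background tokio
/// task.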
#[cfg(feature = "async")]
pub struct AsyncBufferedAppend<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static>
{
    file: &'a mut File<Async<Database>, Config>,
    pub(crate) buffer: Vec<u8>,
    flush_future: Option<BoxFuture<'a, Result<(), std::io::Error>>>,
    _config: PhantomData<Config>,
}

#[cfg(feature = "async")]
impl<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static>
    AsyncBufferedAppend<'a, Config, Database>
{
    /// Sets the size of the write buffer, flushing any currently buffered data
    /// first.
    pub async fn set_buffer_size(&mut self, capacity: usize) -> std::io::Result<()> {
        if self.buffer.capacity() > 0 {
            self.flush().await?;
        }
        self.buffer = Vec::with_capacity(capacity);
        Ok(())
    }
}

#[cfg(feature = "async")]
impl<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static> tokio::io::AsyncWrite
    for AsyncBufferedAppend<'a, Config, Database>
{
    fn poll_write(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        data: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        if self.buffer.capacity() == 0 {
            const ONE_MEGABYTE: usize = 1024 * 1024;
            self.buffer
                .reserve_exact(ONE_MEGABYTE / Config::BLOCK_SIZE * Config::BLOCK_SIZE);
        }

        if self.flush_future.is_some() {
            if let Err(err) = ready!(std::pin::Pin::new(&mut self).poll_flush(cx)) {
                return Poll::Ready(Err(err));
            }
        } else if self.buffer.capacity() == self.buffer.len() {
            match ready!(std::pin::Pin::new(&mut self).poll_flush(cx)) {
                Ok(_) => {}
                Err(err) => {
                    return Poll::Ready(Err(err));
                }
            }
        }

        if data.is_empty() {
            Poll::Ready(Ok(0))
        } else {
            let bytes_to_write = data.len().min(self.buffer.capacity() - self.buffer.len());
            self.buffer.extend(&data[..bytes_to_write]);
            Poll::Ready(Ok(bytes_to_write))
        }
    }

    fn poll_flush(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        if let Some(flush_future) = &mut self.flush_future {
            let result = ready!(flush_future.poll_unpin(cx));
            self.flush_future = None;
            Poll::Ready(result)
        } else if self.buffer.is_empty() {
            Poll::Ready(Ok(()))
        } else {
            let file = self.file.clone();

            let mut buffer = Vec::with_capacity(self.buffer.capacity());
            std::mem::swap(&mut buffer, &mut self.buffer);

            let mut flush_task = async move {
                file.append(&buffer)
                    .await
                    .map_err(|err| std::io::Error::new(ErrorKind::Other, err))
            }
            .boxed();
            let poll_result = flush_task.poll_unpin(cx);
            self.flush_future = Some(flush_task);
            poll_result
        }
    }

    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        self.poll_flush(cx)
    }
}

#[cfg(feature = "async")]
impl<'a, Config: FileConfig, Database: AsyncConnection + Clone + 'static> Drop
    for AsyncBufferedAppend<'a, Config, Database>
{
    fn drop(&mut self) {
        if !self.buffer.is_empty() {
            assert!(
                self.flush_future.is_none(),
                "flush() was started but not completed before dropped"
            );
            let mut buffer = Vec::new();
            std::mem::swap(&mut buffer, &mut self.buffer);
            let mut file = self.file.clone();

            tokio::runtime::Handle::current().spawn(async move {
                drop(
                    AsyncBufferedAppend {
                        file: &mut file,
                        buffer,
                        flush_future: None,
                        _config: PhantomData,
                    }
                    .flush()
                    .await,
                );
            });
        }
    }
}