1use std::{
13 fs::{File, OpenOptions},
14 io,
15 ops::{Deref, DerefMut},
16 path::{Path, PathBuf},
17 sync::{Arc, RwLock, Weak},
18};
19
20use bao_tree::{
21 io::{
22 fsm::BaoContentItem,
23 outboard::PreOrderOutboard,
24 sync::{ReadAt, WriteAt},
25 },
26 BaoTree,
27};
28use bytes::{Bytes, BytesMut};
29use derive_more::Debug;
30use iroh_io::AsyncSliceReader;
31
32use super::mutable_mem_storage::{MutableMemStorage, SizeInfo};
33use crate::{
34 store::BaoBatchWriter,
35 util::{get_limited_slice, MemOrFile, SparseMemFile},
36 Hash, IROH_BLOCK_SIZE,
37};
38
/// Paths of the three files that back one blob on disk.
///
/// Built by `BaoFileConfig::paths` from the configured directory and the
/// hex form of the blob hash.
struct DataPaths {
    /// Path of the data file (`<hash>.data`).
    data: PathBuf,
    /// Path of the outboard file (`<hash>.obao4`).
    outboard: PathBuf,
    /// Path of the sizes file (`<hash>.sizes4`).
    sizes: PathBuf,
}
74
/// Storage held by the `Complete` state of [`BaoFileStorage`].
///
/// Data and outboard are each either in memory (as [`Bytes`]) or in a file.
/// A file is stored together with a `u64`, which `data_size` /
/// `outboard_size` report as the size of that part.
#[derive(Default, derive_more::Debug)]
pub struct CompleteStorage {
    /// The blob data. For in-memory data the debug output prints only the length.
    #[debug("{:?}", data.as_ref().map_mem(|x| x.len()))]
    pub data: MemOrFile<Bytes, (File, u64)>,
    /// The outboard. For an in-memory outboard the debug output prints only the length.
    #[debug("{:?}", outboard.as_ref().map_mem(|x| x.len()))]
    pub outboard: MemOrFile<Bytes, (File, u64)>,
}
93
94impl CompleteStorage {
95 pub fn read_data_at(&self, offset: u64, len: usize) -> Bytes {
97 match &self.data {
98 MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len),
99 MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(),
100 }
101 }
102
103 pub fn read_outboard_at(&self, offset: u64, len: usize) -> Bytes {
105 match &self.outboard {
106 MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len),
107 MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(),
108 }
109 }
110
111 pub fn data_size(&self) -> u64 {
113 match &self.data {
114 MemOrFile::Mem(mem) => mem.len() as u64,
115 MemOrFile::File((_file, size)) => *size,
116 }
117 }
118
119 pub fn outboard_size(&self) -> u64 {
121 match &self.outboard {
122 MemOrFile::Mem(mem) => mem.len() as u64,
123 MemOrFile::File((_file, size)) => *size,
124 }
125 }
126}
127
/// Opens the file at `path` for reading and writing.
///
/// The file is created if it does not exist; existing content is kept
/// (the file is not truncated).
fn create_read_write(path: impl AsRef<Path>) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true).truncate(false);
    options.open(path)
}
138
139fn read_to_end(file: impl ReadAt, offset: u64, max: usize) -> io::Result<Bytes> {
141 let mut res = BytesMut::new();
142 let mut buf = [0u8; 4096];
143 let mut remaining = max;
144 let mut offset = offset;
145 while remaining > 0 {
146 let end = buf.len().min(remaining);
147 let read = file.read_at(offset, &mut buf[..end])?;
148 if read == 0 {
149 break;
151 }
152 res.extend_from_slice(&buf[..read]);
153 offset += read as u64;
154 remaining -= read;
155 }
156 Ok(res.freeze())
157}
158
159fn max_offset(batch: &[BaoContentItem]) -> u64 {
160 batch
161 .iter()
162 .filter_map(|item| match item {
163 BaoContentItem::Leaf(leaf) => {
164 let len = leaf.data.len().try_into().unwrap();
165 let end = leaf
166 .offset
167 .checked_add(len)
168 .expect("u64 overflow for leaf end");
169 Some(end)
170 }
171 _ => None,
172 })
173 .max()
174 .unwrap_or(0)
175}
176
/// File-backed storage for an incomplete blob.
///
/// Consists of three open files: the data, the outboard and a sizes file.
#[derive(Debug)]
pub struct FileStorage {
    /// File that leaf data is written to, at the leaf's offset.
    data: std::fs::File,
    /// File that parent hash pairs are written to, 64 bytes per node.
    outboard: std::fs::File,
    /// File that receives the blob size as 8 little-endian bytes on every
    /// leaf write; `current_size` reads the last 8 bytes of it.
    sizes: std::fs::File,
}
184
185impl FileStorage {
186 pub fn into_parts(self) -> (File, File, File) {
188 (self.data, self.outboard, self.sizes)
189 }
190
191 fn current_size(&self) -> io::Result<u64> {
192 let len = self.sizes.metadata()?.len();
193 if len < 8 {
194 Ok(0)
195 } else {
196 let mut buf = [0u8; 8];
199 self.sizes.read_exact_at(len - 8, &mut buf)?;
200 Ok(u64::from_le_bytes(buf))
201 }
202 }
203
204 fn write_batch(&mut self, size: u64, batch: &[BaoContentItem]) -> io::Result<()> {
205 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
206 for item in batch {
207 match item {
208 BaoContentItem::Parent(parent) => {
209 if let Some(offset) = tree.pre_order_offset(parent.node) {
210 let o0 = offset * 64;
211 self.outboard
212 .write_all_at(o0, parent.pair.0.as_bytes().as_slice())?;
213 self.outboard
214 .write_all_at(o0 + 32, parent.pair.1.as_bytes().as_slice())?;
215 }
216 }
217 BaoContentItem::Leaf(leaf) => {
218 let o0 = leaf.offset;
219 let index = (leaf.offset >> (tree.block_size().chunk_log() + 10)) << 3;
221 tracing::trace!(
222 "write_batch f={:?} o={} l={}",
223 self.data,
224 o0,
225 leaf.data.len()
226 );
227 self.data.write_all_at(o0, leaf.data.as_ref())?;
228 let size = tree.size();
229 self.sizes.write_all_at(index, &size.to_le_bytes())?;
230 }
231 }
232 }
233 Ok(())
234 }
235
236 fn read_data_at(&self, offset: u64, len: usize) -> io::Result<Bytes> {
237 read_to_end(&self.data, offset, len)
238 }
239
240 fn read_outboard_at(&self, offset: u64, len: usize) -> io::Result<Bytes> {
241 read_to_end(&self.outboard, offset, len)
242 }
243}
244
/// The storage behind a bao file handle.
///
/// The default value is `Complete` with default (empty) content.
#[derive(Debug)]
pub(crate) enum BaoFileStorage {
    /// Incomplete blob that is held in memory.
    IncompleteMem(MutableMemStorage),
    /// Incomplete blob that is held in data, outboard and sizes files.
    IncompleteFile(FileStorage),
    /// Complete blob; data and outboard are each in memory or in a file.
    Complete(CompleteStorage),
}
266
267impl Default for BaoFileStorage {
268 fn default() -> Self {
269 BaoFileStorage::Complete(Default::default())
270 }
271}
272
273impl BaoFileStorage {
274 #[cfg(feature = "fs-store")]
278 pub fn take(&mut self) -> Self {
279 std::mem::take(self)
280 }
281
282 pub fn incomplete_mem() -> Self {
284 Self::IncompleteMem(Default::default())
285 }
286
287 fn sync_all(&self) -> io::Result<()> {
289 match self {
290 Self::Complete(_) => Ok(()),
291 Self::IncompleteMem(_) => Ok(()),
292 Self::IncompleteFile(file) => {
293 file.data.sync_all()?;
294 file.outboard.sync_all()?;
295 file.sizes.sync_all()?;
296 Ok(())
297 }
298 }
299 }
300
301 pub fn is_mem(&self) -> bool {
303 match self {
304 Self::IncompleteMem(_) => true,
305 Self::IncompleteFile(_) => false,
306 Self::Complete(c) => c.data.is_mem() && c.outboard.is_mem(),
307 }
308 }
309}
310
311#[derive(Debug, Clone)]
313pub struct BaoFileHandleWeak(Weak<BaoFileHandleInner>);
314
315impl BaoFileHandleWeak {
316 pub fn upgrade(&self) -> Option<BaoFileHandle> {
318 self.0.upgrade().map(BaoFileHandle)
319 }
320
321 pub fn is_live(&self) -> bool {
323 self.0.strong_count() > 0
324 }
325}
326
/// The shared state behind a [`BaoFileHandle`].
#[derive(Debug)]
pub struct BaoFileHandleInner {
    /// The current storage, guarded by a read-write lock.
    pub(crate) storage: RwLock<BaoFileStorage>,
    /// Configuration: storage directory, memory limit and file-creation callback.
    config: Arc<BaoFileConfig>,
    /// Hash of the blob this handle belongs to.
    hash: Hash,
}
334
/// A cheaply cloneable handle to a bao file.
///
/// Wraps an `Arc` of [`BaoFileHandleInner`] and dereferences to it.
#[derive(Debug, Clone, derive_more::Deref)]
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
338
/// Callback taking the hash of a blob; `BaoFileWriter::write_batch` invokes
/// it after a write moved that blob's storage from memory to files.
pub(crate) type CreateCb = Arc<dyn Fn(&Hash) -> io::Result<()> + Send + Sync>;
340
/// Configuration shared by bao file handles.
#[derive(derive_more::Debug, Clone)]
pub struct BaoFileConfig {
    /// Directory in which the data, outboard and sizes files are placed.
    dir: Arc<PathBuf>,
    /// Memory limit in bytes: a batch whose highest leaf end offset exceeds
    /// this value moves an in-memory storage to files.
    max_mem: usize,
    /// Optional callback, called with the blob hash after a storage was moved
    /// from memory to files. The debug output only shows whether it is set.
    #[debug("{:?}", on_file_create.as_ref().map(|_| ()))]
    on_file_create: Option<CreateCb>,
}
355
356impl BaoFileConfig {
357 pub fn new(dir: Arc<PathBuf>, max_mem: usize, on_file_create: Option<CreateCb>) -> Self {
359 Self {
360 dir,
361 max_mem,
362 on_file_create,
363 }
364 }
365
366 fn paths(&self, hash: &Hash) -> DataPaths {
368 DataPaths {
369 data: self.dir.join(format!("{}.data", hash.to_hex())),
370 outboard: self.dir.join(format!("{}.obao4", hash.to_hex())),
371 sizes: self.dir.join(format!("{}.sizes4", hash.to_hex())),
372 }
373 }
374}
375
/// Reader for the data of a bao file.
///
/// The handle is taken out of the option while an operation is in progress
/// and put back afterwards.
#[derive(Debug)]
pub struct DataReader(Option<BaoFileHandle>);
379
380async fn with_storage<T, P, F>(opt: &mut Option<BaoFileHandle>, no_io: P, f: F) -> io::Result<T>
381where
382 P: Fn(&BaoFileStorage) -> bool + Send + 'static,
383 F: FnOnce(&BaoFileStorage) -> io::Result<T> + Send + 'static,
384 T: Send + 'static,
385{
386 let handle = opt
387 .take()
388 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "deferred batch busy"))?;
389 if let Ok(storage) = handle.storage.try_read() {
392 if no_io(&storage) {
393 let res = f(&storage);
394 *opt = Some(handle.clone());
397 return res;
398 }
399 };
400 let (handle, res) = tokio::task::spawn_blocking(move || {
402 let storage = handle.storage.read().unwrap();
403 let res = f(storage.deref());
404 drop(storage);
405 (handle, res)
406 })
407 .await
408 .expect("spawn_blocking failed");
409 *opt = Some(handle);
410 res
411}
412
413impl AsyncSliceReader for DataReader {
414 async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
415 with_storage(
416 &mut self.0,
417 BaoFileStorage::is_mem,
418 move |storage| match storage {
419 BaoFileStorage::Complete(mem) => Ok(mem.read_data_at(offset, len)),
420 BaoFileStorage::IncompleteMem(mem) => Ok(mem.read_data_at(offset, len)),
421 BaoFileStorage::IncompleteFile(file) => file.read_data_at(offset, len),
422 },
423 )
424 .await
425 }
426
427 async fn size(&mut self) -> io::Result<u64> {
428 with_storage(
429 &mut self.0,
430 BaoFileStorage::is_mem,
431 move |storage| match storage {
432 BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
433 BaoFileStorage::IncompleteMem(mem) => Ok(mem.data.len() as u64),
434 BaoFileStorage::IncompleteFile(file) => file.data.metadata().map(|m| m.len()),
435 },
436 )
437 .await
438 }
439}
440
/// Reader for the outboard of a bao file.
///
/// The handle is taken out of the option while an operation is in progress
/// and put back afterwards.
#[derive(Debug)]
pub struct OutboardReader(Option<BaoFileHandle>);
444
445impl AsyncSliceReader for OutboardReader {
446 async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
447 with_storage(
448 &mut self.0,
449 BaoFileStorage::is_mem,
450 move |storage| match storage {
451 BaoFileStorage::Complete(mem) => Ok(mem.read_outboard_at(offset, len)),
452 BaoFileStorage::IncompleteMem(mem) => Ok(mem.read_outboard_at(offset, len)),
453 BaoFileStorage::IncompleteFile(file) => file.read_outboard_at(offset, len),
454 },
455 )
456 .await
457 }
458
459 async fn size(&mut self) -> io::Result<u64> {
460 with_storage(
461 &mut self.0,
462 BaoFileStorage::is_mem,
463 move |storage| match storage {
464 BaoFileStorage::Complete(mem) => Ok(mem.outboard_size()),
465 BaoFileStorage::IncompleteMem(mem) => Ok(mem.outboard.len() as u64),
466 BaoFileStorage::IncompleteFile(file) => file.outboard.metadata().map(|m| m.len()),
467 },
468 )
469 .await
470 }
471}
472
/// Describes how a write changed the storage of a handle.
enum HandleChange {
    /// The storage variant stayed the same.
    None,
    /// The storage was moved from memory to files.
    MemToFile,
}
478
479impl BaoFileHandle {
480 pub fn incomplete_mem(config: Arc<BaoFileConfig>, hash: Hash) -> Self {
485 let storage = BaoFileStorage::incomplete_mem();
486 Self(Arc::new(BaoFileHandleInner {
487 storage: RwLock::new(storage),
488 config,
489 hash,
490 }))
491 }
492
493 pub fn incomplete_file(config: Arc<BaoFileConfig>, hash: Hash) -> io::Result<Self> {
495 let paths = config.paths(&hash);
496 let storage = BaoFileStorage::IncompleteFile(FileStorage {
497 data: create_read_write(&paths.data)?,
498 outboard: create_read_write(&paths.outboard)?,
499 sizes: create_read_write(&paths.sizes)?,
500 });
501 Ok(Self(Arc::new(BaoFileHandleInner {
502 storage: RwLock::new(storage),
503 config,
504 hash,
505 })))
506 }
507
508 pub fn new_complete(
510 config: Arc<BaoFileConfig>,
511 hash: Hash,
512 data: MemOrFile<Bytes, (File, u64)>,
513 outboard: MemOrFile<Bytes, (File, u64)>,
514 ) -> Self {
515 let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard });
516 Self(Arc::new(BaoFileHandleInner {
517 storage: RwLock::new(storage),
518 config,
519 hash,
520 }))
521 }
522
523 #[cfg(feature = "fs-store")]
526 pub(crate) fn transform(
527 &self,
528 f: impl FnOnce(BaoFileStorage) -> io::Result<BaoFileStorage>,
529 ) -> io::Result<()> {
530 let mut lock = self.storage.write().unwrap();
531 let storage = lock.take();
532 *lock = f(storage)?;
533 Ok(())
534 }
535
536 pub fn is_complete(&self) -> bool {
538 matches!(
539 self.storage.read().unwrap().deref(),
540 BaoFileStorage::Complete(_)
541 )
542 }
543
544 pub fn data_reader(&self) -> DataReader {
549 DataReader(Some(self.clone()))
550 }
551
552 pub fn outboard_reader(&self) -> OutboardReader {
557 OutboardReader(Some(self.clone()))
558 }
559
560 pub fn current_size(&self) -> io::Result<u64> {
562 match self.storage.read().unwrap().deref() {
563 BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
564 BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()),
565 BaoFileStorage::IncompleteFile(file) => file.current_size(),
566 }
567 }
568
569 pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
571 let root = self.hash.into();
572 let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
573 let outboard = self.outboard_reader();
574 Ok(PreOrderOutboard {
575 root,
576 tree,
577 data: outboard,
578 })
579 }
580
581 pub fn hash(&self) -> Hash {
583 self.hash
584 }
585
586 pub fn writer(&self) -> BaoFileWriter {
588 BaoFileWriter(Some(self.clone()))
589 }
590
591 fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result<HandleChange> {
593 let mut storage = self.storage.write().unwrap();
594 match storage.deref_mut() {
595 BaoFileStorage::IncompleteMem(mem) => {
596 if max_offset(batch) <= self.config.max_mem as u64 {
598 mem.write_batch(size, batch)?;
599 Ok(HandleChange::None)
600 } else {
601 let paths = self.config.paths(&self.hash);
604 let mut file_batch = mem.persist(paths)?;
609 file_batch.write_batch(size, batch)?;
610 *storage = BaoFileStorage::IncompleteFile(file_batch);
611 Ok(HandleChange::MemToFile)
612 }
613 }
614 BaoFileStorage::IncompleteFile(file) => {
615 file.write_batch(size, batch)?;
617 Ok(HandleChange::None)
618 }
619 BaoFileStorage::Complete(_) => {
620 Ok(HandleChange::None)
623 }
624 }
625 }
626
627 pub fn downgrade(&self) -> BaoFileHandleWeak {
629 BaoFileHandleWeak(Arc::downgrade(&self.0))
630 }
631}
632
633impl SizeInfo {
634 pub fn persist(&self, mut target: impl WriteAt) -> io::Result<()> {
636 let size_offset = (self.offset >> IROH_BLOCK_SIZE.chunk_log()) << 3;
637 target.write_all_at(size_offset, self.size.to_le_bytes().as_slice())?;
638 Ok(())
639 }
640
641 pub fn to_vec(&self) -> Vec<u8> {
643 let mut res = Vec::new();
644 self.persist(&mut res).expect("io error writing to vec");
645 res
646 }
647}
648
649impl MutableMemStorage {
650 fn persist(&self, paths: DataPaths) -> io::Result<FileStorage> {
652 let mut data = create_read_write(&paths.data)?;
653 let mut outboard = create_read_write(&paths.outboard)?;
654 let mut sizes = create_read_write(&paths.sizes)?;
655 self.data.persist(&mut data)?;
656 self.outboard.persist(&mut outboard)?;
657 self.sizes.persist(&mut sizes)?;
658 data.sync_all()?;
659 outboard.sync_all()?;
660 sizes.sync_all()?;
661 Ok(FileStorage {
662 data,
663 outboard,
664 sizes,
665 })
666 }
667
668 pub fn into_parts(self) -> (SparseMemFile, SparseMemFile, SizeInfo) {
670 (self.data, self.outboard, self.sizes)
671 }
672}
673
/// Batch writer for a bao file.
///
/// The handle is taken out of the option while a write or sync runs on the
/// blocking thread pool; while it is absent, further calls fail with
/// "deferred batch busy".
#[derive(Debug)]
pub struct BaoFileWriter(Option<BaoFileHandle>);
680
681impl BaoBatchWriter for BaoFileWriter {
682 async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> std::io::Result<()> {
683 let Some(handle) = self.0.take() else {
684 return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
685 };
686 let (handle, change) = tokio::task::spawn_blocking(move || {
687 let change = handle.write_batch(size, &batch);
688 (handle, change)
689 })
690 .await
691 .expect("spawn_blocking failed");
692 match change? {
693 HandleChange::None => {}
694 HandleChange::MemToFile => {
695 if let Some(cb) = handle.config.on_file_create.as_ref() {
696 cb(&handle.hash)?;
697 }
698 }
699 }
700 self.0 = Some(handle);
701 Ok(())
702 }
703
704 async fn sync(&mut self) -> io::Result<()> {
705 let Some(handle) = self.0.take() else {
706 return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
707 };
708 let (handle, res) = tokio::task::spawn_blocking(move || {
709 let res = handle.storage.write().unwrap().sync_all();
710 (handle, res)
711 })
712 .await
713 .expect("spawn_blocking failed");
714 self.0 = Some(handle);
715 res
716 }
717}
718
/// Helpers shared by the tests of the bao file storage.
#[cfg(test)]
pub mod test_support {
    use std::{future::Future, io::Cursor, ops::Range};

    use bao_tree::{
        io::{
            fsm::{ResponseDecoder, ResponseDecoderNext},
            outboard::PostOrderMemOutboard,
            round_up_to_chunks,
            sync::encode_ranges_validated,
        },
        BlockSize, ChunkRanges,
    };
    use futures_lite::{Stream, StreamExt};
    use iroh_io::AsyncStreamReader;
    use rand::RngCore;
    use range_collections::RangeSet2;

    use super::*;
    use crate::util::limited_range;

    /// Block size used by the tests: 2^4 chunks per block.
    pub const IROH_BLOCK_SIZE: BlockSize = BlockSize::from_chunk_log(4);

    /// Decodes an encoded response and feeds it to `target` in batches.
    ///
    /// The first 8 bytes of `encoded` are read as the little-endian blob
    /// size. Items are collected until a leaf arrives; the collected items
    /// including that leaf are then written as one batch.
    pub async fn decode_response_into_batch<R, W>(
        root: Hash,
        block_size: BlockSize,
        ranges: ChunkRanges,
        mut encoded: R,
        mut target: W,
    ) -> io::Result<()>
    where
        R: AsyncStreamReader,
        W: BaoBatchWriter,
    {
        let size = u64::from_le_bytes(encoded.read::<8>().await?);
        let tree = BaoTree::new(size, block_size);
        let mut reading = ResponseDecoder::new(root.into(), ranges, tree, encoded);
        let mut pending = Vec::new();
        loop {
            let item = match reading.next().await {
                ResponseDecoderNext::More((next, item)) => {
                    reading = next;
                    item?
                }
                ResponseDecoderNext::Done(_reader) => break,
            };
            let ends_batch = matches!(item, BaoContentItem::Leaf(_));
            pending.push(item);
            if ends_batch {
                // A leaf completes the batch; hand it over and start fresh.
                target
                    .write_batch(size, std::mem::take(&mut pending))
                    .await?;
            }
        }
        assert!(pending.is_empty(), "last item should be a leaf");
        Ok(())
    }

    /// Returns `size` random bytes.
    pub fn random_test_data(size: usize) -> Vec<u8> {
        let mut bytes = vec![0u8; size];
        rand::thread_rng().fill_bytes(&mut bytes);
        bytes
    }

    /// Encodes all of `data` as a remote would send it: the size as 8
    /// little-endian bytes, followed by the encoding of all chunk ranges.
    ///
    /// Returns the root hash and a cursor over the encoded bytes.
    pub fn simulate_remote(data: &[u8]) -> (Hash, Cursor<Bytes>) {
        let outboard = PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
        let mut encoded = (data.len() as u64).to_le_bytes().to_vec();
        encode_ranges_validated(data, &outboard, &ChunkRanges::all(), &mut encoded).unwrap();
        let hash = outboard.root;
        (hash.into(), Cursor::new(encoded.into()))
    }

    /// Builds the union of the given ranges as a range set.
    pub fn to_ranges(ranges: &[Range<u64>]) -> RangeSet2<u64> {
        let mut union = RangeSet2::empty();
        for range in ranges {
            union |= RangeSet2::from(range.clone());
        }
        union
    }

    /// Encodes the given byte ranges of `data` for the wire.
    ///
    /// Returns the root hash, the chunk ranges that were encoded (the byte
    /// ranges rounded up to chunks) and the encoded bytes, which start with
    /// the size as 8 little-endian bytes.
    pub fn make_wire_data(
        data: &[u8],
        ranges: impl AsRef<[Range<u64>]>,
    ) -> (Hash, ChunkRanges, Vec<u8>) {
        let chunk_ranges = round_up_to_chunks(&to_ranges(ranges.as_ref()));
        let outboard = PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE).flip();
        let mut encoded = (data.len() as u64).to_le_bytes().to_vec();
        encode_ranges_validated(data, &outboard, &chunk_ranges, &mut encoded).unwrap();
        (outboard.root.into(), chunk_ranges, encoded)
    }

    /// Asserts that reading each of `ranges` through the data reader of
    /// `handle` yields the corresponding bytes of `original`.
    pub async fn validate(handle: &BaoFileHandle, original: &[u8], ranges: &[Range<u64>]) {
        let mut reader = handle.data_reader();
        for range in ranges {
            let start = range.start;
            let len = (range.end - range.start).try_into().unwrap();
            let expected = &original[limited_range(start, len, original.len())];
            let actual = reader.read_at(start, len).await.unwrap();
            assert_eq!(expected.len(), actual.as_ref().len());
            assert_eq!(expected, actual.as_ref());
        }
    }

    /// Splits `data` into parts of at most `mtu` bytes and yields them as a
    /// stream, sleeping for `delay` before each part.
    pub fn trickle(
        data: &[u8],
        mtu: usize,
        delay: std::time::Duration,
    ) -> impl Stream<Item = Bytes> {
        let parts: Vec<_> = data.chunks(mtu).map(Bytes::copy_from_slice).collect();
        futures_lite::stream::iter(parts).then(move |part| async move {
            tokio::time::sleep(delay).await;
            part
        })
    }

    /// Runs `f` to completion inside a fresh `tokio::task::LocalSet`.
    pub async fn local<F>(f: F) -> F::Output
    where
        F: Future,
    {
        let set = tokio::task::LocalSet::new();
        set.run_until(f).await
    }
}
866
/// Tests that write wire data into a handle through its batch writer and
/// read it back through the data and outboard readers.
#[cfg(test)]
mod tests {
    use std::io::Write;

    use bao_tree::{blake3, ChunkNum, ChunkRanges};
    use futures_lite::StreamExt;
    use iroh_io::TokioStreamReader;
    use tests::test_support::{
        decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate,
    };
    use tokio::task::JoinSet;

    use super::*;
    use crate::util::local_pool::LocalPool;

    /// Downloads only the two middle quarters of a 64 KiB blob, using local
    /// tasks, and checks that exactly this part can be read and re-encoded.
    #[tokio::test]
    async fn partial_downloads() {
        local(async move {
            let n = 1024 * 64u64;
            let test_data = random_test_data(n as usize);
            let temp_dir = tempfile::tempdir().unwrap();
            let hash = blake3::hash(&test_data);
            // max_mem is 16 KiB, smaller than the offsets that get written.
            let handle = BaoFileHandle::incomplete_mem(
                Arc::new(BaoFileConfig::new(
                    Arc::new(temp_dir.as_ref().to_owned()),
                    1024 * 16,
                    None,
                )),
                hash.into(),
            );
            let mut tasks = JoinSet::new();
            // Quarters 1 and 2 only; the first and the last quarter stay missing.
            for i in 1..3 {
                let file = handle.writer();
                let range = (i * (n / 4))..((i + 1) * (n / 4));
                println!("range: {:?}", range);
                let (hash, chunk_ranges, wire_data) = make_wire_data(&test_data, &[range]);
                let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
                    .map(io::Result::Ok)
                    .boxed();
                let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
                let _task = tasks.spawn_local(async move {
                    decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file)
                        .await
                });
            }
            while let Some(res) = tasks.join_next().await {
                res.unwrap().unwrap();
            }
            println!(
                "len {:?} {:?}",
                handle,
                handle.data_reader().size().await.unwrap()
            );
            #[allow(clippy::single_range_in_vec_init)]
            let ranges = [1024 * 16..1024 * 48];
            validate(&handle, &test_data, &ranges).await;

            // Re-encode the downloaded chunk range from the handle itself.
            let mut encoded = Vec::new();
            let ob = handle.outboard().unwrap();
            encoded
                .write_all(ob.tree.size().to_le_bytes().as_slice())
                .unwrap();
            bao_tree::io::fsm::encode_ranges_validated(
                handle.data_reader(),
                ob,
                &ChunkRanges::from(ChunkNum(16)..ChunkNum(48)),
                encoded,
            )
            .await
            .unwrap();
        })
        .await;
    }

    /// Downloads all four quarters of a 32 KiB blob with one task per
    /// quarter and checks that the whole blob can be read and re-encoded.
    #[tokio::test]
    async fn concurrent_downloads() {
        let n = 1024 * 32u64;
        let test_data = random_test_data(n as usize);
        let temp_dir = tempfile::tempdir().unwrap();
        let hash = blake3::hash(&test_data);
        let handle = BaoFileHandle::incomplete_mem(
            Arc::new(BaoFileConfig::new(
                Arc::new(temp_dir.as_ref().to_owned()),
                1024 * 16,
                None,
            )),
            hash.into(),
        );
        let local = LocalPool::default();
        let mut tasks = Vec::new();
        for i in 0..4 {
            let file = handle.writer();
            let range = (i * (n / 4))..((i + 1) * (n / 4));
            println!("range: {:?}", range);
            let (hash, chunk_ranges, wire_data) = make_wire_data(&test_data, &[range]);
            let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
                .map(io::Result::Ok)
                .boxed();
            let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
            let task = local.spawn(move || async move {
                decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file).await
            });
            tasks.push(task);
        }
        for task in tasks {
            task.await.unwrap().unwrap();
        }
        println!(
            "len {:?} {:?}",
            handle,
            handle.data_reader().size().await.unwrap()
        );
        #[allow(clippy::single_range_in_vec_init)]
        let ranges = [0..n];
        validate(&handle, &test_data, &ranges).await;

        // Re-encode the whole blob from the handle itself.
        let mut encoded = Vec::new();
        let ob = handle.outboard().unwrap();
        encoded
            .write_all(ob.tree.size().to_le_bytes().as_slice())
            .unwrap();
        bao_tree::io::fsm::encode_ranges_validated(
            handle.data_reader(),
            ob,
            &ChunkRanges::all(),
            encoded,
        )
        .await
        .unwrap();
    }

    /// Writes a 17 KiB blob in full through a single writer, then reads it
    /// back and re-encodes it.
    ///
    /// NOTE(review): the name suggests the storage is expected to stay in
    /// memory, but nothing here asserts the storage variant — confirm.
    #[tokio::test]
    async fn stay_in_mem() {
        let test_data = random_test_data(1024 * 17);
        #[allow(clippy::single_range_in_vec_init)]
        let ranges = [0..test_data.len().try_into().unwrap()];
        let (hash, chunk_ranges, wire_data) = make_wire_data(&test_data, &ranges);
        println!("file len is {:?}", chunk_ranges);
        let temp_dir = tempfile::tempdir().unwrap();
        let handle = BaoFileHandle::incomplete_mem(
            Arc::new(BaoFileConfig::new(
                Arc::new(temp_dir.as_ref().to_owned()),
                1024 * 16,
                None,
            )),
            hash,
        );
        decode_response_into_batch(
            hash,
            IROH_BLOCK_SIZE,
            chunk_ranges,
            wire_data.as_slice(),
            handle.writer(),
        )
        .await
        .unwrap();
        validate(&handle, &test_data, &ranges).await;

        let mut encoded = Vec::new();
        let ob = handle.outboard().unwrap();
        encoded
            .write_all(ob.tree.size().to_le_bytes().as_slice())
            .unwrap();
        bao_tree::io::fsm::encode_ranges_validated(
            handle.data_reader(),
            ob,
            &ChunkRanges::all(),
            encoded,
        )
        .await
        .unwrap();
        println!("{:?}", handle);
    }
}
1003 #[allow(clippy::single_range_in_vec_init)]
1004 let ranges = [0..test_data.len().try_into().unwrap()];
1005 let (hash, chunk_ranges, wire_data) = make_wire_data(&test_data, &ranges);
1006 println!("file len is {:?}", chunk_ranges);
1007 let temp_dir = tempfile::tempdir().unwrap();
1008 let handle = BaoFileHandle::incomplete_mem(
1009 Arc::new(BaoFileConfig::new(
1010 Arc::new(temp_dir.as_ref().to_owned()),
1011 1024 * 16,
1012 None,
1013 )),
1014 hash,
1015 );
1016 decode_response_into_batch(
1017 hash,
1018 IROH_BLOCK_SIZE,
1019 chunk_ranges,
1020 wire_data.as_slice(),
1021 handle.writer(),
1022 )
1023 .await
1024 .unwrap();
1025 validate(&handle, &test_data, &ranges).await;
1026
1027 let mut encoded = Vec::new();
1028 let ob = handle.outboard().unwrap();
1029 encoded
1030 .write_all(ob.tree.size().to_le_bytes().as_slice())
1031 .unwrap();
1032 bao_tree::io::fsm::encode_ranges_validated(
1033 handle.data_reader(),
1034 ob,
1035 &ChunkRanges::all(),
1036 encoded,
1037 )
1038 .await
1039 .unwrap();
1040 println!("{:?}", handle);
1041 }
1042}