1use std::{
26 collections::{BTreeMap, HashMap},
27 io::{self, Write},
28 path::PathBuf,
29 sync::{Arc, Mutex},
30};
31
32use bao_tree::{
33 BaoTree, ChunkRanges,
34 io::{
35 Leaf,
36 mixed::{EncodedItem, ReadBytesAt, traverse_ranges_validated},
37 outboard::PreOrderMemOutboard,
38 sync::ReadAt,
39 },
40};
41use bytes::Bytes;
42use iroh_blobs::{
43 BlobFormat, Hash, HashAndFormat,
44 api::{
45 self, Store, TempTag,
46 blobs::{Bitfield, ExportProgressItem},
47 proto::{
48 self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
49 ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
50 ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ObserveMsg, ObserveRequest,
51 WaitIdleMsg,
52 },
53 },
54 protocol::ChunkRangesExt,
55 store::IROH_BLOCK_SIZE,
56};
57use irpc::channel::mpsc;
58use range_collections::range_set::RangeSetRange;
59use ref_cast::RefCast;
60use tokio::task::{JoinError, JoinSet};
61
/// Controls what byte pattern a fake store pretends every blob contains.
///
/// Content is never stored; each byte is recomputed on demand from the
/// strategy and the byte offset, so generation is deterministic.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DataStrategy {
    /// Every byte is `0x00` (the default).
    #[default]
    Zeros,
    /// Every byte is `0xFF`.
    Ones,
    /// Deterministic pseudo-random bytes derived from `seed` and the offset.
    PseudoRandom { seed: u64 },
}
78
/// Configuration for constructing a fake store.
#[derive(Debug, Clone)]
pub struct FakeStoreConfig {
    /// Byte pattern used to synthesize all blob contents.
    pub strategy: DataStrategy,
    /// Optional upper bound on individual blob sizes; `None` disables the
    /// size check performed at build time.
    pub max_blob_size: Option<u64>,
}
85
86impl Default for FakeStoreConfig {
87 fn default() -> Self {
88 Self {
89 strategy: DataStrategy::Zeros,
90 max_blob_size: Some(10 * 1024 * 1024 * 1024),
92 }
93 }
94}
95
/// Fluent builder for a fake store: collects a configuration plus the sizes
/// of the blobs to pre-populate.
#[derive(Debug, Clone, Default)]
pub struct FakeStoreBuilder {
    // Strategy and size limit applied when `build` runs.
    config: FakeStoreConfig,
    // One entry per blob to synthesize at build time.
    sizes: Vec<u64>,
}
101
102impl FakeStoreBuilder {
103 pub fn new() -> Self {
104 Self::default()
105 }
106
107 pub fn strategy(mut self, strategy: DataStrategy) -> Self {
108 self.config.strategy = strategy;
109 self
110 }
111
112 pub fn max_blob_size(mut self, max: Option<u64>) -> Self {
114 self.config.max_blob_size = max;
115 self
116 }
117
118 pub fn with_blob(mut self, size: u64) -> Self {
119 self.sizes.push(size);
120 self
121 }
122
123 pub fn with_blobs(mut self, sizes: impl IntoIterator<Item = u64>) -> Self {
124 self.sizes.extend(sizes);
125 self
126 }
127
128 pub fn build(self) -> FakeStore {
131 if let Some(max) = self.config.max_blob_size {
132 for &size in &self.sizes {
133 assert!(size <= max, "Blob size {} exceeds maximum {}", size, max);
134 }
135 }
136
137 FakeStore::new_with_config(self.sizes, self.config)
138 }
139}
140
/// A mock blob store that serves deterministic, synthesized content without
/// touching disk. Dereferences to the inner [`Store`] client handle.
#[derive(Debug, Clone)]
pub struct FakeStore {
    // Client side of the irpc channel to the spawned `Actor`.
    store: Store,
}
165
/// Lets callers invoke [`Store`] methods directly on a [`FakeStore`].
impl std::ops::Deref for FakeStore {
    type Target = Store;

    fn deref(&self) -> &Self::Target {
        &self.store
    }
}
173
/// Newtype adapter so an irpc `mpsc::Sender<EncodedItem>` can serve as a
/// `bao_tree` traversal sink. `repr(transparent)` + `RefCast` make the
/// conversion from `&mut mpsc::Sender<_>` zero-cost.
#[derive(RefCast)]
#[repr(transparent)]
struct BaoTreeSender(mpsc::Sender<EncodedItem>);
178
/// Forwards encoded bao items straight into the underlying irpc channel.
impl bao_tree::io::mixed::Sender for BaoTreeSender {
    type Error = irpc::channel::SendError;
    async fn send(&mut self, item: EncodedItem) -> std::result::Result<(), Self::Error> {
        self.0.send(item).await
    }
}
185
/// Everything the fake store remembers about a blob. The content itself is
/// never stored; it is regenerated on demand from `strategy`.
#[derive(Debug, Clone)]
struct BlobMetadata {
    // Content length in bytes.
    size: u64,
    // Precomputed pre-order bao outboard for the synthetic content.
    outboard: Bytes,
    // How to regenerate each content byte on demand.
    strategy: DataStrategy,
}
192
/// Command-processing actor backing a [`FakeStore`]; runs as a single tokio
/// task and spawns per-command background work into `tasks`.
struct Actor {
    // Incoming store commands from the client handle.
    commands: tokio::sync::mpsc::Receiver<proto::Command>,
    // Background work (exports, observations, imports) spawned per command.
    tasks: JoinSet<()>,
    // WaitIdle callers parked until `tasks` drains (see `run`).
    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
    // Blob metadata, shared with spawned background tasks.
    blobs: Arc<Mutex<HashMap<Hash, BlobMetadata>>>,
    // Default strategy applied to newly imported blobs.
    strategy: DataStrategy,
    // Named tags; a plain map since only the actor task touches it.
    tags: BTreeMap<api::Tag, HashAndFormat>,
}
201
202impl Actor {
203 fn new(
204 commands: tokio::sync::mpsc::Receiver<proto::Command>,
205 blobs: HashMap<Hash, BlobMetadata>,
206 strategy: DataStrategy,
207 ) -> Self {
208 Self {
209 blobs: Arc::new(Mutex::new(blobs)),
210 commands,
211 tasks: JoinSet::new(),
212 idle_waiters: Vec::new(),
213 strategy,
214 tags: BTreeMap::new(),
215 }
216 }
217
    /// Dispatches one store command; long-running work is spawned onto
    /// `self.tasks` so the actor stays responsive.
    ///
    /// Returns `Some(tx)` only for `Shutdown`: the ack channel is handed back
    /// to `run`, which replies after leaving its loop. All other commands
    /// return `None`.
    async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
        match cmd {
            Command::ImportBao(msg) => {
                self.handle_import_bao(msg).await;
            }
            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
                // Reply immediately when idle; otherwise park the waiter
                // until the task set drains (released in `run`).
                if self.tasks.is_empty() {
                    tx.send(()).await.ok();
                } else {
                    self.idle_waiters.push(tx);
                }
            }
            Command::ImportBytes(msg) => {
                self.handle_import_bytes(msg).await;
            }
            Command::ImportByteStream(msg) => {
                self.handle_import_byte_stream(msg).await;
            }
            Command::ImportPath(msg) => {
                // Filesystem imports are deliberately unsupported here.
                msg.tx
                    .send(io::Error::other("import path not supported").into())
                    .await
                    .ok();
            }
            Command::Observe(ObserveMsg {
                inner: ObserveRequest { hash },
                tx,
                ..
            }) => {
                // Blobs in this store are always either fully present or absent.
                let size = self.blobs.lock().unwrap().get(&hash).map(|x| x.size);
                self.tasks.spawn(async move {
                    if let Some(size) = size {
                        tx.send(Bitfield::complete(size)).await.ok();
                    } else {
                        tx.send(Bitfield::empty()).await.ok();
                    };
                });
            }
            Command::ExportBao(ExportBaoMsg {
                inner: ExportBaoRequest { hash, ranges, .. },
                tx,
                ..
            }) => {
                // Clone metadata out so the lock is not held across the spawn.
                let metadata = self.blobs.lock().unwrap().get(&hash).cloned();
                self.tasks.spawn(export_bao(hash, metadata, ranges, tx));
            }
            Command::ExportPath(ExportPathMsg {
                inner: ExportPathRequest { hash, target, .. },
                tx,
                ..
            }) => {
                let metadata = self.blobs.lock().unwrap().get(&hash).cloned();
                self.tasks.spawn(export_path(metadata, target, tx));
            }
            // Batch commands have no observable effect in the fake store.
            Command::Batch(_cmd) => {}
            Command::ClearProtected(cmd) => {
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::CreateTag(cmd) => {
                use api::proto::CreateTagRequest;
                use std::time::SystemTime;

                let CreateTagRequest { value } = cmd.inner;
                // Auto-generate a unique tag name; the closure reports
                // collisions with existing tags.
                let tag = api::Tag::auto(SystemTime::now(), |t| self.tags.contains_key(t));
                self.tags.insert(tag.clone(), value);
                cmd.tx.send(Ok(tag)).await.ok();
            }
            Command::CreateTempTag(cmd) => {
                // Temp tags are not tracked; nothing is ever garbage collected.
                cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
            }
            Command::RenameTag(cmd) => {
                use api::proto::RenameTagRequest;
                let RenameTagRequest { from, to } = cmd.inner;

                if let Some(value) = self.tags.remove(&from) {
                    self.tags.insert(to, value);
                    cmd.tx.send(Ok(())).await.ok();
                } else {
                    cmd.tx
                        .send(Err(io::Error::new(io::ErrorKind::NotFound, "tag not found").into()))
                        .await
                        .ok();
                }
            }
            Command::DeleteTags(cmd) => {
                use api::proto::DeleteTagsRequest;
                use std::ops::Bound;
                let DeleteTagsRequest { from, to } = cmd.inner;

                // Half-open range [from, to): inclusive start, exclusive end,
                // either side unbounded when absent.
                let start: Bound<&api::Tag> = from.as_ref().map_or(Bound::Unbounded, |t| Bound::Included(t));
                let end: Bound<&api::Tag> = to.as_ref().map_or(Bound::Unbounded, |t| Bound::Excluded(t));

                // Collect keys first to avoid mutating the map mid-iteration.
                let to_delete: Vec<_> = self
                    .tags
                    .range::<api::Tag, _>((start, end))
                    .map(|(k, _)| k.clone())
                    .collect();

                for tag in to_delete {
                    self.tags.remove(&tag);
                }
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::DeleteBlobs(cmd) => {
                // Blob deletion is deliberately unsupported here.
                cmd.tx
                    .send(Err(io::Error::other("delete blobs not supported").into()))
                    .await
                    .ok();
            }
            Command::ListBlobs(cmd) => {
                // Snapshot the hashes so the lock is not held across awaits.
                let hashes: Vec<Hash> = self.blobs.lock().unwrap().keys().cloned().collect();
                self.tasks.spawn(async move {
                    for hash in hashes {
                        cmd.tx.send(Ok(hash)).await.ok();
                    }
                });
            }
            Command::BlobStatus(cmd) => {
                let hash = cmd.inner.hash;
                let metadata = self.blobs.lock().unwrap().get(&hash).cloned();
                let status = if let Some(metadata) = metadata {
                    BlobStatus::Complete {
                        size: metadata.size,
                    }
                } else {
                    BlobStatus::NotFound
                };
                cmd.tx.send(status).await.ok();
            }
            Command::ListTags(cmd) => {
                use api::proto::TagInfo;
                let tags: Vec<_> = self
                    .tags
                    .iter()
                    .map(|(name, value)| Ok(TagInfo {
                        name: name.clone(),
                        hash: value.hash,
                        format: value.format,
                    }))
                    .collect();
                cmd.tx.send(tags).await.ok();
            }
            Command::SetTag(cmd) => {
                use api::proto::SetTagRequest;
                let SetTagRequest { name, value } = cmd.inner;

                self.tags.insert(name, value);
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::ListTempTags(cmd) => {
                // No temp tags are ever tracked (see CreateTempTag above).
                cmd.tx.send(Vec::new()).await.ok();
            }
            Command::SyncDb(cmd) => {
                // Nothing to sync; acknowledge immediately.
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::Shutdown(cmd) => {
                return Some(cmd.tx);
            }
            Command::ExportRanges(cmd) => {
                let metadata = self.blobs.lock().unwrap().get(&cmd.inner.hash).cloned();
                self.tasks.spawn(export_ranges(cmd, metadata));
            }
        }
        None
    }
384
385 fn log_unit_task(&self, res: Result<(), JoinError>) {
386 if let Err(e) = res {
387 eprintln!("task failed: {e}");
388 }
389 }
390
    /// Main actor loop: processes commands and reaps finished background tasks.
    ///
    /// The `!self.tasks.is_empty()` guard keeps `join_next` from resolving
    /// `None` immediately when the set is empty, which would otherwise make
    /// the select fall through. Exits when `Shutdown` arrives (acking first)
    /// or when both branches are disabled/closed.
    async fn run(mut self) {
        loop {
            tokio::select! {
                Some(cmd) = self.commands.recv() => {
                    if let Some(shutdown) = self.handle_command(cmd).await {
                        shutdown.send(()).await.ok();
                        break;
                    }
                },
                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                    self.log_unit_task(res);
                    // Last task finished: release everyone waiting for idle.
                    if self.tasks.is_empty() {
                        for tx in self.idle_waiters.drain(..) {
                            tx.send(()).await.ok();
                        }
                    }
                },
                else => break,
            }
        }
    }
412
413 async fn handle_import_bytes(&mut self, msg: ImportBytesMsg) {
414 use bao_tree::io::outboard::PreOrderMemOutboard;
415 use iroh_blobs::api::blobs::AddProgressItem;
416
417 let ImportBytesMsg {
418 inner: proto::ImportBytesRequest { data, .. },
419 tx,
420 ..
421 } = msg;
422
423 let size = data.len() as u64;
424
425 if tx.send(AddProgressItem::Size(size)).await.is_err() {
426 return;
427 }
428 if tx.send(AddProgressItem::CopyDone).await.is_err() {
429 return;
430 }
431
432 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
434 let hash = Hash::from(outboard.root);
435
436 self.blobs.lock().unwrap().insert(
438 hash,
439 BlobMetadata {
440 size,
441 outboard: outboard.data.into(),
442 strategy: self.strategy,
443 },
444 );
445
446 let temp_tag = api::TempTag::new(
448 HashAndFormat {
449 hash,
450 format: BlobFormat::Raw,
451 },
452 None,
453 );
454 if tx.send(AddProgressItem::Done(temp_tag)).await.is_err() {}
455 }
456
457 async fn handle_import_byte_stream(&mut self, msg: ImportByteStreamMsg) {
458 use bao_tree::io::outboard::PreOrderMemOutboard;
459 use iroh_blobs::api::blobs::AddProgressItem;
460 use proto::ImportByteStreamUpdate;
461
462 let ImportByteStreamMsg { tx, mut rx, .. } = msg;
463
464 let mut data = Vec::new();
466 loop {
467 match rx.recv().await {
468 Ok(Some(ImportByteStreamUpdate::Bytes(chunk))) => {
469 data.extend_from_slice(&chunk);
470 if tx
471 .send(AddProgressItem::CopyProgress(data.len() as u64))
472 .await
473 .is_err()
474 {
475 return;
476 }
477 }
478 Ok(Some(ImportByteStreamUpdate::Done)) => {
479 break;
480 }
481 Ok(None) | Err(_) => {
482 tx.send(AddProgressItem::Error(io::Error::other(
483 "stream ended unexpectedly",
484 )))
485 .await
486 .ok();
487 return;
488 }
489 }
490 }
491
492 let size = data.len() as u64;
493
494 if tx.send(AddProgressItem::CopyDone).await.is_err() {
495 return;
496 }
497
498 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
500 let hash = Hash::from(outboard.root);
501
502 self.blobs.lock().unwrap().insert(
504 hash,
505 BlobMetadata {
506 size,
507 outboard: outboard.data.into(),
508 strategy: self.strategy,
509 },
510 );
511
512 let temp_tag = api::TempTag::new(
514 HashAndFormat {
515 hash,
516 format: BlobFormat::Raw,
517 },
518 None,
519 );
520 if tx.send(AddProgressItem::Done(temp_tag)).await.is_err() {}
521 }
522
    /// Handles a bao import by draining the incoming stream and then
    /// pretending the blob was stored.
    ///
    /// NOTE(review): the streamed chunks are discarded; the recorded
    /// metadata regenerates content from this store's strategy, so the
    /// stored outboard matches the synthetic data, not what the peer sent.
    /// The entry is keyed by the caller-supplied `hash`, which is not
    /// verified against any data.
    async fn handle_import_bao(&mut self, msg: ImportBaoMsg) {
        use bao_tree::io::outboard::PreOrderMemOutboard;
        use proto::ImportBaoRequest;

        let ImportBaoMsg {
            inner: ImportBaoRequest { hash, size },
            tx,
            mut rx,
            ..
        } = msg;

        let size_u64 = size.get();
        let strategy = self.strategy;
        let blobs = self.blobs.clone();

        self.tasks.spawn(async move {
            // Drain and discard the incoming encoded stream.
            while let Ok(Some(_item)) = rx.recv().await {
            }

            // Synthesize content of the declared size and record its outboard.
            let data = generate_data_for_strategy(size_u64, strategy);
            let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);

            blobs.lock().unwrap().insert(
                hash,
                BlobMetadata {
                    size: size_u64,
                    outboard: outboard.data.into(),
                    strategy,
                },
            );

            tx.send(Ok(())).await.ok();
        });
    }
561}
562
/// A zero-storage "file": computes any byte of the virtual blob on demand
/// from its strategy, so reads at arbitrary offsets never allocate content
/// up front.
#[derive(Clone)]
struct DataReader {
    // Determines the value of every byte (see `byte_at`).
    strategy: DataStrategy,
}
568
569impl DataReader {
570 fn new(strategy: DataStrategy) -> Self {
571 Self { strategy }
572 }
573
574 fn byte_at(&self, offset: u64) -> u8 {
575 match self.strategy {
576 DataStrategy::Zeros => 0,
577 DataStrategy::Ones => 0xFF,
578 DataStrategy::PseudoRandom { seed } => {
579 let mut x = seed.wrapping_add(offset);
583 x = (x ^ (x >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
584 x = (x ^ (x >> 27)).wrapping_mul(0x94d049bb133111eb);
585 x = x ^ (x >> 31);
586 (x >> 24) as u8
587 }
588 }
589 }
590}
591
592impl ReadAt for DataReader {
593 fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
594 for (i, byte) in buf.iter_mut().enumerate() {
595 *byte = self.byte_at(offset + i as u64);
596 }
597 Ok(buf.len())
598 }
599}
600
601impl ReadBytesAt for DataReader {
602 fn read_bytes_at(&self, offset: u64, size: usize) -> io::Result<Bytes> {
603 let mut data = vec![0u8; size];
604 self.read_at(offset, &mut data)?;
605 Ok(Bytes::from(data))
606 }
607}
608
609fn generate_data_for_strategy(size: u64, strategy: DataStrategy) -> Vec<u8> {
610 let reader = DataReader::new(strategy);
611 let mut data = vec![0u8; size as usize];
612 reader.read_at(0, &mut data).expect("read should succeed");
613 data
614}
615
616fn compute_hash_for_strategy(size: u64, strategy: DataStrategy) -> Hash {
617 let data = generate_data_for_strategy(size, strategy);
618 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
619 Hash::from(outboard.root)
620}
621
/// Streams a validated bao encoding of the requested chunk ranges into
/// `sender`.
///
/// When the hash is unknown (`metadata` is `None`), a single
/// `EncodedItem::Error` carrying `UnexpectedEof` is emitted instead. Send
/// failures are swallowed — the receiver going away ends the export.
async fn export_bao(
    hash: Hash,
    metadata: Option<BlobMetadata>,
    ranges: ChunkRanges,
    mut sender: mpsc::Sender<EncodedItem>,
) {
    let metadata = match metadata {
        Some(metadata) => metadata,
        None => {
            sender
                .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
                    io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "export task ended unexpectedly",
                    ),
                )))
                .await
                .ok();
            return;
        }
    };

    let size = metadata.size;
    // Content is synthesized on the fly; only the outboard was precomputed.
    let data = DataReader::new(metadata.strategy);

    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
    let outboard = PreOrderMemOutboard {
        root: hash.into(),
        tree,
        data: metadata.outboard,
    };

    // RefCast: adapt the irpc sender to the bao-tree Sender trait in place.
    let sender = BaoTreeSender::ref_cast_mut(&mut sender);
    traverse_ranges_validated(data, outboard, &ranges, sender)
        .await
        .ok();
}
659
/// Entry point for range exports: a missing hash yields a single NotFound
/// error item; otherwise delegates to `export_ranges_impl` and forwards any
/// failure to the receiver as an error item.
async fn export_ranges(mut cmd: ExportRangesMsg, metadata: Option<BlobMetadata>) {
    let Some(metadata) = metadata else {
        cmd.tx
            .send(ExportRangesItem::Error(api::Error::io(
                io::ErrorKind::NotFound,
                "hash not found",
            )))
            .await
            .ok();
        return;
    };
    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, metadata).await {
        cmd.tx
            .send(ExportRangesItem::Error(cause.into()))
            .await
            .ok();
    }
}
678
679async fn export_ranges_impl(
680 cmd: ExportRangesRequest,
681 tx: &mut mpsc::Sender<ExportRangesItem>,
682 metadata: BlobMetadata,
683) -> io::Result<()> {
684 let ExportRangesRequest { ranges, .. } = cmd;
685 let size = metadata.size;
686 let bitfield = Bitfield::complete(size);
687
688 for range in ranges.iter() {
689 let range = match range {
690 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
691 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
692 };
693 let requested = ChunkRanges::bytes(range.start..range.end);
694 if !bitfield.ranges.is_superset(&requested) {
695 return Err(io::Error::other(format!(
696 "missing range: {requested:?}, present: {bitfield:?}",
697 )));
698 }
699 let reader = DataReader::new(metadata.strategy);
700 let bs = 1024;
701 let mut offset = range.start;
702 loop {
703 let end: u64 = (offset + bs).min(range.end);
704 let chunk_size = (end - offset) as usize;
705 let data = reader.read_bytes_at(offset, chunk_size)?;
706 tx.send(Leaf { offset, data }.into()).await?;
707 offset = end;
708 if offset >= range.end {
709 break;
710 }
711 }
712 }
713 Ok(())
714}
715
716impl FakeStore {
717 pub fn new(sizes: impl IntoIterator<Item = u64>) -> Self {
719 Self::builder().with_blobs(sizes).build()
720 }
721
722 pub fn builder() -> FakeStoreBuilder {
723 FakeStoreBuilder::new()
724 }
725
726 fn new_with_config(sizes: impl IntoIterator<Item = u64>, config: FakeStoreConfig) -> Self {
727 let mut blobs = HashMap::new();
728 for size in sizes {
729 let hash = compute_hash_for_strategy(size, config.strategy);
730 let data = generate_data_for_strategy(size, config.strategy);
731 let outboard_data = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
732 blobs.insert(
733 hash,
734 BlobMetadata {
735 size,
736 outboard: outboard_data.data.into(),
737 strategy: config.strategy,
738 },
739 );
740 }
741
742 let (sender, receiver) = tokio::sync::mpsc::channel(1);
743 let actor = Actor::new(receiver, blobs, config.strategy);
744 tokio::spawn(actor.run());
745
746 let local = irpc::LocalSender::from(sender);
748 let client = local.into();
749
750 Self {
751 store: Store::ref_cast(&client).clone(),
752 }
753 }
754
755 pub fn store(&self) -> &Store {
756 &self.store
757 }
758}
759
/// Entry point for path exports: a missing hash yields a single NotFound
/// error item; a successful copy ends with `ExportProgressItem::Done`, and
/// any I/O failure is forwarded as an error item.
async fn export_path(
    metadata: Option<BlobMetadata>,
    target: PathBuf,
    mut tx: mpsc::Sender<ExportProgressItem>,
) {
    let Some(metadata) = metadata else {
        tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
            .await
            .ok();
        return;
    };
    match export_path_impl(metadata, target, &mut tx).await {
        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
        Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
    };
}
776
777async fn export_path_impl(
778 metadata: BlobMetadata,
779 target: PathBuf,
780 tx: &mut mpsc::Sender<ExportProgressItem>,
781) -> io::Result<()> {
782 let size = metadata.size;
783 let mut file = std::fs::File::create(&target)?;
784 tx.send(ExportProgressItem::Size(size)).await?;
785
786 let buf = [0u8; 1024 * 64];
787 for offset in (0..size).step_by(1024 * 64) {
788 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
789 let buf = &buf[..len];
790 file.write_all(buf)?;
791 tx.try_send(ExportProgressItem::CopyProgress(offset))
792 .await?;
793 }
794 Ok(())
795}
796
797#[cfg(test)]
798mod tests;