iroh_blobs/store/
mem.rs

1//! Mutable in-memory blob store.
2//!
3//! Being a memory store, this store has to import all data into memory before it can
4//! serve it. So the amount of data you can serve is limited by your available memory.
//! Other than that, this is a fully featured store that supports all store features, such as
//! tags and garbage collection.
7//!
8//! For many use cases this can be quite useful, since it does not require write access
9//! to the file system.
10use std::{
11    collections::{BTreeMap, HashMap, HashSet},
12    future::Future,
13    io::{self, Write},
14    num::NonZeroU64,
15    ops::Deref,
16    sync::Arc,
17    time::SystemTime,
18};
19
20use bao_tree::{
21    blake3,
22    io::{
23        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24        outboard::PreOrderMemOutboard,
25        sync::{Outboard, ReadAt, WriteAt},
26        BaoContentItem, Leaf,
27    },
28    BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35    io::AsyncReadExt,
36    sync::watch,
37    task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43    api::{
44        self,
45        blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46        proto::{
47            BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48            CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49            DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51            ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52            ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53            ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54            SetTagRequest, ShutdownMsg, SyncDbMsg,
55        },
56        tags::TagInfo,
57        ApiClient,
58    },
59    store::{
60        util::{SizeInfo, SparseMemFile, Tag},
61        HashAndFormat, IROH_BLOCK_SIZE,
62    },
63    util::{
64        temp_tag::{TagDrop, TempTagScope, TempTags},
65        ChunkRangesExt,
66    },
67    BlobFormat, Hash,
68};
69
/// Configuration for the in-memory store. Currently it has no settings.
#[derive(Debug, Default)]
pub struct Options {}

/// Handle to an in-memory blob store.
///
/// The handle only holds an [`ApiClient`]; the store itself is run by an actor task
/// (see [`MemStore::new`]). Clones share a clone of the same client. It dereferences to
/// [`crate::api::Store`] for the full store API.
///
/// `repr(transparent)` guarantees the same layout as the single `ApiClient` field.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct MemStore {
    client: ApiClient,
}
78
79impl AsRef<crate::api::Store> for MemStore {
80    fn as_ref(&self) -> &crate::api::Store {
81        crate::api::Store::ref_from_sender(&self.client)
82    }
83}
84
85impl Deref for MemStore {
86    type Target = crate::api::Store;
87
88    fn deref(&self) -> &Self::Target {
89        crate::api::Store::ref_from_sender(&self.client)
90    }
91}
92
93impl Default for MemStore {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
/// Output of a task spawned by the actor, processed by the actor loop once the task finishes.
#[derive(derive_more::From)]
enum TaskResult {
    /// The task produced nothing that the actor needs to act on.
    Unit(()),
    /// An import finished (or failed). On success the actor stores the entry as complete
    /// and creates a temp tag for it.
    Import(anyhow::Result<ImportEntry>),
    /// A batch finished. The actor ends this temp-tag scope.
    Scope(Scope),
}
105
106impl MemStore {
107    pub fn from_sender(client: ApiClient) -> Self {
108        Self { client }
109    }
110
111    pub fn new() -> Self {
112        let (sender, receiver) = tokio::sync::mpsc::channel(32);
113        tokio::spawn(
114            Actor {
115                commands: receiver,
116                tasks: JoinSet::new(),
117                state: State {
118                    data: HashMap::new(),
119                    tags: BTreeMap::new(),
120                },
121                options: Arc::new(Options::default()),
122                temp_tags: Default::default(),
123                protected: Default::default(),
124            }
125            .run(),
126        );
127        Self::from_sender(sender.into())
128    }
129}
130
/// The task that owns all store state and processes every command sent by [`MemStore`] handles.
struct Actor {
    /// Receiving end of the command channel created in [`MemStore::new`].
    commands: tokio::sync::mpsc::Receiver<Command>,
    /// Tasks spawned while handling commands. Their results are handled in `run`.
    tasks: JoinSet<TaskResult>,
    /// Blob entries and tags.
    state: State,
    // Not read anywhere in the visible code, hence the `dead_code` allow.
    #[allow(dead_code)]
    options: Arc<Options>,
    /// Temporary tags, organized into scopes (created for batches and ended when a batch task finishes).
    temp_tags: TempTags,
    /// Hashes that `DeleteBlobs` skips unless `force` is set. Cleared by `ClearProtected`.
    protected: HashSet<Hash>,
}
141
impl Actor {
    /// Spawns `f` onto the actor's task set.
    ///
    /// The future runs inside the caller's current tracing span. Its output is converted
    /// into a [`TaskResult`], which `run` processes when the task completes.
    fn spawn<F, T>(&mut self, f: F)
    where
        F: Future<Output = T> + Send + 'static,
        T: Into<TaskResult>,
    {
        let span = tracing::Span::current();
        let fut = async move { f.await.into() }.instrument(span);
        self.tasks.spawn(fut);
    }

    /// Handles a single command.
    ///
    /// Tag, temp-tag, blob-status, delete and sync commands are answered inline against
    /// `self.state`. Imports, exports, observation, blob listing and batches are spawned
    /// onto `self.tasks`.
    ///
    /// Returns `Some` only for `Command::Shutdown`, which makes `run` leave its loop.
    /// Every other command returns `None`.
    async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
        match cmd {
            // Creates a partial entry for an unknown hash so the import has something to write to.
            Command::ImportBao(ImportBaoMsg {
                inner: ImportBaoRequest { hash, size },
                rx: data,
                tx,
                ..
            }) => {
                let entry = self.get_or_create_entry(hash);
                self.spawn(import_bao(entry, size, data, tx));
            }
            // Observing an unknown hash also creates a (partial, empty) entry.
            Command::Observe(ObserveMsg {
                inner: ObserveRequest { hash },
                tx,
                ..
            }) => {
                let entry = self.get_or_create_entry(hash);
                self.spawn(observe(entry, tx));
            }
            // The three full-blob imports below produce an ImportEntry, which is finalized
            // by `finish_import` when the task completes.
            Command::ImportBytes(ImportBytesMsg {
                inner:
                    ImportBytesRequest {
                        data,
                        scope,
                        format,
                        ..
                    },
                tx,
                ..
            }) => {
                self.spawn(import_bytes(data, scope, format, tx));
            }
            Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
                self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
            }
            Command::ImportPath(cmd) => {
                self.spawn(import_path(cmd));
            }
            Command::ExportBao(ExportBaoMsg {
                inner: ExportBaoRequest { hash, ranges },
                tx,
                ..
            }) => {
                let entry = self.get_or_create_entry(hash);
                self.spawn(export_bao(entry, ranges, tx));
            }
            // Unlike the other exports, this does not create an entry; a missing hash is
            // reported by `export_path` as NotFound.
            Command::ExportPath(cmd) => {
                let entry = self.state.data.get(&cmd.hash).cloned();
                self.spawn(export_path(entry, cmd));
            }
            // Deletes all tags in the half-open range [from, to); a missing bound is unbounded.
            Command::DeleteTags(cmd) => {
                let DeleteTagsMsg {
                    inner: DeleteTagsRequest { from, to },
                    tx,
                    ..
                } = cmd;
                info!("deleting tags from {:?} to {:?}", from, to);
                // todo: more efficient impl (this scans every tag; the map is ordered,
                // so a range-based removal would be possible)
                self.state.tags.retain(|tag, _| {
                    if let Some(from) = &from {
                        if tag < from {
                            return true;
                        }
                    }
                    if let Some(to) = &to {
                        if tag >= to {
                            return true;
                        }
                    }
                    info!("    removing {:?}", tag);
                    false
                });
                tx.send(Ok(())).await.ok();
            }
            // Moves the value from `from` to `to`. An existing tag named `to` is overwritten.
            Command::RenameTag(cmd) => {
                let RenameTagMsg {
                    inner: RenameTagRequest { from, to },
                    tx,
                    ..
                } = cmd;
                let tags = &mut self.state.tags;
                let value = match tags.remove(&from) {
                    Some(value) => value,
                    None => {
                        tx.send(Err(api::Error::io(
                            io::ErrorKind::NotFound,
                            format!("tag not found: {from:?}"),
                        )))
                        .await
                        .ok();
                        return None;
                    }
                };
                tags.insert(to, value);
                tx.send(Ok(())).await.ok();
                return None;
            }
            // Lists tags in [from, to) whose format matches the requested raw/hash_seq flags.
            Command::ListTags(cmd) => {
                let ListTagsMsg {
                    inner:
                        ListTagsRequest {
                            from,
                            to,
                            raw,
                            hash_seq,
                        },
                    tx,
                    ..
                } = cmd;
                let tags = self
                    .state
                    .tags
                    .iter()
                    .filter(move |(tag, value)| {
                        if let Some(from) = &from {
                            if tag < &from {
                                return false;
                            }
                        }
                        if let Some(to) = &to {
                            if tag >= &to {
                                return false;
                            }
                        }
                        raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
                    })
                    .map(|(tag, value)| TagInfo {
                        name: tag.clone(),
                        hash: value.hash,
                        format: value.format,
                    })
                    .map(Ok);
                tx.send(tags.collect()).await.ok();
            }
            // Inserts or overwrites the tag.
            Command::SetTag(SetTagMsg {
                inner: SetTagRequest { name: tag, value },
                tx,
                ..
            }) => {
                self.state.tags.insert(tag, value);
                tx.send(Ok(())).await.ok();
            }
            // Generates a time-based tag name. The closure reports which names already exist,
            // presumably so `Tag::auto` can avoid collisions (TODO confirm).
            Command::CreateTag(CreateTagMsg {
                inner: CreateTagRequest { value },
                tx,
                ..
            }) => {
                let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
                self.state.tags.insert(tag.clone(), value);
                tx.send(Ok(tag)).await.ok();
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            // Snapshots the current hashes, then streams them from a spawned task.
            // Streaming stops early if the receiver goes away.
            Command::ListBlobs(cmd) => {
                let ListBlobsMsg { tx, .. } = cmd;
                let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
                self.spawn(async move {
                    for blob in blobs {
                        if tx.send(Ok(blob)).await.is_err() {
                            break;
                        }
                    }
                });
            }
            // Reports NotFound, Complete (full size) or Partial (validated size) from the bitfield.
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                let BlobStatusMsg {
                    inner: BlobStatusRequest { hash },
                    tx,
                    ..
                } = cmd;
                let res = match self.state.data.get(&hash) {
                    None => api::blobs::BlobStatus::NotFound,
                    Some(x) => {
                        let bitfield = x.0.state.borrow().bitfield();
                        if bitfield.is_complete() {
                            BlobStatus::Complete {
                                size: bitfield.size,
                            }
                        } else {
                            BlobStatus::Partial {
                                size: bitfield.validated_size(),
                            }
                        }
                    }
                };
                tx.send(res).await.ok();
            }
            // Removes entries from the map. Protected hashes are skipped unless `force` is set.
            // NOTE(review): nothing in the visible code inserts into `self.protected`.
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                let DeleteBlobsMsg {
                    inner: BlobDeleteRequest { hashes, force },
                    tx,
                    ..
                } = cmd;
                for hash in hashes {
                    if !force && self.protected.contains(&hash) {
                        continue;
                    }
                    self.state.data.remove(&hash);
                }
                tx.send(Ok(())).await.ok();
            }
            // Each batch gets its own temp-tag scope, ended in `run` when the batch task
            // returns TaskResult::Scope.
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope));
            }
            Command::ClearProtected(cmd) => {
                self.protected.clear();
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::ExportRanges(cmd) => {
                let entry = self.get_or_create_entry(cmd.hash);
                self.spawn(export_ranges(cmd, entry.clone()));
            }
            // Nothing to persist for an in-memory store; acknowledge immediately.
            Command::SyncDb(SyncDbMsg { tx, .. }) => {
                tx.send(Ok(())).await.ok();
            }
            Command::Shutdown(cmd) => {
                return Some(cmd);
            }
        }
        None
    }

    /// Returns the entry for `hash`. If none exists, a new empty partial entry is inserted first.
    fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
        self.state
            .data
            .entry(hash)
            .or_insert_with(|| BaoFileHandle::new_partial(hash))
            .clone()
    }

    /// Creates a temp tag in the requested scope and sends it back.
    ///
    /// If the reply channel goes over RPC, the tag is leaked (`tt.leak()`) before sending.
    /// NOTE(review): presumably this is because its drop handle cannot be tracked across
    /// the RPC boundary — confirm.
    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    /// Finalizes a completed import task.
    ///
    /// On success:
    /// - The entry for the outboard's root hash is created if needed.
    /// - A still-partial entry is replaced by complete storage; an entry that is already
    ///   complete is left untouched.
    /// - A temp tag is created in the import's scope and sent as `AddProgressItem::Done`.
    ///
    /// On failure the error is only logged.
    async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
        let import_data = match res {
            Ok(entry) => entry,
            Err(e) => {
                error!("import failed: {e}");
                return;
            }
        };
        let hash = import_data.outboard.root().into();
        let entry = self.get_or_create_entry(hash);
        entry
            .0
            .state
            .send_if_modified(|state: &mut BaoFileStorage| {
                // Already complete: keep existing storage and do not notify watchers.
                let BaoFileStorage::Partial(_) = state.deref() else {
                    return false;
                };
                *state =
                    CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
                true
            });
        let tt = self.temp_tags.create(
            import_data.scope,
            HashAndFormat {
                hash,
                format: import_data.format,
            },
        );
        import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
    }

    /// Unwraps a joined task result.
    ///
    /// Cancelled tasks are logged at trace level and other join errors at error level.
    /// Both return `None`.
    fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
        match res {
            Ok(x) => Some(x),
            Err(e) => {
                if e.is_cancelled() {
                    trace!("task cancelled: {e}");
                } else {
                    error!("task failed: {e}");
                }
                None
            }
        }
    }

    /// Main loop of the actor.
    ///
    /// Concurrently waits for new commands and for spawned tasks to finish. The loop exits
    /// in two cases:
    /// - Every command sender has been dropped.
    /// - A shutdown command arrives. The shutdown is then acknowledged on its `tx` after
    ///   the loop.
    pub async fn run(mut self) {
        let shutdown = loop {
            tokio::select! {
                cmd = self.commands.recv() => {
                    let Some(cmd) = cmd else {
                        // last sender has been dropped.
                        // exit immediately.
                        break None;
                    };
                    if let Some(cmd) = self.handle_command(cmd).await {
                        break Some(cmd);
                    }
                }
                // The guard keeps this branch disabled while there are no tasks to join.
                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                    let Some(res) = self.log_task_result(res) else {
                        continue;
                    };
                    match res {
                        TaskResult::Import(res) => {
                            self.finish_import(res).await;
                        }
                        TaskResult::Scope(scope) => {
                            self.temp_tags.end_scope(scope);
                        }
                        TaskResult::Unit(_) => {}
                    }
                }
            }
        };
        if let Some(shutdown) = shutdown {
            shutdown.tx.send(()).await.ok();
        }
    }
}
483
484async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
485    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
486        error!("batch failed: {cause}");
487    }
488    id
489}
490
491async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
492    let BatchMsg { tx, mut rx, .. } = cmd;
493    trace!("created scope {}", id);
494    tx.send(id).await.map_err(api::Error::other)?;
495    while let Some(msg) = rx.recv().await? {
496        match msg {
497            BatchResponse::Drop(msg) => scope.on_drop(&msg),
498            BatchResponse::Ping => {}
499        }
500    }
501    Ok(())
502}
503
504async fn export_ranges(mut cmd: ExportRangesMsg, entry: BaoFileHandle) {
505    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
506        cmd.tx
507            .send(ExportRangesItem::Error(cause.into()))
508            .await
509            .ok();
510    }
511}
512
513async fn export_ranges_impl(
514    cmd: ExportRangesRequest,
515    tx: &mut mpsc::Sender<ExportRangesItem>,
516    entry: BaoFileHandle,
517) -> io::Result<()> {
518    let ExportRangesRequest { ranges, hash } = cmd;
519    let bitfield = entry.bitfield();
520    trace!(
521        "exporting ranges: {hash} {ranges:?} size={}",
522        bitfield.size()
523    );
524    debug_assert!(entry.hash() == hash, "hash mismatch");
525    let data = entry.data_reader();
526    let size = bitfield.size();
527    for range in ranges.iter() {
528        let range = match range {
529            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
530            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
531        };
532        let requested = ChunkRanges::bytes(range.start..range.end);
533        if !bitfield.ranges.is_superset(&requested) {
534            return Err(io::Error::other(format!(
535                "missing range: {requested:?}, present: {bitfield:?}",
536            )));
537        }
538        let bs = 1024;
539        let mut offset = range.start;
540        loop {
541            let end: u64 = (offset + bs).min(range.end);
542            let size = (end - offset) as usize;
543            tx.send(
544                Leaf {
545                    offset,
546                    data: data.read_bytes_at(offset, size)?,
547                }
548                .into(),
549            )
550            .await?;
551            offset = end;
552            if offset >= range.end {
553                break;
554            }
555        }
556    }
557    Ok(())
558}
559
560fn chunk_range(leaf: &Leaf) -> ChunkRanges {
561    let start = ChunkNum::chunks(leaf.offset);
562    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
563    (start..end).into()
564}
565
566async fn import_bao(
567    entry: BaoFileHandle,
568    size: NonZeroU64,
569    mut stream: mpsc::Receiver<BaoContentItem>,
570    tx: irpc::channel::oneshot::Sender<api::Result<()>>,
571) {
572    let size = size.get();
573    entry
574        .0
575        .state
576        .send_if_modified(|state: &mut BaoFileStorage| {
577            let BaoFileStorage::Partial(entry) = state else {
578                // entry was already completed, no need to write
579                return false;
580            };
581            entry.size.write(0, size);
582            false
583        });
584    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
585    while let Some(item) = stream.recv().await.unwrap() {
586        entry.0.state.send_if_modified(|state| {
587            let BaoFileStorage::Partial(partial) = state else {
588                // entry was already completed, no need to write
589                return false;
590            };
591            match item {
592                BaoContentItem::Parent(parent) => {
593                    if let Some(offset) = tree.pre_order_offset(parent.node) {
594                        let mut pair = [0u8; 64];
595                        pair[..32].copy_from_slice(parent.pair.0.as_bytes());
596                        pair[32..].copy_from_slice(parent.pair.1.as_bytes());
597                        partial
598                            .outboard
599                            .write_at(offset * 64, &pair)
600                            .expect("writing to mem can never fail");
601                    }
602                    false
603                }
604                BaoContentItem::Leaf(leaf) => {
605                    let start = leaf.offset;
606                    partial
607                        .data
608                        .write_at(start, &leaf.data)
609                        .expect("writing to mem can never fail");
610                    let added = chunk_range(&leaf);
611                    let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
612                    if update.new_state().complete {
613                        let data = std::mem::take(&mut partial.data);
614                        let outboard = std::mem::take(&mut partial.outboard);
615                        let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
616                        let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
617                        *state = CompleteStorage::new(data, outboard).into();
618                    }
619                    update.changed()
620                }
621            }
622        });
623    }
624    tx.send(Ok(())).await.ok();
625}
626
627#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
628async fn export_bao(
629    entry: BaoFileHandle,
630    ranges: ChunkRanges,
631    mut sender: mpsc::Sender<EncodedItem>,
632) {
633    let data = entry.data_reader();
634    let outboard = entry.outboard_reader();
635    let tx = BaoTreeSender::new(&mut sender);
636    traverse_ranges_validated(data, outboard, &ranges, tx)
637        .await
638        .ok();
639}
640
641#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
642async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
643    entry.subscribe().forward(tx).await.ok();
644}
645
646async fn import_bytes(
647    data: Bytes,
648    scope: Scope,
649    format: BlobFormat,
650    tx: mpsc::Sender<AddProgressItem>,
651) -> anyhow::Result<ImportEntry> {
652    tx.send(AddProgressItem::Size(data.len() as u64)).await?;
653    tx.send(AddProgressItem::CopyDone).await?;
654    let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
655    Ok(ImportEntry {
656        data,
657        outboard,
658        scope,
659        format,
660        tx,
661    })
662}
663
664async fn import_byte_stream(
665    scope: Scope,
666    format: BlobFormat,
667    mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
668    tx: mpsc::Sender<AddProgressItem>,
669) -> anyhow::Result<ImportEntry> {
670    let mut res = Vec::new();
671    loop {
672        match rx.recv().await {
673            Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
674                res.extend_from_slice(&data);
675                tx.send(AddProgressItem::CopyProgress(res.len() as u64))
676                    .await?;
677            }
678            Ok(Some(ImportByteStreamUpdate::Done)) => {
679                break;
680            }
681            Ok(None) => {
682                return Err(api::Error::io(
683                    io::ErrorKind::UnexpectedEof,
684                    "byte stream ended unexpectedly",
685                )
686                .into());
687            }
688            Err(e) => {
689                return Err(e.into());
690            }
691        }
692    }
693    import_bytes(res.into(), scope, format, tx).await
694}
695
696#[instrument(skip_all, fields(path = %cmd.path.display()))]
697async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
698    let ImportPathMsg {
699        inner:
700            ImportPathRequest {
701                path,
702                scope,
703                format,
704                ..
705            },
706        tx,
707        ..
708    } = cmd;
709    let mut res = Vec::new();
710    let mut file = tokio::fs::File::open(path).await?;
711    let mut buf = [0u8; 1024 * 64];
712    loop {
713        let size = file.read(&mut buf).await?;
714        if size == 0 {
715            break;
716        }
717        res.extend_from_slice(&buf[..size]);
718        tx.send(AddProgressItem::CopyProgress(res.len() as u64))
719            .await?;
720    }
721    import_bytes(res.into(), scope, format, tx).await
722}
723
724#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
725async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
726    let ExportPathMsg { inner, mut tx, .. } = cmd;
727    let Some(entry) = entry else {
728        tx.send(ExportProgressItem::Error(api::Error::io(
729            io::ErrorKind::NotFound,
730            "hash not found",
731        )))
732        .await
733        .ok();
734        return;
735    };
736    match export_path_impl(entry, inner, &mut tx).await {
737        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
738        Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
739    };
740}
741
742async fn export_path_impl(
743    entry: BaoFileHandle,
744    cmd: ExportPathRequest,
745    tx: &mut mpsc::Sender<ExportProgressItem>,
746) -> io::Result<()> {
747    let ExportPathRequest { target, .. } = cmd;
748    // todo: for partial entries make sure to only write the part that is actually present
749    let mut file = std::fs::File::create(target)?;
750    let size = entry.0.state.borrow().size();
751    tx.send(ExportProgressItem::Size(size)).await?;
752    let mut buf = [0u8; 1024 * 64];
753    for offset in (0..size).step_by(1024 * 64) {
754        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
755        let buf = &mut buf[..len];
756        entry.0.state.borrow().data().read_exact_at(offset, buf)?;
757        file.write_all(buf)?;
758        tx.try_send(ExportProgressItem::CopyProgress(offset))
759            .await
760            .map_err(|_e| io::Error::other(""))?;
761        yield_now().await;
762    }
763    Ok(())
764}
765
/// Result of a successful import task. The actor finalizes it in `finish_import`.
struct ImportEntry {
    /// Temp-tag scope in which the resulting temp tag is created.
    scope: Scope,
    /// Format recorded in the resulting temp tag.
    format: BlobFormat,
    /// The complete blob contents.
    data: Bytes,
    /// Pre-order outboard computed over `data`. Its root is used as the blob hash.
    outboard: PreOrderMemOutboard,
    /// Progress channel. Receives `AddProgressItem::Done` once the import is finalized.
    tx: mpsc::Sender<AddProgressItem>,
}
773
774pub struct DataReader(BaoFileHandle);
775
776impl ReadBytesAt for DataReader {
777    fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
778        let entry = self.0 .0.state.borrow();
779        entry.data().read_bytes_at(offset, size)
780    }
781}
782
783pub struct OutboardReader {
784    hash: blake3::Hash,
785    tree: BaoTree,
786    data: BaoFileHandle,
787}
788
789impl Outboard for OutboardReader {
790    fn root(&self) -> blake3::Hash {
791        self.hash
792    }
793
794    fn tree(&self) -> BaoTree {
795        self.tree
796    }
797
798    fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
799        let Some(offset) = self.tree.pre_order_offset(node) else {
800            return Ok(None);
801        };
802        let mut buf = [0u8; 64];
803        let size = self
804            .data
805            .0
806            .state
807            .borrow()
808            .outboard()
809            .read_at(offset * 64, &mut buf)?;
810        if size != 64 {
811            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
812        }
813        let left: [u8; 32] = buf[..32].try_into().unwrap();
814        let right: [u8; 32] = buf[32..].try_into().unwrap();
815        Ok(Some((left.into(), right.into())))
816    }
817}
818
/// Mutable store state, owned exclusively by the actor.
struct State {
    /// All known blob entries, complete or partial, keyed by hash.
    data: HashMap<Hash, BaoFileHandle>,
    /// Named tags, ordered by name. Tag listing and deletion work on name ranges.
    tags: BTreeMap<Tag, HashAndFormat>,
}

/// Storage backing a single blob.
#[derive(Debug, derive_more::From)]
pub enum BaoFileStorage {
    /// Blob being imported. Data and outboard are sparse; present chunks are tracked by a bitfield.
    Partial(PartialMemStorage),
    /// All data and the full outboard are present.
    Complete(CompleteStorage),
}
829
830impl BaoFileStorage {
831    /// Get the bitfield of the storage.
832    pub fn bitfield(&self) -> Bitfield {
833        match self {
834            Self::Partial(entry) => entry.bitfield.clone(),
835            Self::Complete(entry) => Bitfield::complete(entry.size()),
836        }
837    }
838}
839
/// Shared state of one blob entry.
#[derive(Debug)]
pub struct BaoFileHandleInner {
    /// Current storage, held in a watch channel so subscribers see modifications.
    state: watch::Sender<BaoFileStorage>,
    /// Hash of the blob.
    hash: Hash,
}

/// A cheaply cloneable handle to a bao file, including its hash.
///
/// Clones share one [`BaoFileHandleInner`] through an `Arc`. The handle dereferences to
/// that `Arc`, so the inner fields are reachable via auto-deref.
#[derive(Debug, Clone, derive_more::Deref)]
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
849
850impl BaoFileHandle {
851    pub fn new_partial(hash: Hash) -> Self {
852        let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
853            data: SparseMemFile::new(),
854            outboard: SparseMemFile::new(),
855            size: SizeInfo::default(),
856            bitfield: Bitfield::empty(),
857        }));
858        Self(Arc::new(BaoFileHandleInner { state, hash }))
859    }
860
861    pub fn hash(&self) -> Hash {
862        self.hash
863    }
864
865    pub fn bitfield(&self) -> Bitfield {
866        self.0.state.borrow().bitfield()
867    }
868
869    pub fn subscribe(&self) -> BaoFileStorageSubscriber {
870        BaoFileStorageSubscriber::new(self.0.state.subscribe())
871    }
872
873    pub fn data_reader(&self) -> DataReader {
874        DataReader(self.clone())
875    }
876
877    pub fn outboard_reader(&self) -> OutboardReader {
878        let entry = self.0.state.borrow();
879        let hash = self.hash.into();
880        let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
881        OutboardReader {
882            hash,
883            tree,
884            data: self.clone(),
885        }
886    }
887}
888
889impl Default for BaoFileStorage {
890    fn default() -> Self {
891        Self::Partial(Default::default())
892    }
893}
894
895impl BaoFileStorage {
896    fn data(&self) -> &[u8] {
897        match self {
898            Self::Partial(entry) => entry.data.as_ref(),
899            Self::Complete(entry) => &entry.data,
900        }
901    }
902
903    fn outboard(&self) -> &[u8] {
904        match self {
905            Self::Partial(entry) => entry.outboard.as_ref(),
906            Self::Complete(entry) => &entry.outboard,
907        }
908    }
909
910    fn size(&self) -> u64 {
911        match self {
912            Self::Partial(entry) => entry.current_size(),
913            Self::Complete(entry) => entry.size(),
914        }
915    }
916}
917
918#[derive(Debug, Clone)]
919pub struct CompleteStorage {
920    pub(crate) data: Bytes,
921    pub(crate) outboard: Bytes,
922}
923
924impl CompleteStorage {
925    pub fn create(data: Bytes) -> (Hash, Self) {
926        let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
927        let hash = outboard.root().into();
928        let outboard = outboard.data.into();
929        let entry = Self::new(data, outboard);
930        (hash, entry)
931    }
932
933    pub fn new(data: Bytes, outboard: Bytes) -> Self {
934        Self { data, outboard }
935    }
936
937    pub fn size(&self) -> u64 {
938        self.data.len() as u64
939    }
940}
941
942#[allow(dead_code)]
943fn print_outboard(hashes: &[u8]) {
944    assert!(hashes.len() % 64 == 0);
945    for chunk in hashes.chunks(64) {
946        let left: [u8; 32] = chunk[..32].try_into().unwrap();
947        let right: [u8; 32] = chunk[32..].try_into().unwrap();
948        let left = blake3::Hash::from(left);
949        let right = blake3::Hash::from(right);
950        println!("l: {left:?}, r: {right:?}");
951    }
952}
953
954pub struct BaoFileStorageSubscriber {
955    receiver: watch::Receiver<BaoFileStorage>,
956}
957
958impl BaoFileStorageSubscriber {
959    pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
960        Self { receiver }
961    }
962
963    /// Forward observed *values* to the given sender
964    ///
965    /// Returns an error if sending fails, or if the last sender is dropped
966    pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
967        let value = self.receiver.borrow().bitfield();
968        tx.send(value).await?;
969        loop {
970            self.update_or_closed(&mut tx).await?;
971            let value = self.receiver.borrow().bitfield();
972            tx.send(value.clone()).await?;
973        }
974    }
975
976    /// Forward observed *deltas* to the given sender
977    ///
978    /// Returns an error if sending fails, or if the last sender is dropped
979    #[allow(dead_code)]
980    pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
981        let value = self.receiver.borrow().bitfield();
982        let mut old = value.clone();
983        tx.send(value).await?;
984        loop {
985            self.update_or_closed(&mut tx).await?;
986            let new = self.receiver.borrow().bitfield();
987            let diff = old.diff(&new);
988            if diff.is_empty() {
989                continue;
990            }
991            tx.send(diff).await?;
992            old = new;
993        }
994    }
995
996    async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
997        tokio::select! {
998            _ = tx.closed() => {
999                // the sender is closed, we are done
1000                Err(irpc::channel::SendError::ReceiverClosed.into())
1001            }
1002            e = self.receiver.changed() => Ok(e?),
1003        }
1004    }
1005}
1006
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Round-trips a 64 KiB blob and checks the re-export matches.
    ///
    /// It exports bao from one store and imports it into a second store while
    /// observing that store.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let store = MemStore::new();
        // Keep the temp tag alive for the whole test so the blob is not collected.
        let temp_tag = store.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *temp_tag.hash();
        println!("hash: {hash:?}");

        // Dump every exported item for manual inspection.
        let mut items = store.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = store
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let store2 = MemStore::new();
        let mut events = store2.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = events.next().await {
                println!("event: {event:?}");
            }
        });
        store2
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = store2
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}