iroh_blobs/store/
mem.rs

1//! Mutable in-memory blob store.
2//!
3//! Being a memory store, this store has to import all data into memory before it can
4//! serve it. So the amount of data you can serve is limited by your available memory.
5//! Other than that this is a fully featured store that provides all features such as
6//! tags and garbage collection.
7//!
8//! For many use cases this can be quite useful, since it does not require write access
9//! to the file system.
10use std::{
11    collections::{BTreeMap, HashMap, HashSet},
12    future::Future,
13    io::{self, Write},
14    num::NonZeroU64,
15    ops::Deref,
16    sync::Arc,
17    time::SystemTime,
18};
19
20use bao_tree::{
21    blake3,
22    io::{
23        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24        outboard::PreOrderMemOutboard,
25        sync::{Outboard, ReadAt, WriteAt},
26        BaoContentItem, EncodeError, Leaf,
27    },
28    BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35    io::AsyncReadExt,
36    sync::watch,
37    task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43    api::{
44        self,
45        blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46        proto::{
47            BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48            CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49            DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51            ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52            ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53            ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54            SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
55        },
56        tags::TagInfo,
57        ApiClient,
58    },
59    protocol::ChunkRangesExt,
60    store::{
61        gc::{run_gc, GcConfig},
62        util::{SizeInfo, SparseMemFile, Tag},
63        IROH_BLOCK_SIZE,
64    },
65    util::temp_tag::{TagDrop, TempTagScope, TempTags},
66    BlobFormat, Hash, HashAndFormat,
67};
68
/// Options for creating a [`MemStore`].
#[derive(Debug, Default)]
pub struct Options {
    /// Garbage collection configuration.
    ///
    /// When this is `Some`, [`MemStore::new_with_opts`] spawns a garbage collection
    /// task for the new store. When it is `None`, no such task is spawned.
    pub gc_config: Option<GcConfig>,
}
73
/// Handle to the mutable in-memory blob store.
///
/// The handle wraps the client that sends commands to the store actor, and it
/// dereferences to [`crate::api::Store`].
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct MemStore {
    /// Client used to send commands to the store actor.
    client: ApiClient,
}
79
80impl From<MemStore> for crate::api::Store {
81    fn from(value: MemStore) -> Self {
82        crate::api::Store::from_sender(value.client)
83    }
84}
85
86impl AsRef<crate::api::Store> for MemStore {
87    fn as_ref(&self) -> &crate::api::Store {
88        crate::api::Store::ref_from_sender(&self.client)
89    }
90}
91
92impl Deref for MemStore {
93    type Target = crate::api::Store;
94
95    fn deref(&self) -> &Self::Target {
96        crate::api::Store::ref_from_sender(&self.client)
97    }
98}
99
100impl Default for MemStore {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
/// Output of a task spawned by the actor.
///
/// The actor loop receives this value when the task completes and dispatches on it.
#[derive(derive_more::From)]
enum TaskResult {
    /// The task produced nothing the actor has to act on.
    Unit(()),
    /// An import task finished; the actor completes the import in `finish_import`.
    Import(anyhow::Result<ImportEntry>),
    /// A batch task finished; the actor ends the temp tag scope with this id.
    Scope(Scope),
}
112
113impl MemStore {
114    pub fn from_sender(client: ApiClient) -> Self {
115        Self { client }
116    }
117
118    pub fn new() -> Self {
119        Self::new_with_opts(Options::default())
120    }
121
122    pub fn new_with_opts(opts: Options) -> Self {
123        let (sender, receiver) = tokio::sync::mpsc::channel(32);
124        tokio::spawn(
125            Actor {
126                commands: receiver,
127                tasks: JoinSet::new(),
128                state: State {
129                    data: HashMap::new(),
130                    tags: BTreeMap::new(),
131                    empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
132                },
133                options: Arc::new(Options::default()),
134                temp_tags: Default::default(),
135                protected: Default::default(),
136                idle_waiters: Default::default(),
137            }
138            .run(),
139        );
140
141        let store = Self::from_sender(sender.into());
142        if let Some(gc_config) = opts.gc_config {
143            tokio::spawn(run_gc(store.deref().clone(), gc_config));
144        }
145
146        store
147    }
148}
149
/// The actor that owns all state of the in-memory store.
///
/// It processes commands one at a time and spawns tasks for longer running work.
struct Actor {
    /// Incoming commands.
    commands: tokio::sync::mpsc::Receiver<Command>,
    /// Tasks spawned by the actor; their results are processed in `run`.
    tasks: JoinSet<TaskResult>,
    /// Blobs and tags.
    state: State,
    /// Store options. Currently not read by the actor.
    #[allow(dead_code)]
    options: Arc<Options>,
    /// Temp tags and temp tag scopes.
    temp_tags: TempTags,
    /// Senders to notify as soon as the set of running tasks becomes empty.
    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
    /// Hashes that `DeleteBlobs` skips unless `force` is set. Emptied by `ClearProtected`.
    protected: HashSet<Hash>,
}
162
163impl Actor {
164    fn spawn<F, T>(&mut self, f: F)
165    where
166        F: Future<Output = T> + Send + 'static,
167        T: Into<TaskResult>,
168    {
169        let span = tracing::Span::current();
170        let fut = async move { f.await.into() }.instrument(span);
171        self.tasks.spawn(fut);
172    }
173
174    async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
175        match cmd {
176            Command::ImportBao(ImportBaoMsg {
177                inner: ImportBaoRequest { hash, size },
178                rx: data,
179                tx,
180                ..
181            }) => {
182                let entry = self.get_or_create_entry(hash);
183                self.spawn(import_bao(entry, size, data, tx));
184            }
185            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
186                trace!("wait idle");
187                if self.tasks.is_empty() {
188                    // we are currently idle
189                    tx.send(()).await.ok();
190                } else {
191                    // wait for idle state
192                    self.idle_waiters.push(tx);
193                }
194            }
195            Command::Observe(ObserveMsg {
196                inner: ObserveRequest { hash },
197                tx,
198                ..
199            }) => {
200                let entry = self.get_or_create_entry(hash);
201                self.spawn(observe(entry, tx));
202            }
203            Command::ImportBytes(ImportBytesMsg {
204                inner:
205                    ImportBytesRequest {
206                        data,
207                        scope,
208                        format,
209                        ..
210                    },
211                tx,
212                ..
213            }) => {
214                self.spawn(import_bytes(data, scope, format, tx));
215            }
216            Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
217                self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
218            }
219            Command::ImportPath(cmd) => {
220                self.spawn(import_path(cmd));
221            }
222            Command::ExportBao(ExportBaoMsg {
223                inner: ExportBaoRequest { hash, ranges },
224                tx,
225                ..
226            }) => {
227                let entry = self.get(&hash);
228                self.spawn(export_bao(entry, ranges, tx))
229            }
230            Command::ExportPath(cmd) => {
231                let entry = self.get(&cmd.hash);
232                self.spawn(export_path(entry, cmd));
233            }
234            Command::DeleteTags(cmd) => {
235                let DeleteTagsMsg {
236                    inner: DeleteTagsRequest { from, to },
237                    tx,
238                    ..
239                } = cmd;
240                info!("deleting tags from {:?} to {:?}", from, to);
241                // state.tags.remove(&from.unwrap());
242                // todo: more efficient impl
243                let mut deleted = 0;
244                self.state.tags.retain(|tag, _| {
245                    if let Some(from) = &from {
246                        if tag < from {
247                            return true;
248                        }
249                    }
250                    if let Some(to) = &to {
251                        if tag >= to {
252                            return true;
253                        }
254                    }
255                    info!("    removing {:?}", tag);
256                    deleted += 1;
257                    false
258                });
259                tx.send(Ok(deleted)).await.ok();
260            }
261            Command::RenameTag(cmd) => {
262                let RenameTagMsg {
263                    inner: RenameTagRequest { from, to },
264                    tx,
265                    ..
266                } = cmd;
267                let tags = &mut self.state.tags;
268                let value = match tags.remove(&from) {
269                    Some(value) => value,
270                    None => {
271                        tx.send(Err(api::Error::io(
272                            io::ErrorKind::NotFound,
273                            format!("tag not found: {from:?}"),
274                        )))
275                        .await
276                        .ok();
277                        return None;
278                    }
279                };
280                tags.insert(to, value);
281                tx.send(Ok(())).await.ok();
282                return None;
283            }
284            Command::ListTags(cmd) => {
285                let ListTagsMsg {
286                    inner:
287                        ListTagsRequest {
288                            from,
289                            to,
290                            raw,
291                            hash_seq,
292                        },
293                    tx,
294                    ..
295                } = cmd;
296                let tags = self
297                    .state
298                    .tags
299                    .iter()
300                    .filter(move |(tag, value)| {
301                        if let Some(from) = &from {
302                            if tag < &from {
303                                return false;
304                            }
305                        }
306                        if let Some(to) = &to {
307                            if tag >= &to {
308                                return false;
309                            }
310                        }
311                        raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
312                    })
313                    .map(|(tag, value)| TagInfo {
314                        name: tag.clone(),
315                        hash: value.hash,
316                        format: value.format,
317                    })
318                    .map(Ok);
319                tx.send(tags.collect()).await.ok();
320            }
321            Command::SetTag(SetTagMsg {
322                inner: SetTagRequest { name: tag, value },
323                tx,
324                ..
325            }) => {
326                self.state.tags.insert(tag, value);
327                tx.send(Ok(())).await.ok();
328            }
329            Command::CreateTag(CreateTagMsg {
330                inner: CreateTagRequest { value },
331                tx,
332                ..
333            }) => {
334                let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
335                self.state.tags.insert(tag.clone(), value);
336                tx.send(Ok(tag)).await.ok();
337            }
338            Command::CreateTempTag(cmd) => {
339                trace!("{cmd:?}");
340                self.create_temp_tag(cmd).await;
341            }
342            Command::ListTempTags(cmd) => {
343                trace!("{cmd:?}");
344                let tts = self.temp_tags.list();
345                cmd.tx.send(tts).await.ok();
346            }
347            Command::ListBlobs(cmd) => {
348                let ListBlobsMsg { tx, .. } = cmd;
349                let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
350                self.spawn(async move {
351                    for blob in blobs {
352                        if tx.send(Ok(blob)).await.is_err() {
353                            break;
354                        }
355                    }
356                });
357            }
358            Command::BlobStatus(cmd) => {
359                trace!("{cmd:?}");
360                let BlobStatusMsg {
361                    inner: BlobStatusRequest { hash },
362                    tx,
363                    ..
364                } = cmd;
365                let res = match self.get(&hash) {
366                    None => api::blobs::BlobStatus::NotFound,
367                    Some(x) => {
368                        let bitfield = x.0.state.borrow().bitfield();
369                        if bitfield.is_complete() {
370                            BlobStatus::Complete {
371                                size: bitfield.size,
372                            }
373                        } else {
374                            BlobStatus::Partial {
375                                size: bitfield.validated_size(),
376                            }
377                        }
378                    }
379                };
380                tx.send(res).await.ok();
381            }
382            Command::DeleteBlobs(cmd) => {
383                trace!("{cmd:?}");
384                let DeleteBlobsMsg {
385                    inner: BlobDeleteRequest { hashes, force },
386                    tx,
387                    ..
388                } = cmd;
389                for hash in hashes {
390                    if !force && self.protected.contains(&hash) {
391                        continue;
392                    }
393                    self.state.data.remove(&hash);
394                }
395                tx.send(Ok(())).await.ok();
396            }
397            Command::Batch(cmd) => {
398                trace!("{cmd:?}");
399                let (id, scope) = self.temp_tags.create_scope();
400                self.spawn(handle_batch(cmd, id, scope));
401            }
402            Command::ClearProtected(cmd) => {
403                self.protected.clear();
404                cmd.tx.send(Ok(())).await.ok();
405            }
406            Command::ExportRanges(cmd) => {
407                let entry = self.get(&cmd.hash);
408                self.spawn(export_ranges(cmd, entry));
409            }
410            Command::SyncDb(SyncDbMsg { tx, .. }) => {
411                tx.send(Ok(())).await.ok();
412            }
413            Command::Shutdown(cmd) => {
414                return Some(cmd);
415            }
416        }
417        None
418    }
419
420    fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
421        if *hash == Hash::EMPTY {
422            Some(self.state.empty_hash.clone())
423        } else {
424            self.state.data.get(hash).cloned()
425        }
426    }
427
428    fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
429        if hash == Hash::EMPTY {
430            self.state.empty_hash.clone()
431        } else {
432            self.state
433                .data
434                .entry(hash)
435                .or_insert_with(|| BaoFileHandle::new_partial(hash))
436                .clone()
437        }
438    }
439
440    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
441        let CreateTempTagMsg { tx, inner, .. } = cmd;
442        let mut tt = self.temp_tags.create(inner.scope, inner.value);
443        if tx.is_rpc() {
444            tt.leak();
445        }
446        tx.send(tt).await.ok();
447    }
448
449    async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
450        let import_data = match res {
451            Ok(entry) => entry,
452            Err(e) => {
453                error!("import failed: {e}");
454                return;
455            }
456        };
457        let hash = import_data.outboard.root().into();
458        let entry = self.get_or_create_entry(hash);
459        entry
460            .0
461            .state
462            .send_if_modified(|state: &mut BaoFileStorage| {
463                let BaoFileStorage::Partial(_) = state.deref() else {
464                    return false;
465                };
466                *state =
467                    CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
468                true
469            });
470        let tt = self.temp_tags.create(
471            import_data.scope,
472            HashAndFormat {
473                hash,
474                format: import_data.format,
475            },
476        );
477        import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
478    }
479
480    fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
481        match res {
482            Ok(x) => Some(x),
483            Err(e) => {
484                if e.is_cancelled() {
485                    trace!("task cancelled: {e}");
486                } else {
487                    error!("task failed: {e}");
488                }
489                None
490            }
491        }
492    }
493
494    pub async fn run(mut self) {
495        let shutdown = loop {
496            tokio::select! {
497                cmd = self.commands.recv() => {
498                    let Some(cmd) = cmd else {
499                        // last sender has been dropped.
500                        // exit immediately.
501                        break None;
502                    };
503                    if let Some(cmd) = self.handle_command(cmd).await {
504                        break Some(cmd);
505                    }
506                }
507                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
508                    let Some(res) = self.log_task_result(res) else {
509                        continue;
510                    };
511                    match res {
512                        TaskResult::Import(res) => {
513                            self.finish_import(res).await;
514                        }
515                        TaskResult::Scope(scope) => {
516                            self.temp_tags.end_scope(scope);
517                        }
518                        TaskResult::Unit(_) => {}
519                    }
520                    if self.tasks.is_empty() {
521                        // we are idle now
522                        for tx in self.idle_waiters.drain(..) {
523                            tx.send(()).await.ok();
524                        }
525                    }
526                }
527            }
528        };
529        if let Some(shutdown) = shutdown {
530            shutdown.tx.send(()).await.ok();
531        }
532    }
533}
534
535async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
536    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
537        error!("batch failed: {cause}");
538    }
539    id
540}
541
542async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
543    let BatchMsg { tx, mut rx, .. } = cmd;
544    trace!("created scope {}", id);
545    tx.send(id).await.map_err(api::Error::other)?;
546    while let Some(msg) = rx.recv().await? {
547        match msg {
548            BatchResponse::Drop(msg) => scope.on_drop(&msg),
549            BatchResponse::Ping => {}
550        }
551    }
552    Ok(())
553}
554
555async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
556    let Some(entry) = entry else {
557        let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
558        cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
559        return;
560    };
561    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
562        cmd.tx
563            .send(ExportRangesItem::Error(cause.into()))
564            .await
565            .ok();
566    }
567}
568
569async fn export_ranges_impl(
570    cmd: ExportRangesRequest,
571    tx: &mut mpsc::Sender<ExportRangesItem>,
572    entry: BaoFileHandle,
573) -> io::Result<()> {
574    let ExportRangesRequest { ranges, hash } = cmd;
575    let bitfield = entry.bitfield();
576    trace!(
577        "exporting ranges: {hash} {ranges:?} size={}",
578        bitfield.size()
579    );
580    debug_assert!(entry.hash() == hash, "hash mismatch");
581    let data = entry.data_reader();
582    let size = bitfield.size();
583    for range in ranges.iter() {
584        let range = match range {
585            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
586            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
587        };
588        let requested = ChunkRanges::bytes(range.start..range.end);
589        if !bitfield.ranges.is_superset(&requested) {
590            return Err(io::Error::other(format!(
591                "missing range: {requested:?}, present: {bitfield:?}",
592            )));
593        }
594        let bs = 1024;
595        let mut offset = range.start;
596        loop {
597            let end: u64 = (offset + bs).min(range.end);
598            let size = (end - offset) as usize;
599            tx.send(
600                Leaf {
601                    offset,
602                    data: data.read_bytes_at(offset, size)?,
603                }
604                .into(),
605            )
606            .await?;
607            offset = end;
608            if offset >= range.end {
609                break;
610            }
611        }
612    }
613    Ok(())
614}
615
616fn chunk_range(leaf: &Leaf) -> ChunkRanges {
617    let start = ChunkNum::chunks(leaf.offset);
618    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
619    (start..end).into()
620}
621
622async fn import_bao(
623    entry: BaoFileHandle,
624    size: NonZeroU64,
625    mut stream: mpsc::Receiver<BaoContentItem>,
626    tx: irpc::channel::oneshot::Sender<api::Result<()>>,
627) {
628    let size = size.get();
629    entry
630        .0
631        .state
632        .send_if_modified(|state: &mut BaoFileStorage| {
633            let BaoFileStorage::Partial(entry) = state else {
634                // entry was already completed, no need to write
635                return false;
636            };
637            entry.size.write(0, size);
638            false
639        });
640    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
641    while let Some(item) = stream.recv().await.unwrap() {
642        entry.0.state.send_if_modified(|state| {
643            let BaoFileStorage::Partial(partial) = state else {
644                // entry was already completed, no need to write
645                return false;
646            };
647            match item {
648                BaoContentItem::Parent(parent) => {
649                    if let Some(offset) = tree.pre_order_offset(parent.node) {
650                        let mut pair = [0u8; 64];
651                        pair[..32].copy_from_slice(parent.pair.0.as_bytes());
652                        pair[32..].copy_from_slice(parent.pair.1.as_bytes());
653                        partial
654                            .outboard
655                            .write_at(offset * 64, &pair)
656                            .expect("writing to mem can never fail");
657                    }
658                    false
659                }
660                BaoContentItem::Leaf(leaf) => {
661                    let start = leaf.offset;
662                    partial
663                        .data
664                        .write_at(start, &leaf.data)
665                        .expect("writing to mem can never fail");
666                    let added = chunk_range(&leaf);
667                    let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
668                    if update.new_state().complete {
669                        let data = std::mem::take(&mut partial.data);
670                        let outboard = std::mem::take(&mut partial.outboard);
671                        let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
672                        let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
673                        *state = CompleteStorage::new(data, outboard).into();
674                    }
675                    update.changed()
676                }
677            }
678        });
679    }
680    tx.send(Ok(())).await.ok();
681}
682
683#[instrument(skip_all, fields(hash = tracing::field::Empty))]
684async fn export_bao(
685    entry: Option<BaoFileHandle>,
686    ranges: ChunkRanges,
687    mut sender: mpsc::Sender<EncodedItem>,
688) {
689    let Some(entry) = entry else {
690        let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
691        sender.send(err.into()).await.ok();
692        return;
693    };
694    tracing::Span::current().record("hash", tracing::field::display(entry.hash));
695    let data = entry.data_reader();
696    let outboard = entry.outboard_reader();
697    let tx = BaoTreeSender::new(&mut sender);
698    traverse_ranges_validated(data, outboard, &ranges, tx)
699        .await
700        .ok();
701}
702
703#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
704async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
705    entry.subscribe().forward(tx).await.ok();
706}
707
708async fn import_bytes(
709    data: Bytes,
710    scope: Scope,
711    format: BlobFormat,
712    tx: mpsc::Sender<AddProgressItem>,
713) -> anyhow::Result<ImportEntry> {
714    tx.send(AddProgressItem::Size(data.len() as u64)).await?;
715    tx.send(AddProgressItem::CopyDone).await?;
716    let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
717    Ok(ImportEntry {
718        data,
719        outboard,
720        scope,
721        format,
722        tx,
723    })
724}
725
726async fn import_byte_stream(
727    scope: Scope,
728    format: BlobFormat,
729    mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
730    tx: mpsc::Sender<AddProgressItem>,
731) -> anyhow::Result<ImportEntry> {
732    let mut res = Vec::new();
733    loop {
734        match rx.recv().await {
735            Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
736                res.extend_from_slice(&data);
737                tx.send(AddProgressItem::CopyProgress(res.len() as u64))
738                    .await?;
739            }
740            Ok(Some(ImportByteStreamUpdate::Done)) => {
741                break;
742            }
743            Ok(None) => {
744                return Err(api::Error::io(
745                    io::ErrorKind::UnexpectedEof,
746                    "byte stream ended unexpectedly",
747                )
748                .into());
749            }
750            Err(e) => {
751                return Err(e.into());
752            }
753        }
754    }
755    import_bytes(res.into(), scope, format, tx).await
756}
757
758#[instrument(skip_all, fields(path = %cmd.path.display()))]
759async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
760    let ImportPathMsg {
761        inner:
762            ImportPathRequest {
763                path,
764                scope,
765                format,
766                ..
767            },
768        tx,
769        ..
770    } = cmd;
771    let mut res = Vec::new();
772    let mut file = tokio::fs::File::open(path).await?;
773    let mut buf = [0u8; 1024 * 64];
774    loop {
775        let size = file.read(&mut buf).await?;
776        if size == 0 {
777            break;
778        }
779        res.extend_from_slice(&buf[..size]);
780        tx.send(AddProgressItem::CopyProgress(res.len() as u64))
781            .await?;
782    }
783    import_bytes(res.into(), scope, format, tx).await
784}
785
786#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
787async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
788    let ExportPathMsg { inner, mut tx, .. } = cmd;
789    let Some(entry) = entry else {
790        tx.send(ExportProgressItem::Error(api::Error::io(
791            io::ErrorKind::NotFound,
792            "hash not found",
793        )))
794        .await
795        .ok();
796        return;
797    };
798    match export_path_impl(entry, inner, &mut tx).await {
799        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
800        Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
801    };
802}
803
804async fn export_path_impl(
805    entry: BaoFileHandle,
806    cmd: ExportPathRequest,
807    tx: &mut mpsc::Sender<ExportProgressItem>,
808) -> io::Result<()> {
809    let ExportPathRequest { target, .. } = cmd;
810    if !target.is_absolute() {
811        return Err(io::Error::new(
812            io::ErrorKind::InvalidInput,
813            "path is not absolute",
814        ));
815    }
816    if let Some(parent) = target.parent() {
817        std::fs::create_dir_all(parent)?;
818    }
819    // todo: for partial entries make sure to only write the part that is actually present
820    let mut file = std::fs::File::create(target)?;
821    let size = entry.0.state.borrow().size();
822    tx.send(ExportProgressItem::Size(size)).await?;
823    let mut buf = [0u8; 1024 * 64];
824    for offset in (0..size).step_by(1024 * 64) {
825        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
826        let buf = &mut buf[..len];
827        entry.0.state.borrow().data().read_exact_at(offset, buf)?;
828        file.write_all(buf)?;
829        tx.try_send(ExportProgressItem::CopyProgress(offset))
830            .await
831            .map_err(|_e| io::Error::other(""))?;
832        yield_now().await;
833    }
834    Ok(())
835}
836
/// Everything the actor needs to complete an import that was prepared by an import task.
struct ImportEntry {
    /// Scope in which the temp tag for the imported blob is created.
    scope: Scope,
    /// Format recorded in the temp tag for the imported blob.
    format: BlobFormat,
    /// The imported data.
    data: Bytes,
    /// Outboard computed for the data; its root is the hash of the blob.
    outboard: PreOrderMemOutboard,
    /// Progress sender of the importer; receives the final `Done` item.
    tx: mpsc::Sender<AddProgressItem>,
}
844
845pub struct DataReader(BaoFileHandle);
846
847impl ReadBytesAt for DataReader {
848    fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
849        let entry = self.0 .0.state.borrow();
850        entry.data().read_bytes_at(offset, size)
851    }
852}
853
854pub struct OutboardReader {
855    hash: blake3::Hash,
856    tree: BaoTree,
857    data: BaoFileHandle,
858}
859
860impl Outboard for OutboardReader {
861    fn root(&self) -> blake3::Hash {
862        self.hash
863    }
864
865    fn tree(&self) -> BaoTree {
866        self.tree
867    }
868
869    fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
870        let Some(offset) = self.tree.pre_order_offset(node) else {
871            return Ok(None);
872        };
873        let mut buf = [0u8; 64];
874        let size = self
875            .data
876            .0
877            .state
878            .borrow()
879            .outboard()
880            .read_at(offset * 64, &mut buf)?;
881        if size != 64 {
882            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
883        }
884        let left: [u8; 32] = buf[..32].try_into().unwrap();
885        let right: [u8; 32] = buf[32..].try_into().unwrap();
886        Ok(Some((left.into(), right.into())))
887    }
888}
889
/// Blobs and tags of the store.
struct State {
    /// Entries by hash. The entry for the empty hash is not stored here, see `empty_hash`.
    data: HashMap<Hash, BaoFileHandle>,
    /// Tags, ordered by tag.
    tags: BTreeMap<Tag, HashAndFormat>,
    /// The entry that is used for the empty hash.
    empty_hash: BaoFileHandle,
}
895
/// In-memory storage backing a single blob.
///
/// `derive_more::From` provides conversions from each variant's payload type.
#[derive(Debug, derive_more::From)]
pub enum BaoFileStorage {
    /// Storage held in a [`PartialMemStorage`], which carries its own bitfield.
    Partial(PartialMemStorage),
    /// Storage held in a [`CompleteStorage`]; its bitfield is reported as complete.
    Complete(CompleteStorage),
}
901
902impl BaoFileStorage {
903    /// Get the bitfield of the storage.
904    pub fn bitfield(&self) -> Bitfield {
905        match self {
906            Self::Partial(entry) => entry.bitfield.clone(),
907            Self::Complete(entry) => Bitfield::complete(entry.size()),
908        }
909    }
910}
911
/// Shared state behind a [`BaoFileHandle`].
#[derive(Debug)]
pub struct BaoFileHandleInner {
    /// Watch channel sender holding the current storage; subscribers are
    /// created from it and readers borrow the current value through it.
    state: watch::Sender<BaoFileStorage>,
    /// Hash of the blob this handle refers to.
    hash: Hash,
}
917
/// A cheaply cloneable handle to a bao file, including the hash
///
/// Cloning only clones the inner `Arc`; all clones share the same
/// [`BaoFileHandleInner`], which the handle dereferences to.
#[derive(Debug, Clone, derive_more::Deref)]
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
921
922impl BaoFileHandle {
923    pub fn new_partial(hash: Hash) -> Self {
924        let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
925            data: SparseMemFile::new(),
926            outboard: SparseMemFile::new(),
927            size: SizeInfo::default(),
928            bitfield: Bitfield::empty(),
929        }));
930        Self(Arc::new(BaoFileHandleInner { state, hash }))
931    }
932
933    pub fn hash(&self) -> Hash {
934        self.hash
935    }
936
937    pub fn bitfield(&self) -> Bitfield {
938        self.0.state.borrow().bitfield()
939    }
940
941    pub fn subscribe(&self) -> BaoFileStorageSubscriber {
942        BaoFileStorageSubscriber::new(self.0.state.subscribe())
943    }
944
945    pub fn data_reader(&self) -> DataReader {
946        DataReader(self.clone())
947    }
948
949    pub fn outboard_reader(&self) -> OutboardReader {
950        let entry = self.0.state.borrow();
951        let hash = self.hash.into();
952        let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
953        OutboardReader {
954            hash,
955            tree,
956            data: self.clone(),
957        }
958    }
959}
960
961impl Default for BaoFileStorage {
962    fn default() -> Self {
963        Self::Partial(Default::default())
964    }
965}
966
967impl BaoFileStorage {
968    fn data(&self) -> &[u8] {
969        match self {
970            Self::Partial(entry) => entry.data.as_ref(),
971            Self::Complete(entry) => &entry.data,
972        }
973    }
974
975    fn outboard(&self) -> &[u8] {
976        match self {
977            Self::Partial(entry) => entry.outboard.as_ref(),
978            Self::Complete(entry) => &entry.outboard,
979        }
980    }
981
982    fn size(&self) -> u64 {
983        match self {
984            Self::Partial(entry) => entry.current_size(),
985            Self::Complete(entry) => entry.size(),
986        }
987    }
988}
989
/// Storage for a blob: its data plus the matching outboard bytes.
#[derive(Debug, Clone)]
pub struct CompleteStorage {
    /// The blob data; its length is the blob size.
    pub(crate) data: Bytes,
    /// The outboard bytes belonging to `data`.
    pub(crate) outboard: Bytes,
}
995
996impl CompleteStorage {
997    pub fn create(data: Bytes) -> (Hash, Self) {
998        let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
999        let hash = outboard.root().into();
1000        let outboard = outboard.data.into();
1001        let entry = Self::new(data, outboard);
1002        (hash, entry)
1003    }
1004
1005    pub fn new(data: Bytes, outboard: Bytes) -> Self {
1006        Self { data, outboard }
1007    }
1008
1009    pub fn size(&self) -> u64 {
1010        self.data.len() as u64
1011    }
1012}
1013
1014#[allow(dead_code)]
1015fn print_outboard(hashes: &[u8]) {
1016    assert!(hashes.len() % 64 == 0);
1017    for chunk in hashes.chunks(64) {
1018        let left: [u8; 32] = chunk[..32].try_into().unwrap();
1019        let right: [u8; 32] = chunk[32..].try_into().unwrap();
1020        let left = blake3::Hash::from(left);
1021        let right = blake3::Hash::from(right);
1022        println!("l: {left:?}, r: {right:?}");
1023    }
1024}
1025
/// Observer for the storage state of a single blob, wrapping a watch receiver.
pub struct BaoFileStorageSubscriber {
    /// Receiver side of the watch channel that holds the storage state.
    receiver: watch::Receiver<BaoFileStorage>,
}
1029
1030impl BaoFileStorageSubscriber {
1031    pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1032        Self { receiver }
1033    }
1034
1035    /// Forward observed *values* to the given sender
1036    ///
1037    /// Returns an error if sending fails, or if the last sender is dropped
1038    pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1039        let value = self.receiver.borrow().bitfield();
1040        tx.send(value).await?;
1041        loop {
1042            self.update_or_closed(&mut tx).await?;
1043            let value = self.receiver.borrow().bitfield();
1044            tx.send(value.clone()).await?;
1045        }
1046    }
1047
1048    /// Forward observed *deltas* to the given sender
1049    ///
1050    /// Returns an error if sending fails, or if the last sender is dropped
1051    #[allow(dead_code)]
1052    pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1053        let value = self.receiver.borrow().bitfield();
1054        let mut old = value.clone();
1055        tx.send(value).await?;
1056        loop {
1057            self.update_or_closed(&mut tx).await?;
1058            let new = self.receiver.borrow().bitfield();
1059            let diff = old.diff(&new);
1060            if diff.is_empty() {
1061                continue;
1062            }
1063            tx.send(diff).await?;
1064            old = new;
1065        }
1066    }
1067
1068    async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1069        tokio::select! {
1070            _ = tx.closed() => {
1071                // the sender is closed, we are done
1072                Err(irpc::channel::SendError::ReceiverClosed.into())
1073            }
1074            e = self.receiver.changed() => Ok(e?),
1075        }
1076    }
1077}
1078
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Round trip: add bytes to one store, export them as bao, import the
    /// encoding into a second store and check that exporting again yields the
    /// same encoding.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let origin = MemStore::new();
        // The temp tag stays bound for the rest of the test.
        let temp_tag = origin.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = temp_tag.hash();
        println!("hash: {hash:?}");

        // First pass: just print every exported item.
        let mut items = origin.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        // Second pass: collect the encoding.
        let encoded = origin
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let replica = MemStore::new();
        let mut observer = replica.observe(hash).stream().await?;
        // Print observe events in the background while the import runs.
        tokio::spawn(async move {
            while let Some(event) = observer.next().await {
                println!("event: {event:?}");
            }
        });
        replica
            .import_bao_bytes(hash, ChunkRanges::all(), encoded.clone())
            .await?;

        let reencoded = replica
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(encoded, reencoded);

        Ok(())
    }
}
1118}