iroh_blobs/store/
mem.rs

1//! Mutable in-memory blob store.
2//!
3//! Being a memory store, this store has to import all data into memory before it can
4//! serve it. So the amount of data you can serve is limited by your available memory.
5//! Other than that this is a fully featured store that provides all features such as
6//! tags and garbage collection.
7//!
8//! For many use cases this can be quite useful, since it does not require write access
9//! to the file system.
10use std::{
11    collections::{BTreeMap, HashMap, HashSet},
12    future::Future,
13    io::{self, Write},
14    num::NonZeroU64,
15    ops::Deref,
16    sync::Arc,
17    time::SystemTime,
18};
19
20use bao_tree::{
21    blake3,
22    io::{
23        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24        outboard::PreOrderMemOutboard,
25        sync::{Outboard, ReadAt, WriteAt},
26        BaoContentItem, EncodeError, Leaf,
27    },
28    BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35    io::AsyncReadExt,
36    sync::watch,
37    task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43    api::{
44        self,
45        blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46        proto::{
47            BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48            CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49            DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51            ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52            ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53            ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54            SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
55        },
56        tags::TagInfo,
57        ApiClient,
58    },
59    protocol::ChunkRangesExt,
60    store::{
61        util::{SizeInfo, SparseMemFile, Tag},
62        IROH_BLOCK_SIZE,
63    },
64    util::temp_tag::{TagDrop, TempTagScope, TempTags},
65    BlobFormat, Hash, HashAndFormat,
66};
67
/// Configuration for a [`MemStore`].
///
/// The struct currently has no fields; [`MemStore::new`] always uses the default value.
#[derive(Debug, Default)]
pub struct Options {}
70
71#[derive(Debug, Clone)]
72#[repr(transparent)]
73pub struct MemStore {
74    client: ApiClient,
75}
76
77impl AsRef<crate::api::Store> for MemStore {
78    fn as_ref(&self) -> &crate::api::Store {
79        crate::api::Store::ref_from_sender(&self.client)
80    }
81}
82
83impl Deref for MemStore {
84    type Target = crate::api::Store;
85
86    fn deref(&self) -> &Self::Target {
87        crate::api::Store::ref_from_sender(&self.client)
88    }
89}
90
91impl Default for MemStore {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97#[derive(derive_more::From)]
98enum TaskResult {
99    Unit(()),
100    Import(anyhow::Result<ImportEntry>),
101    Scope(Scope),
102}
103
104impl MemStore {
105    pub fn from_sender(client: ApiClient) -> Self {
106        Self { client }
107    }
108
109    pub fn new() -> Self {
110        let (sender, receiver) = tokio::sync::mpsc::channel(32);
111        tokio::spawn(
112            Actor {
113                commands: receiver,
114                tasks: JoinSet::new(),
115                state: State {
116                    data: HashMap::new(),
117                    tags: BTreeMap::new(),
118                    empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
119                },
120                options: Arc::new(Options::default()),
121                temp_tags: Default::default(),
122                protected: Default::default(),
123                idle_waiters: Default::default(),
124            }
125            .run(),
126        );
127        Self::from_sender(sender.into())
128    }
129}
130
131struct Actor {
132    commands: tokio::sync::mpsc::Receiver<Command>,
133    tasks: JoinSet<TaskResult>,
134    state: State,
135    #[allow(dead_code)]
136    options: Arc<Options>,
137    // temp tags
138    temp_tags: TempTags,
139    // idle waiters
140    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
141    protected: HashSet<Hash>,
142}
143
144impl Actor {
145    fn spawn<F, T>(&mut self, f: F)
146    where
147        F: Future<Output = T> + Send + 'static,
148        T: Into<TaskResult>,
149    {
150        let span = tracing::Span::current();
151        let fut = async move { f.await.into() }.instrument(span);
152        self.tasks.spawn(fut);
153    }
154
155    async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
156        match cmd {
157            Command::ImportBao(ImportBaoMsg {
158                inner: ImportBaoRequest { hash, size },
159                rx: data,
160                tx,
161                ..
162            }) => {
163                let entry = self.get_or_create_entry(hash);
164                self.spawn(import_bao(entry, size, data, tx));
165            }
166            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
167                trace!("wait idle");
168                if self.tasks.is_empty() {
169                    // we are currently idle
170                    tx.send(()).await.ok();
171                } else {
172                    // wait for idle state
173                    self.idle_waiters.push(tx);
174                }
175            }
176            Command::Observe(ObserveMsg {
177                inner: ObserveRequest { hash },
178                tx,
179                ..
180            }) => {
181                let entry = self.get_or_create_entry(hash);
182                self.spawn(observe(entry, tx));
183            }
184            Command::ImportBytes(ImportBytesMsg {
185                inner:
186                    ImportBytesRequest {
187                        data,
188                        scope,
189                        format,
190                        ..
191                    },
192                tx,
193                ..
194            }) => {
195                self.spawn(import_bytes(data, scope, format, tx));
196            }
197            Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
198                self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
199            }
200            Command::ImportPath(cmd) => {
201                self.spawn(import_path(cmd));
202            }
203            Command::ExportBao(ExportBaoMsg {
204                inner: ExportBaoRequest { hash, ranges },
205                tx,
206                ..
207            }) => {
208                let entry = self.get(&hash);
209                self.spawn(export_bao(entry, ranges, tx))
210            }
211            Command::ExportPath(cmd) => {
212                let entry = self.get(&cmd.hash);
213                self.spawn(export_path(entry, cmd));
214            }
215            Command::DeleteTags(cmd) => {
216                let DeleteTagsMsg {
217                    inner: DeleteTagsRequest { from, to },
218                    tx,
219                    ..
220                } = cmd;
221                info!("deleting tags from {:?} to {:?}", from, to);
222                // state.tags.remove(&from.unwrap());
223                // todo: more efficient impl
224                self.state.tags.retain(|tag, _| {
225                    if let Some(from) = &from {
226                        if tag < from {
227                            return true;
228                        }
229                    }
230                    if let Some(to) = &to {
231                        if tag >= to {
232                            return true;
233                        }
234                    }
235                    info!("    removing {:?}", tag);
236                    false
237                });
238                tx.send(Ok(())).await.ok();
239            }
240            Command::RenameTag(cmd) => {
241                let RenameTagMsg {
242                    inner: RenameTagRequest { from, to },
243                    tx,
244                    ..
245                } = cmd;
246                let tags = &mut self.state.tags;
247                let value = match tags.remove(&from) {
248                    Some(value) => value,
249                    None => {
250                        tx.send(Err(api::Error::io(
251                            io::ErrorKind::NotFound,
252                            format!("tag not found: {from:?}"),
253                        )))
254                        .await
255                        .ok();
256                        return None;
257                    }
258                };
259                tags.insert(to, value);
260                tx.send(Ok(())).await.ok();
261                return None;
262            }
263            Command::ListTags(cmd) => {
264                let ListTagsMsg {
265                    inner:
266                        ListTagsRequest {
267                            from,
268                            to,
269                            raw,
270                            hash_seq,
271                        },
272                    tx,
273                    ..
274                } = cmd;
275                let tags = self
276                    .state
277                    .tags
278                    .iter()
279                    .filter(move |(tag, value)| {
280                        if let Some(from) = &from {
281                            if tag < &from {
282                                return false;
283                            }
284                        }
285                        if let Some(to) = &to {
286                            if tag >= &to {
287                                return false;
288                            }
289                        }
290                        raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
291                    })
292                    .map(|(tag, value)| TagInfo {
293                        name: tag.clone(),
294                        hash: value.hash,
295                        format: value.format,
296                    })
297                    .map(Ok);
298                tx.send(tags.collect()).await.ok();
299            }
300            Command::SetTag(SetTagMsg {
301                inner: SetTagRequest { name: tag, value },
302                tx,
303                ..
304            }) => {
305                self.state.tags.insert(tag, value);
306                tx.send(Ok(())).await.ok();
307            }
308            Command::CreateTag(CreateTagMsg {
309                inner: CreateTagRequest { value },
310                tx,
311                ..
312            }) => {
313                let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
314                self.state.tags.insert(tag.clone(), value);
315                tx.send(Ok(tag)).await.ok();
316            }
317            Command::CreateTempTag(cmd) => {
318                trace!("{cmd:?}");
319                self.create_temp_tag(cmd).await;
320            }
321            Command::ListTempTags(cmd) => {
322                trace!("{cmd:?}");
323                let tts = self.temp_tags.list();
324                cmd.tx.send(tts).await.ok();
325            }
326            Command::ListBlobs(cmd) => {
327                let ListBlobsMsg { tx, .. } = cmd;
328                let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
329                self.spawn(async move {
330                    for blob in blobs {
331                        if tx.send(Ok(blob)).await.is_err() {
332                            break;
333                        }
334                    }
335                });
336            }
337            Command::BlobStatus(cmd) => {
338                trace!("{cmd:?}");
339                let BlobStatusMsg {
340                    inner: BlobStatusRequest { hash },
341                    tx,
342                    ..
343                } = cmd;
344                let res = match self.get(&hash) {
345                    None => api::blobs::BlobStatus::NotFound,
346                    Some(x) => {
347                        let bitfield = x.0.state.borrow().bitfield();
348                        if bitfield.is_complete() {
349                            BlobStatus::Complete {
350                                size: bitfield.size,
351                            }
352                        } else {
353                            BlobStatus::Partial {
354                                size: bitfield.validated_size(),
355                            }
356                        }
357                    }
358                };
359                tx.send(res).await.ok();
360            }
361            Command::DeleteBlobs(cmd) => {
362                trace!("{cmd:?}");
363                let DeleteBlobsMsg {
364                    inner: BlobDeleteRequest { hashes, force },
365                    tx,
366                    ..
367                } = cmd;
368                for hash in hashes {
369                    if !force && self.protected.contains(&hash) {
370                        continue;
371                    }
372                    self.state.data.remove(&hash);
373                }
374                tx.send(Ok(())).await.ok();
375            }
376            Command::Batch(cmd) => {
377                trace!("{cmd:?}");
378                let (id, scope) = self.temp_tags.create_scope();
379                self.spawn(handle_batch(cmd, id, scope));
380            }
381            Command::ClearProtected(cmd) => {
382                self.protected.clear();
383                cmd.tx.send(Ok(())).await.ok();
384            }
385            Command::ExportRanges(cmd) => {
386                let entry = self.get(&cmd.hash);
387                self.spawn(export_ranges(cmd, entry));
388            }
389            Command::SyncDb(SyncDbMsg { tx, .. }) => {
390                tx.send(Ok(())).await.ok();
391            }
392            Command::Shutdown(cmd) => {
393                return Some(cmd);
394            }
395        }
396        None
397    }
398
399    fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
400        if *hash == Hash::EMPTY {
401            Some(self.state.empty_hash.clone())
402        } else {
403            self.state.data.get(hash).cloned()
404        }
405    }
406
407    fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
408        if hash == Hash::EMPTY {
409            self.state.empty_hash.clone()
410        } else {
411            self.state
412                .data
413                .entry(hash)
414                .or_insert_with(|| BaoFileHandle::new_partial(hash))
415                .clone()
416        }
417    }
418
419    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
420        let CreateTempTagMsg { tx, inner, .. } = cmd;
421        let mut tt = self.temp_tags.create(inner.scope, inner.value);
422        if tx.is_rpc() {
423            tt.leak();
424        }
425        tx.send(tt).await.ok();
426    }
427
428    async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
429        let import_data = match res {
430            Ok(entry) => entry,
431            Err(e) => {
432                error!("import failed: {e}");
433                return;
434            }
435        };
436        let hash = import_data.outboard.root().into();
437        let entry = self.get_or_create_entry(hash);
438        entry
439            .0
440            .state
441            .send_if_modified(|state: &mut BaoFileStorage| {
442                let BaoFileStorage::Partial(_) = state.deref() else {
443                    return false;
444                };
445                *state =
446                    CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
447                true
448            });
449        let tt = self.temp_tags.create(
450            import_data.scope,
451            HashAndFormat {
452                hash,
453                format: import_data.format,
454            },
455        );
456        import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
457    }
458
459    fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
460        match res {
461            Ok(x) => Some(x),
462            Err(e) => {
463                if e.is_cancelled() {
464                    trace!("task cancelled: {e}");
465                } else {
466                    error!("task failed: {e}");
467                }
468                None
469            }
470        }
471    }
472
473    pub async fn run(mut self) {
474        let shutdown = loop {
475            tokio::select! {
476                cmd = self.commands.recv() => {
477                    let Some(cmd) = cmd else {
478                        // last sender has been dropped.
479                        // exit immediately.
480                        break None;
481                    };
482                    if let Some(cmd) = self.handle_command(cmd).await {
483                        break Some(cmd);
484                    }
485                }
486                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
487                    let Some(res) = self.log_task_result(res) else {
488                        continue;
489                    };
490                    match res {
491                        TaskResult::Import(res) => {
492                            self.finish_import(res).await;
493                        }
494                        TaskResult::Scope(scope) => {
495                            self.temp_tags.end_scope(scope);
496                        }
497                        TaskResult::Unit(_) => {}
498                    }
499                    if self.tasks.is_empty() {
500                        // we are idle now
501                        for tx in self.idle_waiters.drain(..) {
502                            tx.send(()).await.ok();
503                        }
504                    }
505                }
506            }
507        };
508        if let Some(shutdown) = shutdown {
509            shutdown.tx.send(()).await.ok();
510        }
511    }
512}
513
514async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
515    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
516        error!("batch failed: {cause}");
517    }
518    id
519}
520
521async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
522    let BatchMsg { tx, mut rx, .. } = cmd;
523    trace!("created scope {}", id);
524    tx.send(id).await.map_err(api::Error::other)?;
525    while let Some(msg) = rx.recv().await? {
526        match msg {
527            BatchResponse::Drop(msg) => scope.on_drop(&msg),
528            BatchResponse::Ping => {}
529        }
530    }
531    Ok(())
532}
533
534async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
535    let Some(entry) = entry else {
536        let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
537        cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
538        return;
539    };
540    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
541        cmd.tx
542            .send(ExportRangesItem::Error(cause.into()))
543            .await
544            .ok();
545    }
546}
547
548async fn export_ranges_impl(
549    cmd: ExportRangesRequest,
550    tx: &mut mpsc::Sender<ExportRangesItem>,
551    entry: BaoFileHandle,
552) -> io::Result<()> {
553    let ExportRangesRequest { ranges, hash } = cmd;
554    let bitfield = entry.bitfield();
555    trace!(
556        "exporting ranges: {hash} {ranges:?} size={}",
557        bitfield.size()
558    );
559    debug_assert!(entry.hash() == hash, "hash mismatch");
560    let data = entry.data_reader();
561    let size = bitfield.size();
562    for range in ranges.iter() {
563        let range = match range {
564            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
565            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
566        };
567        let requested = ChunkRanges::bytes(range.start..range.end);
568        if !bitfield.ranges.is_superset(&requested) {
569            return Err(io::Error::other(format!(
570                "missing range: {requested:?}, present: {bitfield:?}",
571            )));
572        }
573        let bs = 1024;
574        let mut offset = range.start;
575        loop {
576            let end: u64 = (offset + bs).min(range.end);
577            let size = (end - offset) as usize;
578            tx.send(
579                Leaf {
580                    offset,
581                    data: data.read_bytes_at(offset, size)?,
582                }
583                .into(),
584            )
585            .await?;
586            offset = end;
587            if offset >= range.end {
588                break;
589            }
590        }
591    }
592    Ok(())
593}
594
595fn chunk_range(leaf: &Leaf) -> ChunkRanges {
596    let start = ChunkNum::chunks(leaf.offset);
597    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
598    (start..end).into()
599}
600
601async fn import_bao(
602    entry: BaoFileHandle,
603    size: NonZeroU64,
604    mut stream: mpsc::Receiver<BaoContentItem>,
605    tx: irpc::channel::oneshot::Sender<api::Result<()>>,
606) {
607    let size = size.get();
608    entry
609        .0
610        .state
611        .send_if_modified(|state: &mut BaoFileStorage| {
612            let BaoFileStorage::Partial(entry) = state else {
613                // entry was already completed, no need to write
614                return false;
615            };
616            entry.size.write(0, size);
617            false
618        });
619    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
620    while let Some(item) = stream.recv().await.unwrap() {
621        entry.0.state.send_if_modified(|state| {
622            let BaoFileStorage::Partial(partial) = state else {
623                // entry was already completed, no need to write
624                return false;
625            };
626            match item {
627                BaoContentItem::Parent(parent) => {
628                    if let Some(offset) = tree.pre_order_offset(parent.node) {
629                        let mut pair = [0u8; 64];
630                        pair[..32].copy_from_slice(parent.pair.0.as_bytes());
631                        pair[32..].copy_from_slice(parent.pair.1.as_bytes());
632                        partial
633                            .outboard
634                            .write_at(offset * 64, &pair)
635                            .expect("writing to mem can never fail");
636                    }
637                    false
638                }
639                BaoContentItem::Leaf(leaf) => {
640                    let start = leaf.offset;
641                    partial
642                        .data
643                        .write_at(start, &leaf.data)
644                        .expect("writing to mem can never fail");
645                    let added = chunk_range(&leaf);
646                    let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
647                    if update.new_state().complete {
648                        let data = std::mem::take(&mut partial.data);
649                        let outboard = std::mem::take(&mut partial.outboard);
650                        let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
651                        let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
652                        *state = CompleteStorage::new(data, outboard).into();
653                    }
654                    update.changed()
655                }
656            }
657        });
658    }
659    tx.send(Ok(())).await.ok();
660}
661
662#[instrument(skip_all, fields(hash = tracing::field::Empty))]
663async fn export_bao(
664    entry: Option<BaoFileHandle>,
665    ranges: ChunkRanges,
666    mut sender: mpsc::Sender<EncodedItem>,
667) {
668    let Some(entry) = entry else {
669        let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
670        sender.send(err.into()).await.ok();
671        return;
672    };
673    tracing::Span::current().record("hash", tracing::field::display(entry.hash));
674    let data = entry.data_reader();
675    let outboard = entry.outboard_reader();
676    let tx = BaoTreeSender::new(&mut sender);
677    traverse_ranges_validated(data, outboard, &ranges, tx)
678        .await
679        .ok();
680}
681
682#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
683async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
684    entry.subscribe().forward(tx).await.ok();
685}
686
687async fn import_bytes(
688    data: Bytes,
689    scope: Scope,
690    format: BlobFormat,
691    tx: mpsc::Sender<AddProgressItem>,
692) -> anyhow::Result<ImportEntry> {
693    tx.send(AddProgressItem::Size(data.len() as u64)).await?;
694    tx.send(AddProgressItem::CopyDone).await?;
695    let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
696    Ok(ImportEntry {
697        data,
698        outboard,
699        scope,
700        format,
701        tx,
702    })
703}
704
705async fn import_byte_stream(
706    scope: Scope,
707    format: BlobFormat,
708    mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
709    tx: mpsc::Sender<AddProgressItem>,
710) -> anyhow::Result<ImportEntry> {
711    let mut res = Vec::new();
712    loop {
713        match rx.recv().await {
714            Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
715                res.extend_from_slice(&data);
716                tx.send(AddProgressItem::CopyProgress(res.len() as u64))
717                    .await?;
718            }
719            Ok(Some(ImportByteStreamUpdate::Done)) => {
720                break;
721            }
722            Ok(None) => {
723                return Err(api::Error::io(
724                    io::ErrorKind::UnexpectedEof,
725                    "byte stream ended unexpectedly",
726                )
727                .into());
728            }
729            Err(e) => {
730                return Err(e.into());
731            }
732        }
733    }
734    import_bytes(res.into(), scope, format, tx).await
735}
736
737#[instrument(skip_all, fields(path = %cmd.path.display()))]
738async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
739    let ImportPathMsg {
740        inner:
741            ImportPathRequest {
742                path,
743                scope,
744                format,
745                ..
746            },
747        tx,
748        ..
749    } = cmd;
750    let mut res = Vec::new();
751    let mut file = tokio::fs::File::open(path).await?;
752    let mut buf = [0u8; 1024 * 64];
753    loop {
754        let size = file.read(&mut buf).await?;
755        if size == 0 {
756            break;
757        }
758        res.extend_from_slice(&buf[..size]);
759        tx.send(AddProgressItem::CopyProgress(res.len() as u64))
760            .await?;
761    }
762    import_bytes(res.into(), scope, format, tx).await
763}
764
765#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
766async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
767    let ExportPathMsg { inner, mut tx, .. } = cmd;
768    let Some(entry) = entry else {
769        tx.send(ExportProgressItem::Error(api::Error::io(
770            io::ErrorKind::NotFound,
771            "hash not found",
772        )))
773        .await
774        .ok();
775        return;
776    };
777    match export_path_impl(entry, inner, &mut tx).await {
778        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
779        Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
780    };
781}
782
783async fn export_path_impl(
784    entry: BaoFileHandle,
785    cmd: ExportPathRequest,
786    tx: &mut mpsc::Sender<ExportProgressItem>,
787) -> io::Result<()> {
788    let ExportPathRequest { target, .. } = cmd;
789    if !target.is_absolute() {
790        return Err(io::Error::new(
791            io::ErrorKind::InvalidInput,
792            "path is not absolute",
793        ));
794    }
795    if let Some(parent) = target.parent() {
796        std::fs::create_dir_all(parent)?;
797    }
798    // todo: for partial entries make sure to only write the part that is actually present
799    let mut file = std::fs::File::create(target)?;
800    let size = entry.0.state.borrow().size();
801    tx.send(ExportProgressItem::Size(size)).await?;
802    let mut buf = [0u8; 1024 * 64];
803    for offset in (0..size).step_by(1024 * 64) {
804        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
805        let buf = &mut buf[..len];
806        entry.0.state.borrow().data().read_exact_at(offset, buf)?;
807        file.write_all(buf)?;
808        tx.try_send(ExportProgressItem::CopyProgress(offset))
809            .await
810            .map_err(|_e| io::Error::other(""))?;
811        yield_now().await;
812    }
813    Ok(())
814}
815
816struct ImportEntry {
817    scope: Scope,
818    format: BlobFormat,
819    data: Bytes,
820    outboard: PreOrderMemOutboard,
821    tx: mpsc::Sender<AddProgressItem>,
822}
823
824pub struct DataReader(BaoFileHandle);
825
826impl ReadBytesAt for DataReader {
827    fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
828        let entry = self.0 .0.state.borrow();
829        entry.data().read_bytes_at(offset, size)
830    }
831}
832
833pub struct OutboardReader {
834    hash: blake3::Hash,
835    tree: BaoTree,
836    data: BaoFileHandle,
837}
838
839impl Outboard for OutboardReader {
840    fn root(&self) -> blake3::Hash {
841        self.hash
842    }
843
844    fn tree(&self) -> BaoTree {
845        self.tree
846    }
847
848    fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
849        let Some(offset) = self.tree.pre_order_offset(node) else {
850            return Ok(None);
851        };
852        let mut buf = [0u8; 64];
853        let size = self
854            .data
855            .0
856            .state
857            .borrow()
858            .outboard()
859            .read_at(offset * 64, &mut buf)?;
860        if size != 64 {
861            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
862        }
863        let left: [u8; 32] = buf[..32].try_into().unwrap();
864        let right: [u8; 32] = buf[32..].try_into().unwrap();
865        Ok(Some((left.into(), right.into())))
866    }
867}
868
869struct State {
870    data: HashMap<Hash, BaoFileHandle>,
871    tags: BTreeMap<Tag, HashAndFormat>,
872    empty_hash: BaoFileHandle,
873}
874
875#[derive(Debug, derive_more::From)]
876pub enum BaoFileStorage {
877    Partial(PartialMemStorage),
878    Complete(CompleteStorage),
879}
880
881impl BaoFileStorage {
882    /// Get the bitfield of the storage.
883    pub fn bitfield(&self) -> Bitfield {
884        match self {
885            Self::Partial(entry) => entry.bitfield.clone(),
886            Self::Complete(entry) => Bitfield::complete(entry.size()),
887        }
888    }
889}
890
891#[derive(Debug)]
892pub struct BaoFileHandleInner {
893    state: watch::Sender<BaoFileStorage>,
894    hash: Hash,
895}
896
/// A cheaply cloneable handle to a bao file, including the hash.
///
/// Cloning only bumps the `Arc` reference count, so all clones share one
/// [`BaoFileHandleInner`]. That inner value is also reachable through `Deref`.
#[derive(Debug, Clone, derive_more::Deref)]
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
900
901impl BaoFileHandle {
902    pub fn new_partial(hash: Hash) -> Self {
903        let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
904            data: SparseMemFile::new(),
905            outboard: SparseMemFile::new(),
906            size: SizeInfo::default(),
907            bitfield: Bitfield::empty(),
908        }));
909        Self(Arc::new(BaoFileHandleInner { state, hash }))
910    }
911
912    pub fn hash(&self) -> Hash {
913        self.hash
914    }
915
916    pub fn bitfield(&self) -> Bitfield {
917        self.0.state.borrow().bitfield()
918    }
919
920    pub fn subscribe(&self) -> BaoFileStorageSubscriber {
921        BaoFileStorageSubscriber::new(self.0.state.subscribe())
922    }
923
924    pub fn data_reader(&self) -> DataReader {
925        DataReader(self.clone())
926    }
927
928    pub fn outboard_reader(&self) -> OutboardReader {
929        let entry = self.0.state.borrow();
930        let hash = self.hash.into();
931        let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
932        OutboardReader {
933            hash,
934            tree,
935            data: self.clone(),
936        }
937    }
938}
939
940impl Default for BaoFileStorage {
941    fn default() -> Self {
942        Self::Partial(Default::default())
943    }
944}
945
946impl BaoFileStorage {
947    fn data(&self) -> &[u8] {
948        match self {
949            Self::Partial(entry) => entry.data.as_ref(),
950            Self::Complete(entry) => &entry.data,
951        }
952    }
953
954    fn outboard(&self) -> &[u8] {
955        match self {
956            Self::Partial(entry) => entry.outboard.as_ref(),
957            Self::Complete(entry) => &entry.outboard,
958        }
959    }
960
961    fn size(&self) -> u64 {
962        match self {
963            Self::Partial(entry) => entry.current_size(),
964            Self::Complete(entry) => entry.size(),
965        }
966    }
967}
968
/// A fully available blob: its complete data plus its bao outboard.
#[derive(Debug, Clone)]
pub struct CompleteStorage {
    /// The blob content.
    pub(crate) data: Bytes,
    /// The bao outboard. `CompleteStorage::create` builds it with `PreOrderMemOutboard`,
    /// and `OutboardReader` reads it by pre-order offset.
    pub(crate) outboard: Bytes,
}
974
975impl CompleteStorage {
976    pub fn create(data: Bytes) -> (Hash, Self) {
977        let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
978        let hash = outboard.root().into();
979        let outboard = outboard.data.into();
980        let entry = Self::new(data, outboard);
981        (hash, entry)
982    }
983
984    pub fn new(data: Bytes, outboard: Bytes) -> Self {
985        Self { data, outboard }
986    }
987
988    pub fn size(&self) -> u64 {
989        self.data.len() as u64
990    }
991}
992
993#[allow(dead_code)]
994fn print_outboard(hashes: &[u8]) {
995    assert!(hashes.len() % 64 == 0);
996    for chunk in hashes.chunks(64) {
997        let left: [u8; 32] = chunk[..32].try_into().unwrap();
998        let right: [u8; 32] = chunk[32..].try_into().unwrap();
999        let left = blake3::Hash::from(left);
1000        let right = blake3::Hash::from(right);
1001        println!("l: {left:?}, r: {right:?}");
1002    }
1003}
1004
1005pub struct BaoFileStorageSubscriber {
1006    receiver: watch::Receiver<BaoFileStorage>,
1007}
1008
1009impl BaoFileStorageSubscriber {
1010    pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1011        Self { receiver }
1012    }
1013
1014    /// Forward observed *values* to the given sender
1015    ///
1016    /// Returns an error if sending fails, or if the last sender is dropped
1017    pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1018        let value = self.receiver.borrow().bitfield();
1019        tx.send(value).await?;
1020        loop {
1021            self.update_or_closed(&mut tx).await?;
1022            let value = self.receiver.borrow().bitfield();
1023            tx.send(value.clone()).await?;
1024        }
1025    }
1026
1027    /// Forward observed *deltas* to the given sender
1028    ///
1029    /// Returns an error if sending fails, or if the last sender is dropped
1030    #[allow(dead_code)]
1031    pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1032        let value = self.receiver.borrow().bitfield();
1033        let mut old = value.clone();
1034        tx.send(value).await?;
1035        loop {
1036            self.update_or_closed(&mut tx).await?;
1037            let new = self.receiver.borrow().bitfield();
1038            let diff = old.diff(&new);
1039            if diff.is_empty() {
1040                continue;
1041            }
1042            tx.send(diff).await?;
1043            old = new;
1044        }
1045    }
1046
1047    async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1048        tokio::select! {
1049            _ = tx.closed() => {
1050                // the sender is closed, we are done
1051                Err(irpc::channel::SendError::ReceiverClosed.into())
1052            }
1053            e = self.receiver.changed() => Ok(e?),
1054        }
1055    }
1056}
1057
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Moves a blob from one store to another via `export_bao` / `import_bao_bytes`,
    /// then checks that re-exporting from the second store yields identical bytes.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        // Keep the temp tag alive for the whole test.
        let tag = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *tag.hash();
        println!("hash: {hash:?}");

        // Dump every item produced by the bao export stream.
        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        // Start observing before the import so the spawned task can print its events.
        let mut events = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = events.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}