// iroh_blobs/store/mem.rs

1//! Mutable in-memory blob store.
2//!
3//! Being a memory store, this store has to import all data into memory before it can
4//! serve it. So the amount of data you can serve is limited by your available memory.
5//! Other than that this is a fully featured store that provides all features such as
6//! tags and garbage collection.
7//!
8//! For many use cases this can be quite useful, since it does not require write access
9//! to the file system.
10use std::{
11    collections::{BTreeMap, HashMap, HashSet},
12    future::Future,
13    io::{self, Write},
14    num::NonZeroU64,
15    ops::Deref,
16    sync::Arc,
17    time::SystemTime,
18};
19
20use bao_tree::{
21    blake3,
22    io::{
23        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24        outboard::PreOrderMemOutboard,
25        sync::{Outboard, ReadAt, WriteAt},
26        BaoContentItem, EncodeError, Leaf,
27    },
28    BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35    io::AsyncReadExt,
36    sync::watch,
37    task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43    api::{
44        self,
45        blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46        proto::{
47            BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48            CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49            DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51            ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52            ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53            ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54            SetTagRequest, ShutdownMsg, SyncDbMsg,
55        },
56        tags::TagInfo,
57        ApiClient,
58    },
59    store::{
60        util::{SizeInfo, SparseMemFile, Tag},
61        HashAndFormat, IROH_BLOCK_SIZE,
62    },
63    util::{
64        temp_tag::{TagDrop, TempTagScope, TempTags},
65        ChunkRangesExt,
66    },
67    BlobFormat, Hash,
68};
69
/// Configuration for the in-memory store.
///
/// Currently empty; [`MemStore::new`] always uses `Options::default()`.
#[derive(Debug, Default)]
pub struct Options {}
72
73#[derive(Debug, Clone)]
74#[repr(transparent)]
75pub struct MemStore {
76    client: ApiClient,
77}
78
79impl AsRef<crate::api::Store> for MemStore {
80    fn as_ref(&self) -> &crate::api::Store {
81        crate::api::Store::ref_from_sender(&self.client)
82    }
83}
84
85impl Deref for MemStore {
86    type Target = crate::api::Store;
87
88    fn deref(&self) -> &Self::Target {
89        crate::api::Store::ref_from_sender(&self.client)
90    }
91}
92
93impl Default for MemStore {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99#[derive(derive_more::From)]
100enum TaskResult {
101    Unit(()),
102    Import(anyhow::Result<ImportEntry>),
103    Scope(Scope),
104}
105
106impl MemStore {
107    pub fn from_sender(client: ApiClient) -> Self {
108        Self { client }
109    }
110
111    pub fn new() -> Self {
112        let (sender, receiver) = tokio::sync::mpsc::channel(32);
113        tokio::spawn(
114            Actor {
115                commands: receiver,
116                tasks: JoinSet::new(),
117                state: State {
118                    data: HashMap::new(),
119                    tags: BTreeMap::new(),
120                    empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
121                },
122                options: Arc::new(Options::default()),
123                temp_tags: Default::default(),
124                protected: Default::default(),
125            }
126            .run(),
127        );
128        Self::from_sender(sender.into())
129    }
130}
131
132struct Actor {
133    commands: tokio::sync::mpsc::Receiver<Command>,
134    tasks: JoinSet<TaskResult>,
135    state: State,
136    #[allow(dead_code)]
137    options: Arc<Options>,
138    // temp tags
139    temp_tags: TempTags,
140    protected: HashSet<Hash>,
141}
142
143impl Actor {
144    fn spawn<F, T>(&mut self, f: F)
145    where
146        F: Future<Output = T> + Send + 'static,
147        T: Into<TaskResult>,
148    {
149        let span = tracing::Span::current();
150        let fut = async move { f.await.into() }.instrument(span);
151        self.tasks.spawn(fut);
152    }
153
154    async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
155        match cmd {
156            Command::ImportBao(ImportBaoMsg {
157                inner: ImportBaoRequest { hash, size },
158                rx: data,
159                tx,
160                ..
161            }) => {
162                let entry = self.get_or_create_entry(hash);
163                self.spawn(import_bao(entry, size, data, tx));
164            }
165            Command::Observe(ObserveMsg {
166                inner: ObserveRequest { hash },
167                tx,
168                ..
169            }) => {
170                let entry = self.get_or_create_entry(hash);
171                self.spawn(observe(entry, tx));
172            }
173            Command::ImportBytes(ImportBytesMsg {
174                inner:
175                    ImportBytesRequest {
176                        data,
177                        scope,
178                        format,
179                        ..
180                    },
181                tx,
182                ..
183            }) => {
184                self.spawn(import_bytes(data, scope, format, tx));
185            }
186            Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
187                self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
188            }
189            Command::ImportPath(cmd) => {
190                self.spawn(import_path(cmd));
191            }
192            Command::ExportBao(ExportBaoMsg {
193                inner: ExportBaoRequest { hash, ranges },
194                tx,
195                ..
196            }) => {
197                let entry = self.get(&hash);
198                self.spawn(export_bao(entry, ranges, tx))
199            }
200            Command::ExportPath(cmd) => {
201                let entry = self.get(&cmd.hash);
202                self.spawn(export_path(entry, cmd));
203            }
204            Command::DeleteTags(cmd) => {
205                let DeleteTagsMsg {
206                    inner: DeleteTagsRequest { from, to },
207                    tx,
208                    ..
209                } = cmd;
210                info!("deleting tags from {:?} to {:?}", from, to);
211                // state.tags.remove(&from.unwrap());
212                // todo: more efficient impl
213                self.state.tags.retain(|tag, _| {
214                    if let Some(from) = &from {
215                        if tag < from {
216                            return true;
217                        }
218                    }
219                    if let Some(to) = &to {
220                        if tag >= to {
221                            return true;
222                        }
223                    }
224                    info!("    removing {:?}", tag);
225                    false
226                });
227                tx.send(Ok(())).await.ok();
228            }
229            Command::RenameTag(cmd) => {
230                let RenameTagMsg {
231                    inner: RenameTagRequest { from, to },
232                    tx,
233                    ..
234                } = cmd;
235                let tags = &mut self.state.tags;
236                let value = match tags.remove(&from) {
237                    Some(value) => value,
238                    None => {
239                        tx.send(Err(api::Error::io(
240                            io::ErrorKind::NotFound,
241                            format!("tag not found: {from:?}"),
242                        )))
243                        .await
244                        .ok();
245                        return None;
246                    }
247                };
248                tags.insert(to, value);
249                tx.send(Ok(())).await.ok();
250                return None;
251            }
252            Command::ListTags(cmd) => {
253                let ListTagsMsg {
254                    inner:
255                        ListTagsRequest {
256                            from,
257                            to,
258                            raw,
259                            hash_seq,
260                        },
261                    tx,
262                    ..
263                } = cmd;
264                let tags = self
265                    .state
266                    .tags
267                    .iter()
268                    .filter(move |(tag, value)| {
269                        if let Some(from) = &from {
270                            if tag < &from {
271                                return false;
272                            }
273                        }
274                        if let Some(to) = &to {
275                            if tag >= &to {
276                                return false;
277                            }
278                        }
279                        raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
280                    })
281                    .map(|(tag, value)| TagInfo {
282                        name: tag.clone(),
283                        hash: value.hash,
284                        format: value.format,
285                    })
286                    .map(Ok);
287                tx.send(tags.collect()).await.ok();
288            }
289            Command::SetTag(SetTagMsg {
290                inner: SetTagRequest { name: tag, value },
291                tx,
292                ..
293            }) => {
294                self.state.tags.insert(tag, value);
295                tx.send(Ok(())).await.ok();
296            }
297            Command::CreateTag(CreateTagMsg {
298                inner: CreateTagRequest { value },
299                tx,
300                ..
301            }) => {
302                let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
303                self.state.tags.insert(tag.clone(), value);
304                tx.send(Ok(tag)).await.ok();
305            }
306            Command::CreateTempTag(cmd) => {
307                trace!("{cmd:?}");
308                self.create_temp_tag(cmd).await;
309            }
310            Command::ListTempTags(cmd) => {
311                trace!("{cmd:?}");
312                let tts = self.temp_tags.list();
313                cmd.tx.send(tts).await.ok();
314            }
315            Command::ListBlobs(cmd) => {
316                let ListBlobsMsg { tx, .. } = cmd;
317                let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
318                self.spawn(async move {
319                    for blob in blobs {
320                        if tx.send(Ok(blob)).await.is_err() {
321                            break;
322                        }
323                    }
324                });
325            }
326            Command::BlobStatus(cmd) => {
327                trace!("{cmd:?}");
328                let BlobStatusMsg {
329                    inner: BlobStatusRequest { hash },
330                    tx,
331                    ..
332                } = cmd;
333                let res = match self.get(&hash) {
334                    None => api::blobs::BlobStatus::NotFound,
335                    Some(x) => {
336                        let bitfield = x.0.state.borrow().bitfield();
337                        if bitfield.is_complete() {
338                            BlobStatus::Complete {
339                                size: bitfield.size,
340                            }
341                        } else {
342                            BlobStatus::Partial {
343                                size: bitfield.validated_size(),
344                            }
345                        }
346                    }
347                };
348                tx.send(res).await.ok();
349            }
350            Command::DeleteBlobs(cmd) => {
351                trace!("{cmd:?}");
352                let DeleteBlobsMsg {
353                    inner: BlobDeleteRequest { hashes, force },
354                    tx,
355                    ..
356                } = cmd;
357                for hash in hashes {
358                    if !force && self.protected.contains(&hash) {
359                        continue;
360                    }
361                    self.state.data.remove(&hash);
362                }
363                tx.send(Ok(())).await.ok();
364            }
365            Command::Batch(cmd) => {
366                trace!("{cmd:?}");
367                let (id, scope) = self.temp_tags.create_scope();
368                self.spawn(handle_batch(cmd, id, scope));
369            }
370            Command::ClearProtected(cmd) => {
371                self.protected.clear();
372                cmd.tx.send(Ok(())).await.ok();
373            }
374            Command::ExportRanges(cmd) => {
375                let entry = self.get(&cmd.hash);
376                self.spawn(export_ranges(cmd, entry));
377            }
378            Command::SyncDb(SyncDbMsg { tx, .. }) => {
379                tx.send(Ok(())).await.ok();
380            }
381            Command::Shutdown(cmd) => {
382                return Some(cmd);
383            }
384        }
385        None
386    }
387
388    fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
389        if *hash == Hash::EMPTY {
390            Some(self.state.empty_hash.clone())
391        } else {
392            self.state.data.get(hash).cloned()
393        }
394    }
395
396    fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
397        if hash == Hash::EMPTY {
398            self.state.empty_hash.clone()
399        } else {
400            self.state
401                .data
402                .entry(hash)
403                .or_insert_with(|| BaoFileHandle::new_partial(hash))
404                .clone()
405        }
406    }
407
408    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
409        let CreateTempTagMsg { tx, inner, .. } = cmd;
410        let mut tt = self.temp_tags.create(inner.scope, inner.value);
411        if tx.is_rpc() {
412            tt.leak();
413        }
414        tx.send(tt).await.ok();
415    }
416
417    async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
418        let import_data = match res {
419            Ok(entry) => entry,
420            Err(e) => {
421                error!("import failed: {e}");
422                return;
423            }
424        };
425        let hash = import_data.outboard.root().into();
426        let entry = self.get_or_create_entry(hash);
427        entry
428            .0
429            .state
430            .send_if_modified(|state: &mut BaoFileStorage| {
431                let BaoFileStorage::Partial(_) = state.deref() else {
432                    return false;
433                };
434                *state =
435                    CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
436                true
437            });
438        let tt = self.temp_tags.create(
439            import_data.scope,
440            HashAndFormat {
441                hash,
442                format: import_data.format,
443            },
444        );
445        import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
446    }
447
448    fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
449        match res {
450            Ok(x) => Some(x),
451            Err(e) => {
452                if e.is_cancelled() {
453                    trace!("task cancelled: {e}");
454                } else {
455                    error!("task failed: {e}");
456                }
457                None
458            }
459        }
460    }
461
462    pub async fn run(mut self) {
463        let shutdown = loop {
464            tokio::select! {
465                cmd = self.commands.recv() => {
466                    let Some(cmd) = cmd else {
467                        // last sender has been dropped.
468                        // exit immediately.
469                        break None;
470                    };
471                    if let Some(cmd) = self.handle_command(cmd).await {
472                        break Some(cmd);
473                    }
474                }
475                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
476                    let Some(res) = self.log_task_result(res) else {
477                        continue;
478                    };
479                    match res {
480                        TaskResult::Import(res) => {
481                            self.finish_import(res).await;
482                        }
483                        TaskResult::Scope(scope) => {
484                            self.temp_tags.end_scope(scope);
485                        }
486                        TaskResult::Unit(_) => {}
487                    }
488                }
489            }
490        };
491        if let Some(shutdown) = shutdown {
492            shutdown.tx.send(()).await.ok();
493        }
494    }
495}
496
497async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
498    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
499        error!("batch failed: {cause}");
500    }
501    id
502}
503
504async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
505    let BatchMsg { tx, mut rx, .. } = cmd;
506    trace!("created scope {}", id);
507    tx.send(id).await.map_err(api::Error::other)?;
508    while let Some(msg) = rx.recv().await? {
509        match msg {
510            BatchResponse::Drop(msg) => scope.on_drop(&msg),
511            BatchResponse::Ping => {}
512        }
513    }
514    Ok(())
515}
516
517async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
518    let Some(entry) = entry else {
519        let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
520        cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
521        return;
522    };
523    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
524        cmd.tx
525            .send(ExportRangesItem::Error(cause.into()))
526            .await
527            .ok();
528    }
529}
530
531async fn export_ranges_impl(
532    cmd: ExportRangesRequest,
533    tx: &mut mpsc::Sender<ExportRangesItem>,
534    entry: BaoFileHandle,
535) -> io::Result<()> {
536    let ExportRangesRequest { ranges, hash } = cmd;
537    let bitfield = entry.bitfield();
538    trace!(
539        "exporting ranges: {hash} {ranges:?} size={}",
540        bitfield.size()
541    );
542    debug_assert!(entry.hash() == hash, "hash mismatch");
543    let data = entry.data_reader();
544    let size = bitfield.size();
545    for range in ranges.iter() {
546        let range = match range {
547            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
548            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
549        };
550        let requested = ChunkRanges::bytes(range.start..range.end);
551        if !bitfield.ranges.is_superset(&requested) {
552            return Err(io::Error::other(format!(
553                "missing range: {requested:?}, present: {bitfield:?}",
554            )));
555        }
556        let bs = 1024;
557        let mut offset = range.start;
558        loop {
559            let end: u64 = (offset + bs).min(range.end);
560            let size = (end - offset) as usize;
561            tx.send(
562                Leaf {
563                    offset,
564                    data: data.read_bytes_at(offset, size)?,
565                }
566                .into(),
567            )
568            .await?;
569            offset = end;
570            if offset >= range.end {
571                break;
572            }
573        }
574    }
575    Ok(())
576}
577
578fn chunk_range(leaf: &Leaf) -> ChunkRanges {
579    let start = ChunkNum::chunks(leaf.offset);
580    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
581    (start..end).into()
582}
583
584async fn import_bao(
585    entry: BaoFileHandle,
586    size: NonZeroU64,
587    mut stream: mpsc::Receiver<BaoContentItem>,
588    tx: irpc::channel::oneshot::Sender<api::Result<()>>,
589) {
590    let size = size.get();
591    entry
592        .0
593        .state
594        .send_if_modified(|state: &mut BaoFileStorage| {
595            let BaoFileStorage::Partial(entry) = state else {
596                // entry was already completed, no need to write
597                return false;
598            };
599            entry.size.write(0, size);
600            false
601        });
602    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
603    while let Some(item) = stream.recv().await.unwrap() {
604        entry.0.state.send_if_modified(|state| {
605            let BaoFileStorage::Partial(partial) = state else {
606                // entry was already completed, no need to write
607                return false;
608            };
609            match item {
610                BaoContentItem::Parent(parent) => {
611                    if let Some(offset) = tree.pre_order_offset(parent.node) {
612                        let mut pair = [0u8; 64];
613                        pair[..32].copy_from_slice(parent.pair.0.as_bytes());
614                        pair[32..].copy_from_slice(parent.pair.1.as_bytes());
615                        partial
616                            .outboard
617                            .write_at(offset * 64, &pair)
618                            .expect("writing to mem can never fail");
619                    }
620                    false
621                }
622                BaoContentItem::Leaf(leaf) => {
623                    let start = leaf.offset;
624                    partial
625                        .data
626                        .write_at(start, &leaf.data)
627                        .expect("writing to mem can never fail");
628                    let added = chunk_range(&leaf);
629                    let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
630                    if update.new_state().complete {
631                        let data = std::mem::take(&mut partial.data);
632                        let outboard = std::mem::take(&mut partial.outboard);
633                        let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
634                        let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
635                        *state = CompleteStorage::new(data, outboard).into();
636                    }
637                    update.changed()
638                }
639            }
640        });
641    }
642    tx.send(Ok(())).await.ok();
643}
644
645#[instrument(skip_all, fields(hash = tracing::field::Empty))]
646async fn export_bao(
647    entry: Option<BaoFileHandle>,
648    ranges: ChunkRanges,
649    mut sender: mpsc::Sender<EncodedItem>,
650) {
651    let Some(entry) = entry else {
652        let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
653        sender.send(err.into()).await.ok();
654        return;
655    };
656    tracing::Span::current().record("hash", tracing::field::display(entry.hash));
657    let data = entry.data_reader();
658    let outboard = entry.outboard_reader();
659    let tx = BaoTreeSender::new(&mut sender);
660    traverse_ranges_validated(data, outboard, &ranges, tx)
661        .await
662        .ok();
663}
664
665#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
666async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
667    entry.subscribe().forward(tx).await.ok();
668}
669
670async fn import_bytes(
671    data: Bytes,
672    scope: Scope,
673    format: BlobFormat,
674    tx: mpsc::Sender<AddProgressItem>,
675) -> anyhow::Result<ImportEntry> {
676    tx.send(AddProgressItem::Size(data.len() as u64)).await?;
677    tx.send(AddProgressItem::CopyDone).await?;
678    let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
679    Ok(ImportEntry {
680        data,
681        outboard,
682        scope,
683        format,
684        tx,
685    })
686}
687
688async fn import_byte_stream(
689    scope: Scope,
690    format: BlobFormat,
691    mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
692    tx: mpsc::Sender<AddProgressItem>,
693) -> anyhow::Result<ImportEntry> {
694    let mut res = Vec::new();
695    loop {
696        match rx.recv().await {
697            Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
698                res.extend_from_slice(&data);
699                tx.send(AddProgressItem::CopyProgress(res.len() as u64))
700                    .await?;
701            }
702            Ok(Some(ImportByteStreamUpdate::Done)) => {
703                break;
704            }
705            Ok(None) => {
706                return Err(api::Error::io(
707                    io::ErrorKind::UnexpectedEof,
708                    "byte stream ended unexpectedly",
709                )
710                .into());
711            }
712            Err(e) => {
713                return Err(e.into());
714            }
715        }
716    }
717    import_bytes(res.into(), scope, format, tx).await
718}
719
720#[instrument(skip_all, fields(path = %cmd.path.display()))]
721async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
722    let ImportPathMsg {
723        inner:
724            ImportPathRequest {
725                path,
726                scope,
727                format,
728                ..
729            },
730        tx,
731        ..
732    } = cmd;
733    let mut res = Vec::new();
734    let mut file = tokio::fs::File::open(path).await?;
735    let mut buf = [0u8; 1024 * 64];
736    loop {
737        let size = file.read(&mut buf).await?;
738        if size == 0 {
739            break;
740        }
741        res.extend_from_slice(&buf[..size]);
742        tx.send(AddProgressItem::CopyProgress(res.len() as u64))
743            .await?;
744    }
745    import_bytes(res.into(), scope, format, tx).await
746}
747
748#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
749async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
750    let ExportPathMsg { inner, mut tx, .. } = cmd;
751    let Some(entry) = entry else {
752        tx.send(ExportProgressItem::Error(api::Error::io(
753            io::ErrorKind::NotFound,
754            "hash not found",
755        )))
756        .await
757        .ok();
758        return;
759    };
760    match export_path_impl(entry, inner, &mut tx).await {
761        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
762        Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
763    };
764}
765
766async fn export_path_impl(
767    entry: BaoFileHandle,
768    cmd: ExportPathRequest,
769    tx: &mut mpsc::Sender<ExportProgressItem>,
770) -> io::Result<()> {
771    let ExportPathRequest { target, .. } = cmd;
772    if !target.is_absolute() {
773        return Err(io::Error::new(
774            io::ErrorKind::InvalidInput,
775            "path is not absolute",
776        ));
777    }
778    if let Some(parent) = target.parent() {
779        std::fs::create_dir_all(parent)?;
780    }
781    // todo: for partial entries make sure to only write the part that is actually present
782    let mut file = std::fs::File::create(target)?;
783    let size = entry.0.state.borrow().size();
784    tx.send(ExportProgressItem::Size(size)).await?;
785    let mut buf = [0u8; 1024 * 64];
786    for offset in (0..size).step_by(1024 * 64) {
787        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
788        let buf = &mut buf[..len];
789        entry.0.state.borrow().data().read_exact_at(offset, buf)?;
790        file.write_all(buf)?;
791        tx.try_send(ExportProgressItem::CopyProgress(offset))
792            .await
793            .map_err(|_e| io::Error::other(""))?;
794        yield_now().await;
795    }
796    Ok(())
797}
798
799struct ImportEntry {
800    scope: Scope,
801    format: BlobFormat,
802    data: Bytes,
803    outboard: PreOrderMemOutboard,
804    tx: mpsc::Sender<AddProgressItem>,
805}
806
807pub struct DataReader(BaoFileHandle);
808
809impl ReadBytesAt for DataReader {
810    fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
811        let entry = self.0 .0.state.borrow();
812        entry.data().read_bytes_at(offset, size)
813    }
814}
815
816pub struct OutboardReader {
817    hash: blake3::Hash,
818    tree: BaoTree,
819    data: BaoFileHandle,
820}
821
822impl Outboard for OutboardReader {
823    fn root(&self) -> blake3::Hash {
824        self.hash
825    }
826
827    fn tree(&self) -> BaoTree {
828        self.tree
829    }
830
831    fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
832        let Some(offset) = self.tree.pre_order_offset(node) else {
833            return Ok(None);
834        };
835        let mut buf = [0u8; 64];
836        let size = self
837            .data
838            .0
839            .state
840            .borrow()
841            .outboard()
842            .read_at(offset * 64, &mut buf)?;
843        if size != 64 {
844            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
845        }
846        let left: [u8; 32] = buf[..32].try_into().unwrap();
847        let right: [u8; 32] = buf[32..].try_into().unwrap();
848        Ok(Some((left.into(), right.into())))
849    }
850}
851
852struct State {
853    data: HashMap<Hash, BaoFileHandle>,
854    tags: BTreeMap<Tag, HashAndFormat>,
855    empty_hash: BaoFileHandle,
856}
857
858#[derive(Debug, derive_more::From)]
859pub enum BaoFileStorage {
860    Partial(PartialMemStorage),
861    Complete(CompleteStorage),
862}
863
864impl BaoFileStorage {
865    /// Get the bitfield of the storage.
866    pub fn bitfield(&self) -> Bitfield {
867        match self {
868            Self::Partial(entry) => entry.bitfield.clone(),
869            Self::Complete(entry) => Bitfield::complete(entry.size()),
870        }
871    }
872}
873
874#[derive(Debug)]
875pub struct BaoFileHandleInner {
876    state: watch::Sender<BaoFileStorage>,
877    hash: Hash,
878}
879
880/// A cheaply cloneable handle to a bao file, including the hash
881#[derive(Debug, Clone, derive_more::Deref)]
882pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
883
884impl BaoFileHandle {
885    pub fn new_partial(hash: Hash) -> Self {
886        let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
887            data: SparseMemFile::new(),
888            outboard: SparseMemFile::new(),
889            size: SizeInfo::default(),
890            bitfield: Bitfield::empty(),
891        }));
892        Self(Arc::new(BaoFileHandleInner { state, hash }))
893    }
894
895    pub fn hash(&self) -> Hash {
896        self.hash
897    }
898
899    pub fn bitfield(&self) -> Bitfield {
900        self.0.state.borrow().bitfield()
901    }
902
903    pub fn subscribe(&self) -> BaoFileStorageSubscriber {
904        BaoFileStorageSubscriber::new(self.0.state.subscribe())
905    }
906
907    pub fn data_reader(&self) -> DataReader {
908        DataReader(self.clone())
909    }
910
911    pub fn outboard_reader(&self) -> OutboardReader {
912        let entry = self.0.state.borrow();
913        let hash = self.hash.into();
914        let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
915        OutboardReader {
916            hash,
917            tree,
918            data: self.clone(),
919        }
920    }
921}
922
923impl Default for BaoFileStorage {
924    fn default() -> Self {
925        Self::Partial(Default::default())
926    }
927}
928
929impl BaoFileStorage {
930    fn data(&self) -> &[u8] {
931        match self {
932            Self::Partial(entry) => entry.data.as_ref(),
933            Self::Complete(entry) => &entry.data,
934        }
935    }
936
937    fn outboard(&self) -> &[u8] {
938        match self {
939            Self::Partial(entry) => entry.outboard.as_ref(),
940            Self::Complete(entry) => &entry.outboard,
941        }
942    }
943
944    fn size(&self) -> u64 {
945        match self {
946            Self::Partial(entry) => entry.current_size(),
947            Self::Complete(entry) => entry.size(),
948        }
949    }
950}
951
/// Storage for a blob whose data and outboard are fully present in memory.
#[derive(Debug, Clone)]
pub struct CompleteStorage {
    /// The blob content.
    pub(crate) data: Bytes,
    /// Outboard hash tree for `data` (built with `PreOrderMemOutboard` when
    /// created via [`CompleteStorage::create`]).
    pub(crate) outboard: Bytes,
}
957
958impl CompleteStorage {
959    pub fn create(data: Bytes) -> (Hash, Self) {
960        let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
961        let hash = outboard.root().into();
962        let outboard = outboard.data.into();
963        let entry = Self::new(data, outboard);
964        (hash, entry)
965    }
966
967    pub fn new(data: Bytes, outboard: Bytes) -> Self {
968        Self { data, outboard }
969    }
970
971    pub fn size(&self) -> u64 {
972        self.data.len() as u64
973    }
974}
975
976#[allow(dead_code)]
977fn print_outboard(hashes: &[u8]) {
978    assert!(hashes.len() % 64 == 0);
979    for chunk in hashes.chunks(64) {
980        let left: [u8; 32] = chunk[..32].try_into().unwrap();
981        let right: [u8; 32] = chunk[32..].try_into().unwrap();
982        let left = blake3::Hash::from(left);
983        let right = blake3::Hash::from(right);
984        println!("l: {left:?}, r: {right:?}");
985    }
986}
987
/// Observes a blob's [`BaoFileStorage`] and forwards its bitfield to a channel.
pub struct BaoFileStorageSubscriber {
    /// Watch receiver on the blob's storage state.
    receiver: watch::Receiver<BaoFileStorage>,
}
991
992impl BaoFileStorageSubscriber {
993    pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
994        Self { receiver }
995    }
996
997    /// Forward observed *values* to the given sender
998    ///
999    /// Returns an error if sending fails, or if the last sender is dropped
1000    pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1001        let value = self.receiver.borrow().bitfield();
1002        tx.send(value).await?;
1003        loop {
1004            self.update_or_closed(&mut tx).await?;
1005            let value = self.receiver.borrow().bitfield();
1006            tx.send(value.clone()).await?;
1007        }
1008    }
1009
1010    /// Forward observed *deltas* to the given sender
1011    ///
1012    /// Returns an error if sending fails, or if the last sender is dropped
1013    #[allow(dead_code)]
1014    pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1015        let value = self.receiver.borrow().bitfield();
1016        let mut old = value.clone();
1017        tx.send(value).await?;
1018        loop {
1019            self.update_or_closed(&mut tx).await?;
1020            let new = self.receiver.borrow().bitfield();
1021            let diff = old.diff(&new);
1022            if diff.is_empty() {
1023                continue;
1024            }
1025            tx.send(diff).await?;
1026            old = new;
1027        }
1028    }
1029
1030    async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1031        tokio::select! {
1032            _ = tx.closed() => {
1033                // the sender is closed, we are done
1034                Err(irpc::channel::SendError::ReceiverClosed.into())
1035            }
1036            e = self.receiver.changed() => Ok(e?),
1037        }
1038    }
1039}
1040
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Export a blob as bao from one store, import it into a second store, and
    /// check that both stores produce identical bao encodings.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        // The temp tag is kept alive for the whole test so the blob is not collected.
        let temp_tag = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *temp_tag.hash();
        println!("hash: {hash:?}");

        // Print every item of the export stream for manual inspection.
        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        // Log observation events on the target store in the background during import.
        let mut observer = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = observer.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}