iroh_blobs/store/
readonly_mem.rs

1//! Readonly in-memory store.
2//!
3//! This can only serve data that is provided at creation time. It is much simpler
4//! than the mutable in-memory store and the file system store, and can serve as a
5//! good starting point for custom implementations.
6//!
7//! It can also be useful as a lightweight store for tests.
8use std::{
9    collections::HashMap,
10    io::{self, Write},
11    ops::Deref,
12    path::PathBuf,
13};
14
15use bao_tree::{
16    io::{
17        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
18        outboard::PreOrderMemOutboard,
19        sync::ReadAt,
20        Leaf,
21    },
22    BaoTree, ChunkRanges,
23};
24use bytes::Bytes;
25use irpc::channel::mpsc;
26use n0_future::future::{self, yield_now};
27use range_collections::range_set::RangeSetRange;
28use ref_cast::RefCast;
29use tokio::task::{JoinError, JoinSet};
30
31use super::util::BaoTreeSender;
32use crate::{
33    api::{
34        self,
35        blobs::{Bitfield, ExportProgressItem},
36        proto::{
37            self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
38            ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
39            ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
40            ObserveRequest,
41        },
42        ApiClient, TempTag,
43    },
44    store::{mem::CompleteStorage, IROH_BLOCK_SIZE},
45    util::ChunkRangesExt,
46    Hash,
47};
48
49#[derive(Debug, Clone)]
50pub struct ReadonlyMemStore {
51    client: ApiClient,
52}
53
54impl Deref for ReadonlyMemStore {
55    type Target = crate::api::Store;
56
57    fn deref(&self) -> &Self::Target {
58        crate::api::Store::ref_from_sender(&self.client)
59    }
60}
61
62struct Actor {
63    commands: tokio::sync::mpsc::Receiver<proto::Command>,
64    tasks: JoinSet<()>,
65    data: HashMap<Hash, CompleteStorage>,
66}
67
68impl Actor {
69    fn new(
70        commands: tokio::sync::mpsc::Receiver<proto::Command>,
71        data: HashMap<Hash, CompleteStorage>,
72    ) -> Self {
73        Self {
74            data,
75            commands,
76            tasks: JoinSet::new(),
77        }
78    }
79
80    async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
81        match cmd {
82            Command::ImportBao(ImportBaoMsg { tx, .. }) => {
83                tx.send(Err(api::Error::Io(io::Error::other(
84                    "import not supported",
85                ))))
86                .await
87                .ok();
88            }
89            Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
90                tx.send(io::Error::other("import not supported").into())
91                    .await
92                    .ok();
93            }
94            Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => {
95                tx.send(io::Error::other("import not supported").into())
96                    .await
97                    .ok();
98            }
99            Command::ImportPath(ImportPathMsg { tx, .. }) => {
100                tx.send(io::Error::other("import not supported").into())
101                    .await
102                    .ok();
103            }
104            Command::Observe(ObserveMsg {
105                inner: ObserveRequest { hash },
106                tx,
107                ..
108            }) => {
109                let size = self.data.get_mut(&hash).map(|x| x.data.len() as u64);
110                self.tasks.spawn(async move {
111                    if let Some(size) = size {
112                        tx.send(Bitfield::complete(size)).await.ok();
113                    } else {
114                        tx.send(Bitfield::empty()).await.ok();
115                        future::pending::<()>().await;
116                    };
117                });
118            }
119            Command::ExportBao(ExportBaoMsg {
120                inner: ExportBaoRequest { hash, ranges },
121                tx,
122                ..
123            }) => {
124                let entry = self.data.get(&hash).cloned();
125                self.tasks.spawn(export_bao(hash, entry, ranges, tx));
126            }
127            Command::ExportPath(ExportPathMsg {
128                inner: ExportPathRequest { hash, target, .. },
129                tx,
130                ..
131            }) => {
132                let entry = self.data.get(&hash).cloned();
133                self.tasks.spawn(export_path(entry, target, tx));
134            }
135            Command::Batch(_cmd) => {}
136            Command::ClearProtected(cmd) => {
137                cmd.tx.send(Ok(())).await.ok();
138            }
139            Command::CreateTag(cmd) => {
140                cmd.tx
141                    .send(Err(io::Error::other("create tag not supported").into()))
142                    .await
143                    .ok();
144            }
145            Command::CreateTempTag(cmd) => {
146                cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
147            }
148            Command::RenameTag(cmd) => {
149                cmd.tx
150                    .send(Err(io::Error::other("rename tag not supported").into()))
151                    .await
152                    .ok();
153            }
154            Command::DeleteTags(cmd) => {
155                cmd.tx
156                    .send(Err(io::Error::other("delete tags not supported").into()))
157                    .await
158                    .ok();
159            }
160            Command::DeleteBlobs(cmd) => {
161                cmd.tx
162                    .send(Err(io::Error::other("delete blobs not supported").into()))
163                    .await
164                    .ok();
165            }
166            Command::ListBlobs(cmd) => {
167                let hashes: Vec<Hash> = self.data.keys().cloned().collect();
168                self.tasks.spawn(async move {
169                    for hash in hashes {
170                        cmd.tx.send(Ok(hash)).await.ok();
171                    }
172                });
173            }
174            Command::BlobStatus(cmd) => {
175                let hash = cmd.inner.hash;
176                let entry = self.data.get(&hash);
177                let status = if let Some(entry) = entry {
178                    BlobStatus::Complete {
179                        size: entry.data.len() as u64,
180                    }
181                } else {
182                    BlobStatus::NotFound
183                };
184                cmd.tx.send(status).await.ok();
185            }
186            Command::ListTags(cmd) => {
187                cmd.tx.send(Vec::new()).await.ok();
188            }
189            Command::SetTag(cmd) => {
190                cmd.tx
191                    .send(Err(io::Error::other("set tag not supported").into()))
192                    .await
193                    .ok();
194            }
195            Command::ListTempTags(cmd) => {
196                cmd.tx.send(Vec::new()).await.ok();
197            }
198            Command::SyncDb(cmd) => {
199                cmd.tx.send(Ok(())).await.ok();
200            }
201            Command::Shutdown(cmd) => {
202                return Some(cmd.tx);
203            }
204            Command::ExportRanges(cmd) => {
205                let entry = self.data.get(&cmd.inner.hash).cloned();
206                self.tasks.spawn(export_ranges(cmd, entry));
207            }
208        }
209        None
210    }
211
212    fn log_unit_task(&self, res: Result<(), JoinError>) {
213        if let Err(e) = res {
214            tracing::error!("task failed: {e}");
215        }
216    }
217
218    async fn run(mut self) {
219        loop {
220            tokio::select! {
221                Some(cmd) = self.commands.recv() => {
222                    if let Some(shutdown) = self.handle_command(cmd).await {
223                        shutdown.send(()).await.ok();
224                        break;
225                    }
226                },
227                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
228                    self.log_unit_task(res);
229                },
230                else => break,
231            }
232        }
233    }
234}
235
236async fn export_bao(
237    hash: Hash,
238    entry: Option<CompleteStorage>,
239    ranges: ChunkRanges,
240    mut sender: mpsc::Sender<EncodedItem>,
241) {
242    let entry = match entry {
243        Some(entry) => entry,
244        None => {
245            sender
246                .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
247                    io::Error::new(
248                        io::ErrorKind::UnexpectedEof,
249                        "export task ended unexpectedly",
250                    ),
251                )))
252                .await
253                .ok();
254            return;
255        }
256    };
257    let data = entry.data;
258    let outboard = entry.outboard;
259    let size = data.as_ref().len() as u64;
260    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
261    let outboard = PreOrderMemOutboard {
262        root: hash.into(),
263        tree,
264        data: outboard,
265    };
266    let sender = BaoTreeSender::ref_cast_mut(&mut sender);
267    traverse_ranges_validated(data.as_ref(), outboard, &ranges, sender)
268        .await
269        .ok();
270}
271
272async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<CompleteStorage>) {
273    let Some(entry) = entry else {
274        cmd.tx
275            .send(ExportRangesItem::Error(api::Error::io(
276                io::ErrorKind::NotFound,
277                "hash not found",
278            )))
279            .await
280            .ok();
281        return;
282    };
283    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
284        cmd.tx
285            .send(ExportRangesItem::Error(cause.into()))
286            .await
287            .ok();
288    }
289}
290
291async fn export_ranges_impl(
292    cmd: ExportRangesRequest,
293    tx: &mut mpsc::Sender<ExportRangesItem>,
294    entry: CompleteStorage,
295) -> io::Result<()> {
296    let ExportRangesRequest { ranges, .. } = cmd;
297    let data = entry.data;
298    let size = data.len() as u64;
299    let bitfield = Bitfield::complete(size);
300    for range in ranges.iter() {
301        let range = match range {
302            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
303            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
304        };
305        let requested = ChunkRanges::bytes(range.start..range.end);
306        if !bitfield.ranges.is_superset(&requested) {
307            return Err(io::Error::other(format!(
308                "missing range: {requested:?}, present: {bitfield:?}",
309            )));
310        }
311        let bs = 1024;
312        let mut offset = range.start;
313        loop {
314            let end: u64 = (offset + bs).min(range.end);
315            let size = (end - offset) as usize;
316            tx.send(
317                Leaf {
318                    offset,
319                    data: data.read_bytes_at(offset, size)?,
320                }
321                .into(),
322            )
323            .await?;
324            offset = end;
325            if offset >= range.end {
326                break;
327            }
328        }
329    }
330    Ok(())
331}
332
333impl ReadonlyMemStore {
334    pub fn new(items: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
335        let mut entries = HashMap::new();
336        for item in items {
337            let data = Bytes::copy_from_slice(item.as_ref());
338            let (hash, entry) = CompleteStorage::create(data);
339            entries.insert(hash, entry);
340        }
341        let (sender, receiver) = tokio::sync::mpsc::channel(1);
342        let actor = Actor::new(receiver, entries);
343        tokio::spawn(actor.run());
344        let local = irpc::LocalSender::from(sender);
345        Self {
346            client: local.into(),
347        }
348    }
349}
350
351async fn export_path(
352    entry: Option<CompleteStorage>,
353    target: PathBuf,
354    mut tx: mpsc::Sender<ExportProgressItem>,
355) {
356    let Some(entry) = entry else {
357        tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
358            .await
359            .ok();
360        return;
361    };
362    match export_path_impl(entry, target, &mut tx).await {
363        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
364        Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
365    };
366}
367
368async fn export_path_impl(
369    entry: CompleteStorage,
370    target: PathBuf,
371    tx: &mut mpsc::Sender<ExportProgressItem>,
372) -> io::Result<()> {
373    let data = entry.data;
374    // todo: for partial entries make sure to only write the part that is actually present
375    let mut file = std::fs::File::create(&target)?;
376    let size = data.len() as u64;
377    tx.send(ExportProgressItem::Size(size)).await?;
378    let mut buf = [0u8; 1024 * 64];
379    for offset in (0..size).step_by(1024 * 64) {
380        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
381        let buf = &mut buf[..len];
382        data.as_ref().read_exact_at(offset, buf)?;
383        file.write_all(buf)?;
384        tx.try_send(ExportProgressItem::CopyProgress(offset))
385            .await
386            .map_err(|_e| io::Error::other("error"))?;
387        yield_now().await;
388    }
389    Ok(())
390}