iroh_blobs/store/
readonly_mem.rs

//! Read-only in-memory store.
//!
//! This store can only serve data that is provided at creation time. It is much simpler
//! than the mutable in-memory store and the file system store, and can serve as a
//! good starting point for custom implementations.
//!
//! It can also be useful as a lightweight store for tests.
8use std::{
9    collections::HashMap,
10    io::{self, Write},
11    ops::Deref,
12    path::PathBuf,
13};
14
15use bao_tree::{
16    io::{
17        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
18        outboard::PreOrderMemOutboard,
19        sync::ReadAt,
20        Leaf,
21    },
22    BaoTree, ChunkRanges,
23};
24use bytes::Bytes;
25use irpc::channel::mpsc;
26use n0_future::future::{self, yield_now};
27use range_collections::range_set::RangeSetRange;
28use ref_cast::RefCast;
29use tokio::task::{JoinError, JoinSet};
30
31use super::util::BaoTreeSender;
32use crate::{
33    api::{
34        self,
35        blobs::{Bitfield, ExportProgressItem},
36        proto::{
37            self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
38            ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
39            ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
40            ObserveRequest, WaitIdleMsg,
41        },
42        ApiClient, TempTag,
43    },
44    protocol::ChunkRangesExt,
45    store::{mem::CompleteStorage, IROH_BLOCK_SIZE},
46    Hash,
47};
48
49#[derive(Debug, Clone)]
50pub struct ReadonlyMemStore {
51    client: ApiClient,
52}
53
54impl Deref for ReadonlyMemStore {
55    type Target = crate::api::Store;
56
57    fn deref(&self) -> &Self::Target {
58        crate::api::Store::ref_from_sender(&self.client)
59    }
60}
61
62struct Actor {
63    commands: tokio::sync::mpsc::Receiver<proto::Command>,
64    tasks: JoinSet<()>,
65    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
66    data: HashMap<Hash, CompleteStorage>,
67}
68
69impl Actor {
70    fn new(
71        commands: tokio::sync::mpsc::Receiver<proto::Command>,
72        data: HashMap<Hash, CompleteStorage>,
73    ) -> Self {
74        Self {
75            data,
76            commands,
77            tasks: JoinSet::new(),
78            idle_waiters: Vec::new(),
79        }
80    }
81
82    async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
83        match cmd {
84            Command::ImportBao(ImportBaoMsg { tx, .. }) => {
85                tx.send(Err(api::Error::Io(io::Error::other(
86                    "import not supported",
87                ))))
88                .await
89                .ok();
90            }
91            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
92                if self.tasks.is_empty() {
93                    // we are currently idle
94                    tx.send(()).await.ok();
95                } else {
96                    // wait for idle state
97                    self.idle_waiters.push(tx);
98                }
99            }
100            Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
101                tx.send(io::Error::other("import not supported").into())
102                    .await
103                    .ok();
104            }
105            Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => {
106                tx.send(io::Error::other("import not supported").into())
107                    .await
108                    .ok();
109            }
110            Command::ImportPath(ImportPathMsg { tx, .. }) => {
111                tx.send(io::Error::other("import not supported").into())
112                    .await
113                    .ok();
114            }
115            Command::Observe(ObserveMsg {
116                inner: ObserveRequest { hash },
117                tx,
118                ..
119            }) => {
120                let size = self.data.get_mut(&hash).map(|x| x.data.len() as u64);
121                self.tasks.spawn(async move {
122                    if let Some(size) = size {
123                        tx.send(Bitfield::complete(size)).await.ok();
124                    } else {
125                        tx.send(Bitfield::empty()).await.ok();
126                        future::pending::<()>().await;
127                    };
128                });
129            }
130            Command::ExportBao(ExportBaoMsg {
131                inner: ExportBaoRequest { hash, ranges, .. },
132                tx,
133                ..
134            }) => {
135                let entry = self.data.get(&hash).cloned();
136                self.tasks.spawn(export_bao(hash, entry, ranges, tx));
137            }
138            Command::ExportPath(ExportPathMsg {
139                inner: ExportPathRequest { hash, target, .. },
140                tx,
141                ..
142            }) => {
143                let entry = self.data.get(&hash).cloned();
144                self.tasks.spawn(export_path(entry, target, tx));
145            }
146            Command::Batch(_cmd) => {}
147            Command::ClearProtected(cmd) => {
148                cmd.tx.send(Ok(())).await.ok();
149            }
150            Command::CreateTag(cmd) => {
151                cmd.tx
152                    .send(Err(io::Error::other("create tag not supported").into()))
153                    .await
154                    .ok();
155            }
156            Command::CreateTempTag(cmd) => {
157                cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
158            }
159            Command::RenameTag(cmd) => {
160                cmd.tx
161                    .send(Err(io::Error::other("rename tag not supported").into()))
162                    .await
163                    .ok();
164            }
165            Command::DeleteTags(cmd) => {
166                cmd.tx
167                    .send(Err(io::Error::other("delete tags not supported").into()))
168                    .await
169                    .ok();
170            }
171            Command::DeleteBlobs(cmd) => {
172                cmd.tx
173                    .send(Err(io::Error::other("delete blobs not supported").into()))
174                    .await
175                    .ok();
176            }
177            Command::ListBlobs(cmd) => {
178                let hashes: Vec<Hash> = self.data.keys().cloned().collect();
179                self.tasks.spawn(async move {
180                    for hash in hashes {
181                        cmd.tx.send(Ok(hash)).await.ok();
182                    }
183                });
184            }
185            Command::BlobStatus(cmd) => {
186                let hash = cmd.inner.hash;
187                let entry = self.data.get(&hash);
188                let status = if let Some(entry) = entry {
189                    BlobStatus::Complete {
190                        size: entry.data.len() as u64,
191                    }
192                } else {
193                    BlobStatus::NotFound
194                };
195                cmd.tx.send(status).await.ok();
196            }
197            Command::ListTags(cmd) => {
198                cmd.tx.send(Vec::new()).await.ok();
199            }
200            Command::SetTag(cmd) => {
201                cmd.tx
202                    .send(Err(io::Error::other("set tag not supported").into()))
203                    .await
204                    .ok();
205            }
206            Command::ListTempTags(cmd) => {
207                cmd.tx.send(Vec::new()).await.ok();
208            }
209            Command::SyncDb(cmd) => {
210                cmd.tx.send(Ok(())).await.ok();
211            }
212            Command::Shutdown(cmd) => {
213                return Some(cmd.tx);
214            }
215            Command::ExportRanges(cmd) => {
216                let entry = self.data.get(&cmd.inner.hash).cloned();
217                self.tasks.spawn(export_ranges(cmd, entry));
218            }
219        }
220        None
221    }
222
223    fn log_unit_task(&self, res: Result<(), JoinError>) {
224        if let Err(e) = res {
225            tracing::error!("task failed: {e}");
226        }
227    }
228
229    async fn run(mut self) {
230        loop {
231            tokio::select! {
232                Some(cmd) = self.commands.recv() => {
233                    if let Some(shutdown) = self.handle_command(cmd).await {
234                        shutdown.send(()).await.ok();
235                        break;
236                    }
237                },
238                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
239                    self.log_unit_task(res);
240                    if self.tasks.is_empty() {
241                        // we are idle now
242                        for tx in self.idle_waiters.drain(..) {
243                            tx.send(()).await.ok();
244                        }
245                    }
246                },
247                else => break,
248            }
249        }
250    }
251}
252
253async fn export_bao(
254    hash: Hash,
255    entry: Option<CompleteStorage>,
256    ranges: ChunkRanges,
257    mut sender: mpsc::Sender<EncodedItem>,
258) {
259    let entry = match entry {
260        Some(entry) => entry,
261        None => {
262            sender
263                .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
264                    io::Error::new(
265                        io::ErrorKind::UnexpectedEof,
266                        "export task ended unexpectedly",
267                    ),
268                )))
269                .await
270                .ok();
271            return;
272        }
273    };
274    let data = entry.data;
275    let outboard = entry.outboard;
276    let size = data.as_ref().len() as u64;
277    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
278    let outboard = PreOrderMemOutboard {
279        root: hash.into(),
280        tree,
281        data: outboard,
282    };
283    let sender = BaoTreeSender::ref_cast_mut(&mut sender);
284    traverse_ranges_validated(data.as_ref(), outboard, &ranges, sender)
285        .await
286        .ok();
287}
288
289async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<CompleteStorage>) {
290    let Some(entry) = entry else {
291        cmd.tx
292            .send(ExportRangesItem::Error(api::Error::io(
293                io::ErrorKind::NotFound,
294                "hash not found",
295            )))
296            .await
297            .ok();
298        return;
299    };
300    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
301        cmd.tx
302            .send(ExportRangesItem::Error(cause.into()))
303            .await
304            .ok();
305    }
306}
307
308async fn export_ranges_impl(
309    cmd: ExportRangesRequest,
310    tx: &mut mpsc::Sender<ExportRangesItem>,
311    entry: CompleteStorage,
312) -> io::Result<()> {
313    let ExportRangesRequest { ranges, .. } = cmd;
314    let data = entry.data;
315    let size = data.len() as u64;
316    let bitfield = Bitfield::complete(size);
317    for range in ranges.iter() {
318        let range = match range {
319            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
320            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
321        };
322        let requested = ChunkRanges::bytes(range.start..range.end);
323        if !bitfield.ranges.is_superset(&requested) {
324            return Err(io::Error::other(format!(
325                "missing range: {requested:?}, present: {bitfield:?}",
326            )));
327        }
328        let bs = 1024;
329        let mut offset = range.start;
330        loop {
331            let end: u64 = (offset + bs).min(range.end);
332            let size = (end - offset) as usize;
333            tx.send(
334                Leaf {
335                    offset,
336                    data: data.read_bytes_at(offset, size)?,
337                }
338                .into(),
339            )
340            .await?;
341            offset = end;
342            if offset >= range.end {
343                break;
344            }
345        }
346    }
347    Ok(())
348}
349
350impl ReadonlyMemStore {
351    pub fn new(items: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
352        let mut entries = HashMap::new();
353        for item in items {
354            let data = Bytes::copy_from_slice(item.as_ref());
355            let (hash, entry) = CompleteStorage::create(data);
356            entries.insert(hash, entry);
357        }
358        let (sender, receiver) = tokio::sync::mpsc::channel(1);
359        let actor = Actor::new(receiver, entries);
360        tokio::spawn(actor.run());
361        let local = irpc::LocalSender::from(sender);
362        Self {
363            client: local.into(),
364        }
365    }
366}
367
368async fn export_path(
369    entry: Option<CompleteStorage>,
370    target: PathBuf,
371    mut tx: mpsc::Sender<ExportProgressItem>,
372) {
373    let Some(entry) = entry else {
374        tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
375            .await
376            .ok();
377        return;
378    };
379    match export_path_impl(entry, target, &mut tx).await {
380        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
381        Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
382    };
383}
384
385async fn export_path_impl(
386    entry: CompleteStorage,
387    target: PathBuf,
388    tx: &mut mpsc::Sender<ExportProgressItem>,
389) -> io::Result<()> {
390    let data = entry.data;
391    // todo: for partial entries make sure to only write the part that is actually present
392    let mut file = std::fs::File::create(&target)?;
393    let size = data.len() as u64;
394    tx.send(ExportProgressItem::Size(size)).await?;
395    let mut buf = [0u8; 1024 * 64];
396    for offset in (0..size).step_by(1024 * 64) {
397        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
398        let buf = &mut buf[..len];
399        data.as_ref().read_exact_at(offset, buf)?;
400        file.write_all(buf)?;
401        tx.try_send(ExportProgressItem::CopyProgress(offset))
402            .await
403            .map_err(|_e| io::Error::other("error"))?;
404        yield_now().await;
405    }
406    Ok(())
407}