1use std::{
9 collections::HashMap,
10 io::{self, Write},
11 ops::Deref,
12 path::PathBuf,
13};
14
15use bao_tree::{
16 io::{
17 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
18 outboard::PreOrderMemOutboard,
19 sync::ReadAt,
20 Leaf,
21 },
22 BaoTree, ChunkRanges,
23};
24use bytes::Bytes;
25use irpc::channel::mpsc;
26use n0_future::future::{self, yield_now};
27use range_collections::range_set::RangeSetRange;
28use ref_cast::RefCast;
29use tokio::task::{JoinError, JoinSet};
30
31use super::util::BaoTreeSender;
32use crate::{
33 api::{
34 self,
35 blobs::{Bitfield, ExportProgressItem},
36 proto::{
37 self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
38 ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
39 ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
40 ObserveRequest,
41 },
42 ApiClient, TempTag,
43 },
44 store::{mem::CompleteStorage, IROH_BLOCK_SIZE},
45 util::ChunkRangesExt,
46 Hash,
47};
48
49#[derive(Debug, Clone)]
50pub struct ReadonlyMemStore {
51 client: ApiClient,
52}
53
54impl Deref for ReadonlyMemStore {
55 type Target = crate::api::Store;
56
57 fn deref(&self) -> &Self::Target {
58 crate::api::Store::ref_from_sender(&self.client)
59 }
60}
61
62struct Actor {
63 commands: tokio::sync::mpsc::Receiver<proto::Command>,
64 tasks: JoinSet<()>,
65 data: HashMap<Hash, CompleteStorage>,
66}
67
68impl Actor {
69 fn new(
70 commands: tokio::sync::mpsc::Receiver<proto::Command>,
71 data: HashMap<Hash, CompleteStorage>,
72 ) -> Self {
73 Self {
74 data,
75 commands,
76 tasks: JoinSet::new(),
77 }
78 }
79
80 async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
81 match cmd {
82 Command::ImportBao(ImportBaoMsg { tx, .. }) => {
83 tx.send(Err(api::Error::Io(io::Error::other(
84 "import not supported",
85 ))))
86 .await
87 .ok();
88 }
89 Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
90 tx.send(io::Error::other("import not supported").into())
91 .await
92 .ok();
93 }
94 Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => {
95 tx.send(io::Error::other("import not supported").into())
96 .await
97 .ok();
98 }
99 Command::ImportPath(ImportPathMsg { tx, .. }) => {
100 tx.send(io::Error::other("import not supported").into())
101 .await
102 .ok();
103 }
104 Command::Observe(ObserveMsg {
105 inner: ObserveRequest { hash },
106 tx,
107 ..
108 }) => {
109 let size = self.data.get_mut(&hash).map(|x| x.data.len() as u64);
110 self.tasks.spawn(async move {
111 if let Some(size) = size {
112 tx.send(Bitfield::complete(size)).await.ok();
113 } else {
114 tx.send(Bitfield::empty()).await.ok();
115 future::pending::<()>().await;
116 };
117 });
118 }
119 Command::ExportBao(ExportBaoMsg {
120 inner: ExportBaoRequest { hash, ranges },
121 tx,
122 ..
123 }) => {
124 let entry = self.data.get(&hash).cloned();
125 self.tasks.spawn(export_bao(hash, entry, ranges, tx));
126 }
127 Command::ExportPath(ExportPathMsg {
128 inner: ExportPathRequest { hash, target, .. },
129 tx,
130 ..
131 }) => {
132 let entry = self.data.get(&hash).cloned();
133 self.tasks.spawn(export_path(entry, target, tx));
134 }
135 Command::Batch(_cmd) => {}
136 Command::ClearProtected(cmd) => {
137 cmd.tx.send(Ok(())).await.ok();
138 }
139 Command::CreateTag(cmd) => {
140 cmd.tx
141 .send(Err(io::Error::other("create tag not supported").into()))
142 .await
143 .ok();
144 }
145 Command::CreateTempTag(cmd) => {
146 cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
147 }
148 Command::RenameTag(cmd) => {
149 cmd.tx
150 .send(Err(io::Error::other("rename tag not supported").into()))
151 .await
152 .ok();
153 }
154 Command::DeleteTags(cmd) => {
155 cmd.tx
156 .send(Err(io::Error::other("delete tags not supported").into()))
157 .await
158 .ok();
159 }
160 Command::DeleteBlobs(cmd) => {
161 cmd.tx
162 .send(Err(io::Error::other("delete blobs not supported").into()))
163 .await
164 .ok();
165 }
166 Command::ListBlobs(cmd) => {
167 let hashes: Vec<Hash> = self.data.keys().cloned().collect();
168 self.tasks.spawn(async move {
169 for hash in hashes {
170 cmd.tx.send(Ok(hash)).await.ok();
171 }
172 });
173 }
174 Command::BlobStatus(cmd) => {
175 let hash = cmd.inner.hash;
176 let entry = self.data.get(&hash);
177 let status = if let Some(entry) = entry {
178 BlobStatus::Complete {
179 size: entry.data.len() as u64,
180 }
181 } else {
182 BlobStatus::NotFound
183 };
184 cmd.tx.send(status).await.ok();
185 }
186 Command::ListTags(cmd) => {
187 cmd.tx.send(Vec::new()).await.ok();
188 }
189 Command::SetTag(cmd) => {
190 cmd.tx
191 .send(Err(io::Error::other("set tag not supported").into()))
192 .await
193 .ok();
194 }
195 Command::ListTempTags(cmd) => {
196 cmd.tx.send(Vec::new()).await.ok();
197 }
198 Command::SyncDb(cmd) => {
199 cmd.tx.send(Ok(())).await.ok();
200 }
201 Command::Shutdown(cmd) => {
202 return Some(cmd.tx);
203 }
204 Command::ExportRanges(cmd) => {
205 let entry = self.data.get(&cmd.inner.hash).cloned();
206 self.tasks.spawn(export_ranges(cmd, entry));
207 }
208 }
209 None
210 }
211
212 fn log_unit_task(&self, res: Result<(), JoinError>) {
213 if let Err(e) = res {
214 tracing::error!("task failed: {e}");
215 }
216 }
217
218 async fn run(mut self) {
219 loop {
220 tokio::select! {
221 Some(cmd) = self.commands.recv() => {
222 if let Some(shutdown) = self.handle_command(cmd).await {
223 shutdown.send(()).await.ok();
224 break;
225 }
226 },
227 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
228 self.log_unit_task(res);
229 },
230 else => break,
231 }
232 }
233 }
234}
235
236async fn export_bao(
237 hash: Hash,
238 entry: Option<CompleteStorage>,
239 ranges: ChunkRanges,
240 mut sender: mpsc::Sender<EncodedItem>,
241) {
242 let entry = match entry {
243 Some(entry) => entry,
244 None => {
245 sender
246 .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
247 io::Error::new(
248 io::ErrorKind::UnexpectedEof,
249 "export task ended unexpectedly",
250 ),
251 )))
252 .await
253 .ok();
254 return;
255 }
256 };
257 let data = entry.data;
258 let outboard = entry.outboard;
259 let size = data.as_ref().len() as u64;
260 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
261 let outboard = PreOrderMemOutboard {
262 root: hash.into(),
263 tree,
264 data: outboard,
265 };
266 let sender = BaoTreeSender::ref_cast_mut(&mut sender);
267 traverse_ranges_validated(data.as_ref(), outboard, &ranges, sender)
268 .await
269 .ok();
270}
271
272async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<CompleteStorage>) {
273 let Some(entry) = entry else {
274 cmd.tx
275 .send(ExportRangesItem::Error(api::Error::io(
276 io::ErrorKind::NotFound,
277 "hash not found",
278 )))
279 .await
280 .ok();
281 return;
282 };
283 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
284 cmd.tx
285 .send(ExportRangesItem::Error(cause.into()))
286 .await
287 .ok();
288 }
289}
290
291async fn export_ranges_impl(
292 cmd: ExportRangesRequest,
293 tx: &mut mpsc::Sender<ExportRangesItem>,
294 entry: CompleteStorage,
295) -> io::Result<()> {
296 let ExportRangesRequest { ranges, .. } = cmd;
297 let data = entry.data;
298 let size = data.len() as u64;
299 let bitfield = Bitfield::complete(size);
300 for range in ranges.iter() {
301 let range = match range {
302 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
303 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
304 };
305 let requested = ChunkRanges::bytes(range.start..range.end);
306 if !bitfield.ranges.is_superset(&requested) {
307 return Err(io::Error::other(format!(
308 "missing range: {requested:?}, present: {bitfield:?}",
309 )));
310 }
311 let bs = 1024;
312 let mut offset = range.start;
313 loop {
314 let end: u64 = (offset + bs).min(range.end);
315 let size = (end - offset) as usize;
316 tx.send(
317 Leaf {
318 offset,
319 data: data.read_bytes_at(offset, size)?,
320 }
321 .into(),
322 )
323 .await?;
324 offset = end;
325 if offset >= range.end {
326 break;
327 }
328 }
329 }
330 Ok(())
331}
332
333impl ReadonlyMemStore {
334 pub fn new(items: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
335 let mut entries = HashMap::new();
336 for item in items {
337 let data = Bytes::copy_from_slice(item.as_ref());
338 let (hash, entry) = CompleteStorage::create(data);
339 entries.insert(hash, entry);
340 }
341 let (sender, receiver) = tokio::sync::mpsc::channel(1);
342 let actor = Actor::new(receiver, entries);
343 tokio::spawn(actor.run());
344 let local = irpc::LocalSender::from(sender);
345 Self {
346 client: local.into(),
347 }
348 }
349}
350
351async fn export_path(
352 entry: Option<CompleteStorage>,
353 target: PathBuf,
354 mut tx: mpsc::Sender<ExportProgressItem>,
355) {
356 let Some(entry) = entry else {
357 tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
358 .await
359 .ok();
360 return;
361 };
362 match export_path_impl(entry, target, &mut tx).await {
363 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
364 Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
365 };
366}
367
368async fn export_path_impl(
369 entry: CompleteStorage,
370 target: PathBuf,
371 tx: &mut mpsc::Sender<ExportProgressItem>,
372) -> io::Result<()> {
373 let data = entry.data;
374 let mut file = std::fs::File::create(&target)?;
376 let size = data.len() as u64;
377 tx.send(ExportProgressItem::Size(size)).await?;
378 let mut buf = [0u8; 1024 * 64];
379 for offset in (0..size).step_by(1024 * 64) {
380 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
381 let buf = &mut buf[..len];
382 data.as_ref().read_exact_at(offset, buf)?;
383 file.write_all(buf)?;
384 tx.try_send(ExportProgressItem::CopyProgress(offset))
385 .await
386 .map_err(|_e| io::Error::other("error"))?;
387 yield_now().await;
388 }
389 Ok(())
390}