1use std::{
9 collections::HashMap,
10 io::{self, Write},
11 ops::Deref,
12 path::PathBuf,
13};
14
15use bao_tree::{
16 io::{
17 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
18 outboard::PreOrderMemOutboard,
19 sync::ReadAt,
20 Leaf,
21 },
22 BaoTree, ChunkRanges,
23};
24use bytes::Bytes;
25use irpc::channel::mpsc;
26use n0_future::future::{self, yield_now};
27use range_collections::range_set::RangeSetRange;
28use ref_cast::RefCast;
29use tokio::task::{JoinError, JoinSet};
30
31use super::util::BaoTreeSender;
32use crate::{
33 api::{
34 self,
35 blobs::{Bitfield, ExportProgressItem},
36 proto::{
37 self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
38 ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
39 ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
40 ObserveRequest, WaitIdleMsg,
41 },
42 ApiClient, TempTag,
43 },
44 protocol::ChunkRangesExt,
45 store::{mem::CompleteStorage, IROH_BLOCK_SIZE},
46 Hash,
47};
48
/// Handle to a store that serves a fixed set of in-memory blobs.
///
/// The blobs are supplied once, when the store is created. Import, tag
/// mutation and blob deletion commands are answered with an error by the
/// backing actor.
///
/// The handle is cheap to clone; all clones talk to the same actor.
#[derive(Debug, Clone)]
pub struct ReadonlyMemStore {
    /// Client used to send commands to the actor task.
    client: ApiClient,
}
53
54impl Deref for ReadonlyMemStore {
55 type Target = crate::api::Store;
56
57 fn deref(&self) -> &Self::Target {
58 crate::api::Store::ref_from_sender(&self.client)
59 }
60}
61
/// State owned by the background task that answers store commands.
struct Actor {
    /// Incoming commands from store handles.
    commands: tokio::sync::mpsc::Receiver<proto::Command>,
    /// Tasks spawned to answer observe, export and list requests.
    tasks: JoinSet<()>,
    /// `WaitIdle` requesters that are notified once `tasks` becomes empty.
    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
    /// The blobs served by this store, keyed by their hash.
    data: HashMap<Hash, CompleteStorage>,
}
68
69impl Actor {
70 fn new(
71 commands: tokio::sync::mpsc::Receiver<proto::Command>,
72 data: HashMap<Hash, CompleteStorage>,
73 ) -> Self {
74 Self {
75 data,
76 commands,
77 tasks: JoinSet::new(),
78 idle_waiters: Vec::new(),
79 }
80 }
81
82 async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
83 match cmd {
84 Command::ImportBao(ImportBaoMsg { tx, .. }) => {
85 tx.send(Err(api::Error::Io(io::Error::other(
86 "import not supported",
87 ))))
88 .await
89 .ok();
90 }
91 Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
92 if self.tasks.is_empty() {
93 tx.send(()).await.ok();
95 } else {
96 self.idle_waiters.push(tx);
98 }
99 }
100 Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
101 tx.send(io::Error::other("import not supported").into())
102 .await
103 .ok();
104 }
105 Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => {
106 tx.send(io::Error::other("import not supported").into())
107 .await
108 .ok();
109 }
110 Command::ImportPath(ImportPathMsg { tx, .. }) => {
111 tx.send(io::Error::other("import not supported").into())
112 .await
113 .ok();
114 }
115 Command::Observe(ObserveMsg {
116 inner: ObserveRequest { hash },
117 tx,
118 ..
119 }) => {
120 let size = self.data.get_mut(&hash).map(|x| x.data.len() as u64);
121 self.tasks.spawn(async move {
122 if let Some(size) = size {
123 tx.send(Bitfield::complete(size)).await.ok();
124 } else {
125 tx.send(Bitfield::empty()).await.ok();
126 future::pending::<()>().await;
127 };
128 });
129 }
130 Command::ExportBao(ExportBaoMsg {
131 inner: ExportBaoRequest { hash, ranges, .. },
132 tx,
133 ..
134 }) => {
135 let entry = self.data.get(&hash).cloned();
136 self.tasks.spawn(export_bao(hash, entry, ranges, tx));
137 }
138 Command::ExportPath(ExportPathMsg {
139 inner: ExportPathRequest { hash, target, .. },
140 tx,
141 ..
142 }) => {
143 let entry = self.data.get(&hash).cloned();
144 self.tasks.spawn(export_path(entry, target, tx));
145 }
146 Command::Batch(_cmd) => {}
147 Command::ClearProtected(cmd) => {
148 cmd.tx.send(Ok(())).await.ok();
149 }
150 Command::CreateTag(cmd) => {
151 cmd.tx
152 .send(Err(io::Error::other("create tag not supported").into()))
153 .await
154 .ok();
155 }
156 Command::CreateTempTag(cmd) => {
157 cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
158 }
159 Command::RenameTag(cmd) => {
160 cmd.tx
161 .send(Err(io::Error::other("rename tag not supported").into()))
162 .await
163 .ok();
164 }
165 Command::DeleteTags(cmd) => {
166 cmd.tx
167 .send(Err(io::Error::other("delete tags not supported").into()))
168 .await
169 .ok();
170 }
171 Command::DeleteBlobs(cmd) => {
172 cmd.tx
173 .send(Err(io::Error::other("delete blobs not supported").into()))
174 .await
175 .ok();
176 }
177 Command::ListBlobs(cmd) => {
178 let hashes: Vec<Hash> = self.data.keys().cloned().collect();
179 self.tasks.spawn(async move {
180 for hash in hashes {
181 cmd.tx.send(Ok(hash)).await.ok();
182 }
183 });
184 }
185 Command::BlobStatus(cmd) => {
186 let hash = cmd.inner.hash;
187 let entry = self.data.get(&hash);
188 let status = if let Some(entry) = entry {
189 BlobStatus::Complete {
190 size: entry.data.len() as u64,
191 }
192 } else {
193 BlobStatus::NotFound
194 };
195 cmd.tx.send(status).await.ok();
196 }
197 Command::ListTags(cmd) => {
198 cmd.tx.send(Vec::new()).await.ok();
199 }
200 Command::SetTag(cmd) => {
201 cmd.tx
202 .send(Err(io::Error::other("set tag not supported").into()))
203 .await
204 .ok();
205 }
206 Command::ListTempTags(cmd) => {
207 cmd.tx.send(Vec::new()).await.ok();
208 }
209 Command::SyncDb(cmd) => {
210 cmd.tx.send(Ok(())).await.ok();
211 }
212 Command::Shutdown(cmd) => {
213 return Some(cmd.tx);
214 }
215 Command::ExportRanges(cmd) => {
216 let entry = self.data.get(&cmd.inner.hash).cloned();
217 self.tasks.spawn(export_ranges(cmd, entry));
218 }
219 }
220 None
221 }
222
223 fn log_unit_task(&self, res: Result<(), JoinError>) {
224 if let Err(e) = res {
225 tracing::error!("task failed: {e}");
226 }
227 }
228
229 async fn run(mut self) {
230 loop {
231 tokio::select! {
232 Some(cmd) = self.commands.recv() => {
233 if let Some(shutdown) = self.handle_command(cmd).await {
234 shutdown.send(()).await.ok();
235 break;
236 }
237 },
238 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
239 self.log_unit_task(res);
240 if self.tasks.is_empty() {
241 for tx in self.idle_waiters.drain(..) {
243 tx.send(()).await.ok();
244 }
245 }
246 },
247 else => break,
248 }
249 }
250 }
251}
252
253async fn export_bao(
254 hash: Hash,
255 entry: Option<CompleteStorage>,
256 ranges: ChunkRanges,
257 mut sender: mpsc::Sender<EncodedItem>,
258) {
259 let entry = match entry {
260 Some(entry) => entry,
261 None => {
262 sender
263 .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
264 io::Error::new(
265 io::ErrorKind::UnexpectedEof,
266 "export task ended unexpectedly",
267 ),
268 )))
269 .await
270 .ok();
271 return;
272 }
273 };
274 let data = entry.data;
275 let outboard = entry.outboard;
276 let size = data.as_ref().len() as u64;
277 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
278 let outboard = PreOrderMemOutboard {
279 root: hash.into(),
280 tree,
281 data: outboard,
282 };
283 let sender = BaoTreeSender::ref_cast_mut(&mut sender);
284 traverse_ranges_validated(data.as_ref(), outboard, &ranges, sender)
285 .await
286 .ok();
287}
288
289async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<CompleteStorage>) {
290 let Some(entry) = entry else {
291 cmd.tx
292 .send(ExportRangesItem::Error(api::Error::io(
293 io::ErrorKind::NotFound,
294 "hash not found",
295 )))
296 .await
297 .ok();
298 return;
299 };
300 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
301 cmd.tx
302 .send(ExportRangesItem::Error(cause.into()))
303 .await
304 .ok();
305 }
306}
307
308async fn export_ranges_impl(
309 cmd: ExportRangesRequest,
310 tx: &mut mpsc::Sender<ExportRangesItem>,
311 entry: CompleteStorage,
312) -> io::Result<()> {
313 let ExportRangesRequest { ranges, .. } = cmd;
314 let data = entry.data;
315 let size = data.len() as u64;
316 let bitfield = Bitfield::complete(size);
317 for range in ranges.iter() {
318 let range = match range {
319 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
320 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
321 };
322 let requested = ChunkRanges::bytes(range.start..range.end);
323 if !bitfield.ranges.is_superset(&requested) {
324 return Err(io::Error::other(format!(
325 "missing range: {requested:?}, present: {bitfield:?}",
326 )));
327 }
328 let bs = 1024;
329 let mut offset = range.start;
330 loop {
331 let end: u64 = (offset + bs).min(range.end);
332 let size = (end - offset) as usize;
333 tx.send(
334 Leaf {
335 offset,
336 data: data.read_bytes_at(offset, size)?,
337 }
338 .into(),
339 )
340 .await?;
341 offset = end;
342 if offset >= range.end {
343 break;
344 }
345 }
346 }
347 Ok(())
348}
349
350impl ReadonlyMemStore {
351 pub fn new(items: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
352 let mut entries = HashMap::new();
353 for item in items {
354 let data = Bytes::copy_from_slice(item.as_ref());
355 let (hash, entry) = CompleteStorage::create(data);
356 entries.insert(hash, entry);
357 }
358 let (sender, receiver) = tokio::sync::mpsc::channel(1);
359 let actor = Actor::new(receiver, entries);
360 tokio::spawn(actor.run());
361 let local = irpc::LocalSender::from(sender);
362 Self {
363 client: local.into(),
364 }
365 }
366}
367
368async fn export_path(
369 entry: Option<CompleteStorage>,
370 target: PathBuf,
371 mut tx: mpsc::Sender<ExportProgressItem>,
372) {
373 let Some(entry) = entry else {
374 tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
375 .await
376 .ok();
377 return;
378 };
379 match export_path_impl(entry, target, &mut tx).await {
380 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
381 Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
382 };
383}
384
385async fn export_path_impl(
386 entry: CompleteStorage,
387 target: PathBuf,
388 tx: &mut mpsc::Sender<ExportProgressItem>,
389) -> io::Result<()> {
390 let data = entry.data;
391 let mut file = std::fs::File::create(&target)?;
393 let size = data.len() as u64;
394 tx.send(ExportProgressItem::Size(size)).await?;
395 let mut buf = [0u8; 1024 * 64];
396 for offset in (0..size).step_by(1024 * 64) {
397 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
398 let buf = &mut buf[..len];
399 data.as_ref().read_exact_at(offset, buf)?;
400 file.write_all(buf)?;
401 tx.try_send(ExportProgressItem::CopyProgress(offset))
402 .await
403 .map_err(|_e| io::Error::other("error"))?;
404 yield_now().await;
405 }
406 Ok(())
407}