1use std::{
5 collections::{BTreeMap, HashMap},
6 io,
7 path::PathBuf,
8 sync::Arc,
9};
10
11use crate::{
12 store::{
13 EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, MapEntryMut,
14 ReadableStore,
15 },
16 util::{
17 progress::{BoxedProgressSender, IdGenerator, ProgressSender},
18 Tag,
19 },
20 BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
21};
22use bao_tree::{
23 blake3,
24 io::{outboard::PreOrderMemOutboard, sync::Outboard},
25};
26use bytes::Bytes;
27use futures_lite::Stream;
28use iroh_io::AsyncSliceReader;
29use tokio::io::AsyncWriteExt;
30
31use super::{BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, DbIter, ExportProgressCb};
32
/// A readonly in-memory store, mapping each blob hash to its pre-computed
/// bao outboard and the blob data itself.
///
/// Cloning is cheap: the whole map sits behind a shared [`Arc`].
#[derive(Debug, Clone, Default)]
pub struct Store(Arc<HashMap<Hash, (PreOrderMemOutboard<Bytes>, Bytes)>>);
41
42impl<K, V> FromIterator<(K, V)> for Store
43where
44 K: Into<String>,
45 V: AsRef<[u8]>,
46{
47 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
48 let (db, _m) = Self::new(iter);
49 db
50 }
51}
52
53impl Store {
54 pub fn new(
59 entries: impl IntoIterator<Item = (impl Into<String>, impl AsRef<[u8]>)>,
60 ) -> (Self, BTreeMap<String, blake3::Hash>) {
61 let mut names = BTreeMap::new();
62 let mut res = HashMap::new();
63 for (name, data) in entries.into_iter() {
64 let name = name.into();
65 let data: &[u8] = data.as_ref();
66 let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
68 let hash = outboard.root();
69 names.insert(name, hash);
71 let data = Bytes::from(data.to_vec());
72 let hash = Hash::from(hash);
73 res.insert(hash, (outboard, data));
74 }
75 (Self(Arc::new(res)), names)
76 }
77
78 pub fn insert(&mut self, data: impl AsRef<[u8]>) -> Hash {
82 let inner = Arc::make_mut(&mut self.0);
83 let data: &[u8] = data.as_ref();
84 let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
86 let hash = outboard.root();
87 let data = Bytes::from(data.to_vec());
88 let hash = Hash::from(hash);
89 inner.insert(hash, (outboard, data));
90 hash
91 }
92
93 pub fn insert_many(
95 &mut self,
96 items: impl IntoIterator<Item = impl AsRef<[u8]>>,
97 ) -> Option<Hash> {
98 let mut hash = None;
99 for item in items.into_iter() {
100 hash = Some(self.insert(item));
101 }
102 hash
103 }
104
105 pub fn get_content(&self, hash: &Hash) -> Option<Bytes> {
107 let entry = self.0.get(hash)?;
108 Some(entry.1.clone())
109 }
110
111 async fn export_impl(
112 &self,
113 hash: Hash,
114 target: PathBuf,
115 _mode: ExportMode,
116 progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
117 ) -> io::Result<()> {
118 tracing::trace!("exporting {} to {}", hash, target.display());
119
120 if !target.is_absolute() {
121 return Err(io::Error::new(
122 io::ErrorKind::InvalidInput,
123 "target path must be absolute",
124 ));
125 }
126 let parent = target.parent().ok_or_else(|| {
127 io::Error::new(
128 io::ErrorKind::InvalidInput,
129 "target path has no parent directory",
130 )
131 })?;
132 tokio::fs::create_dir_all(parent).await?;
134 let data = self
135 .get_content(&hash)
136 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
137
138 let mut offset = 0u64;
139 let mut file = tokio::fs::File::create(&target).await?;
140 for chunk in data.chunks(1024 * 1024) {
141 progress(offset)?;
142 file.write_all(chunk).await?;
143 offset += chunk.len() as u64;
144 }
145 file.sync_all().await?;
146 drop(file);
147 Ok(())
148 }
149}
150
/// The [`MapEntry`] type for [`Store`]: an owned outboard/data pair.
#[derive(Debug, Clone)]
pub struct Entry {
    // pre-computed bao outboard for `data`
    outboard: PreOrderMemOutboard<Bytes>,
    // the complete blob contents
    data: Bytes,
}
157
158impl MapEntry for Entry {
159 fn hash(&self) -> Hash {
160 self.outboard.root().into()
161 }
162
163 fn size(&self) -> BaoBlobSize {
164 BaoBlobSize::Verified(self.data.len() as u64)
165 }
166
167 async fn outboard(&self) -> io::Result<impl bao_tree::io::fsm::Outboard> {
168 Ok(self.outboard.clone())
169 }
170
171 async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
172 Ok(self.data.clone())
173 }
174
175 fn is_complete(&self) -> bool {
176 true
177 }
178}
179
180impl Map for Store {
181 type Entry = Entry;
182
183 async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
184 Ok(self.0.get(hash).map(|(o, d)| Entry {
185 outboard: o.clone(),
186 data: d.clone(),
187 }))
188 }
189}
190
191impl super::MapMut for Store {
192 type EntryMut = Entry;
193
194 async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
195 self.get(hash).await
196 }
197
198 async fn get_or_create(&self, _hash: Hash, _size: u64) -> io::Result<Entry> {
199 Err(io::Error::new(
200 io::ErrorKind::Other,
201 "cannot create temp entry in readonly database",
202 ))
203 }
204
205 fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
206 Ok(match self.0.contains_key(hash) {
207 true => EntryStatus::Complete,
208 false => EntryStatus::NotFound,
209 })
210 }
211
212 async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
213 self.entry_status_sync(hash)
214 }
215
216 async fn insert_complete(&self, _entry: Entry) -> io::Result<()> {
217 unreachable!()
219 }
220}
221
222impl ReadableStore for Store {
223 async fn blobs(&self) -> io::Result<DbIter<Hash>> {
224 Ok(Box::new(
225 self.0
226 .keys()
227 .copied()
228 .map(Ok)
229 .collect::<Vec<_>>()
230 .into_iter(),
231 ))
232 }
233
234 async fn tags(&self) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
235 Ok(Box::new(std::iter::empty()))
236 }
237
238 fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
239 Box::new(std::iter::empty())
240 }
241
242 async fn consistency_check(
243 &self,
244 _repair: bool,
245 _tx: BoxedProgressSender<ConsistencyCheckProgress>,
246 ) -> io::Result<()> {
247 Ok(())
248 }
249
250 async fn export(
251 &self,
252 hash: Hash,
253 target: PathBuf,
254 mode: ExportMode,
255 progress: ExportProgressCb,
256 ) -> io::Result<()> {
257 self.export_impl(hash, target, mode, progress).await
258 }
259
260 async fn partial_blobs(&self) -> io::Result<DbIter<Hash>> {
261 Ok(Box::new(std::iter::empty()))
262 }
263}
264
impl MapEntryMut for Entry {
    /// Return a batch writer for this entry.
    ///
    /// Entries in this store are readonly, so no writer can ever be handed
    /// out: this method panics via `unreachable!()` if it is actually called.
    /// The uninhabited `Bar` enum exists only to satisfy the
    /// `impl BaoBatchWriter` return type while proving at the type level that
    /// no writer value can exist.
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        // uninhabited type: there are no values of `Bar`
        enum Bar {}
        impl BaoBatchWriter for Bar {
            async fn write_batch(
                &mut self,
                _size: u64,
                _batch: Vec<bao_tree::io::fsm::BaoContentItem>,
            ) -> io::Result<()> {
                unreachable!()
            }

            async fn sync(&mut self) -> io::Result<()> {
                unreachable!()
            }
        }

        // the `as Bar` gives the diverging expression the concrete type the
        // `impl Trait` return needs; control never gets here without panicking
        #[allow(unreachable_code)]
        Ok(unreachable!() as Bar)
    }
}
286
287impl super::Store for Store {
288 async fn import_file(
289 &self,
290 data: PathBuf,
291 mode: ImportMode,
292 format: BlobFormat,
293 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
294 ) -> io::Result<(TempTag, u64)> {
295 let _ = (data, mode, progress, format);
296 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
297 }
298
299 async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
301 let _ = (bytes, format);
302 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
303 }
304
305 async fn import_stream(
306 &self,
307 data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send,
308 format: BlobFormat,
309 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
310 ) -> io::Result<(TempTag, u64)> {
311 let _ = (data, format, progress);
312 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
313 }
314
315 async fn set_tag(&self, _name: Tag, _hash: Option<HashAndFormat>) -> io::Result<()> {
316 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
317 }
318
319 async fn create_tag(&self, _hash: HashAndFormat) -> io::Result<Tag> {
320 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
321 }
322
323 fn temp_tag(&self, inner: HashAndFormat) -> TempTag {
324 TempTag::new(inner, None)
325 }
326
327 async fn gc_start(&self) -> io::Result<()> {
328 Ok(())
329 }
330
331 async fn delete(&self, _hashes: Vec<Hash>) -> io::Result<()> {
332 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
333 }
334
335 async fn shutdown(&self) {}
336}