1use crate::error::Error;
3use crate::repo::paths::{filestem_to_pin_cid, pin_path};
4use crate::repo::{DataStore, PinKind, PinMode, PinModeRequirement, PinStore, References};
5use async_trait::async_trait;
6use core::convert::TryFrom;
7use futures::stream::{BoxStream, TryStreamExt};
8use futures::StreamExt;
9use ipld_core::cid::Cid;
10use std::collections::{HashMap, HashSet};
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use tokio::fs;
14use tokio::sync::{RwLock, Semaphore};
15use tokio_stream::{empty, wrappers::ReadDirStream};
16use tokio_util::either::Either;
17
/// Filesystem-backed store rooted at a single directory.
///
/// Key/value entries live under `<root>/data` and pin files under `<root>/pins`.
#[derive(Debug)]
pub struct FsDataStore {
    /// Root directory; the `data` and `pins` sub-directories are joined onto it.
    path: PathBuf,

    /// Semaphore created with a single permit in `new`. The pin insert/remove operations
    /// acquire it before touching pin files, so only one of them runs at a time.
    lock: Arc<Semaphore>,

    /// Guard for the key/value operations: taken shared by `contains`/`get` and
    /// exclusively by `put`/`remove`.
    ds_guard: Arc<RwLock<()>>,
}
38
39impl FsDataStore {
40 pub fn new(root: PathBuf) -> Self {
41 FsDataStore {
42 path: root,
43 ds_guard: Arc::default(),
44 lock: Arc::new(Semaphore::new(1)),
45 }
46 }
47
48 fn key(&self, key: &[u8]) -> Option<(String, String)> {
51 let key = String::from_utf8_lossy(key);
52 let mut key_segments = key.split('/').collect::<Vec<_>>();
53
54 let key_val = key_segments
55 .pop()
56 .map(PathBuf::from)
57 .map(|path| path.with_extension("data"))
58 .map(|path| path.to_string_lossy().to_string())?;
59
60 let key_path_raw = key_segments.join("/");
61
62 let key_path = match key_path_raw.starts_with('/') {
63 true => key_path_raw[1..].to_string(),
64 false => key_path_raw,
65 };
66
67 Some((key_path, key_val))
68 }
69
70 async fn write(&self, key: &[u8], val: &[u8]) -> std::io::Result<()> {
71 let data_path = self.path.join("data");
72 if !data_path.is_dir() {
73 tokio::fs::create_dir_all(&data_path).await?;
74 }
75
76 let (path, key) = self
77 .key(key)
78 .ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
79
80 let path = data_path.join(path);
81
82 if !path.is_dir() {
83 tokio::fs::create_dir_all(&path).await?;
84 }
85
86 let path = path.join(key);
87
88 if path.is_dir() {
89 return Err(std::io::ErrorKind::Other.into());
92 }
93
94 tokio::fs::write(path, val).await
95 }
96
97 fn _contains(&self, key: &[u8]) -> bool {
98 let data_path = self.path.join("data");
99 let Some((path, key)) = self.key(key) else {
100 return false;
101 };
102 let path = data_path.join(path);
103 let path = path.join(key);
104 path.is_file()
105 }
106
107 async fn delete(&self, key: &[u8]) -> std::io::Result<()> {
108 let data_path = self.path.join("data");
109 let (path, key) = self
110 .key(key)
111 .ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
112 let path = data_path.join(path);
113 let path = path.join(key);
114 tokio::fs::remove_file(path).await
115 }
116
117 async fn read(&self, key: &[u8]) -> std::io::Result<Option<Vec<u8>>> {
118 let data_path = self.path.join("data");
119 let (path, key) = self
120 .key(key)
121 .ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
122 let path = data_path.join(path);
123 let path = path.join(key);
124 if path.is_dir() {
125 return Ok(None);
126 }
127 tokio::fs::read(path).await.map(Some)
128 }
129}
130
131fn build_kv<R: AsRef<Path>, P: AsRef<Path>>(
132 data_path: R,
133 path: P,
134) -> BoxStream<'static, (Vec<u8>, Vec<u8>)> {
135 let data_path = data_path.as_ref().to_path_buf();
136 let path = path.as_ref().to_path_buf();
137 let st = async_stream::stream! {
138 if path.is_file() {
139 return;
140 }
141 let Ok(dir) = tokio::fs::read_dir(path).await else {
142 return;
143 };
144
145 let st =
146 ReadDirStream::new(dir).filter_map(|result| futures::future::ready(result.ok()));
147
148 for await entry in st {
149 let path = entry.path();
150 if path.is_dir() {
151 for await item in build_kv(&data_path, &path) {
152 yield item;
153 }
154 } else {
155 let root_str = data_path.to_string_lossy().to_string();
156 let path_str = path.to_string_lossy().to_string();
157 let raw_key = &path_str[root_str.len()..];
158 if raw_key.is_empty() {
159 continue;
160 }
161
162 let Some(key) = raw_key.get(0..raw_key.len() - 5) else {
163 continue;
164 };
165
166 if let Ok(bytes) = tokio::fs::read(path).await {
167 let key = key.as_bytes().to_vec();
168 yield (key, bytes)
169 }
170 }
171 }
172 };
173
174 st.boxed()
175}
176
177#[async_trait]
180impl DataStore for FsDataStore {
181 async fn init(&self) -> Result<(), Error> {
182 tokio::fs::create_dir_all(&self.path.join("pins")).await?;
184 tokio::fs::create_dir_all(&self.path.join("data")).await?;
185 Ok(())
186 }
187
188 async fn contains(&self, key: &[u8]) -> Result<bool, Error> {
189 let _g = self.ds_guard.read().await;
190 Ok(self._contains(key))
191 }
192
193 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
194 let _g = self.ds_guard.read().await;
195 self.read(key).await.map_err(Error::from)
196 }
197
198 async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
199 let _g = self.ds_guard.write().await;
200 self.write(key, value).await.map_err(Error::from)
201 }
202
203 async fn remove(&self, key: &[u8]) -> Result<(), Error> {
204 let _g = self.ds_guard.write().await;
205 self.delete(key).await.map_err(Error::from)
206 }
207
208 async fn iter(&self) -> BoxStream<'static, (Vec<u8>, Vec<u8>)> {
209 let data_path = self.path.join("data");
210 build_kv(&data_path, &data_path)
211 }
212}
213
214#[async_trait]
217impl PinStore for FsDataStore {
218 async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
219 let path = pin_path(self.path.join("pins"), cid);
220
221 if read_direct_or_recursive(path).await?.is_some() {
222 return Ok(true);
223 }
224
225 let st = self.list_pinfiles().await.try_filter_map(|(cid, mode)| {
226 futures::future::ready(if mode == PinMode::Recursive {
227 Ok(Some(cid))
228 } else {
229 Ok(None)
230 })
231 });
232
233 futures::pin_mut!(st);
234
235 while let Some(recursive) = TryStreamExt::try_next(&mut st).await? {
236 let (_, references) =
239 read_recursively_pinned(self.path.join("pins"), recursive).await?;
240
241 if references.into_iter().any(move |x| x == *cid) {
243 return Ok(true);
244 }
245 }
246
247 Ok(false)
248 }
249
250 async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> {
251 let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
252
253 let mut path = pin_path(self.path.join("pins"), target);
254
255 let span = tracing::Span::current();
256
257 tokio::task::spawn_blocking(move || {
258 let _permit = permit;
260 let _entered = span.enter();
261
262 std::fs::create_dir_all(path.parent().expect("shard parent has to exist"))?;
263 path.set_extension("recursive");
264 if path.is_file() {
265 return Err(anyhow::anyhow!("already pinned recursively"));
266 }
267
268 path.set_extension("direct");
269 let f = std::fs::File::create(path)?;
270 f.sync_all()?;
271 Ok(())
272 })
273 .await??;
274
275 Ok(())
276 }
277
278 async fn insert_recursive_pin(
279 &self,
280 target: &Cid,
281 referenced: References<'_>,
282 ) -> Result<(), Error> {
283 let set = referenced
284 .try_collect::<std::collections::BTreeSet<_>>()
285 .await?;
286
287 let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
288
289 let mut path = pin_path(self.path.join("pins"), target);
290
291 let span = tracing::Span::current();
292
293 tokio::task::spawn_blocking(move || {
294 let _permit = permit; let _entered = span.enter();
296
297 std::fs::create_dir_all(path.parent().expect("shard parent has to exist"))?;
298 let count = set.len();
299 let cids = set.into_iter().map(|cid| cid.to_string());
300
301 path.set_extension("recursive_temp");
302
303 let file = std::fs::File::create(&path)?;
304
305 match sync_write_recursive_pin(file, count, cids) {
306 Ok(_) => {
307 let final_path = path.with_extension("recursive");
308 std::fs::rename(&path, final_path)?
309 }
310 Err(e) => {
311 let removed = std::fs::remove_file(&path);
312
313 match removed {
314 Ok(_) => debug!("cleaned up ok after botched recursive pin write"),
315 Err(e) => warn!("failed to cleanup temporary file: {}", e),
316 }
317
318 return Err(e);
319 }
320 }
321
322 path.set_extension("direct");
326
327 match std::fs::remove_file(&path) {
328 Ok(_) => { }
329 Err(e) if e.kind() == std::io::ErrorKind::NotFound => { }
330 Err(e) => {
331 warn!(
332 "failed to remove direct pin when adding recursive {:?}: {}",
333 path, e
334 );
335 }
336 }
337
338 Ok::<_, Error>(())
339 })
340 .await??;
341
342 Ok(())
343 }
344
345 async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> {
346 let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
347
348 let mut path = pin_path(self.path.join("pins"), target);
349
350 let span = tracing::Span::current();
351
352 tokio::task::spawn_blocking(move || {
353 let _permit = permit; let _entered = span.enter();
355
356 path.set_extension("recursive");
357
358 if path.is_file() {
359 return Err(anyhow::anyhow!("is pinned recursively"));
360 }
361
362 path.set_extension("direct");
363
364 match std::fs::remove_file(&path) {
365 Ok(_) => {
366 trace!("direct pin removed");
367 Ok(())
368 }
369 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
370 Err(anyhow::anyhow!("not pinned or pinned indirectly"))
371 }
372 Err(e) => Err(e.into()),
373 }
374 })
375 .await??;
376
377 Ok(())
378 }
379
380 async fn remove_recursive_pin(&self, target: &Cid, _: References<'_>) -> Result<(), Error> {
381 let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
382
383 let mut path = pin_path(self.path.join("pins"), target);
384
385 let span = tracing::Span::current();
386
387 tokio::task::spawn_blocking(move || {
388 let _permit = permit; let _entered = span.enter();
390
391 path.set_extension("direct");
392
393 let mut any = false;
394
395 match std::fs::remove_file(&path) {
396 Ok(_) => {
397 trace!("direct pin removed");
398 any |= true;
399 }
400 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
401 }
404 Err(e) => return Err(Error::new(e)),
406 }
407
408 path.set_extension("recursive");
409
410 match std::fs::remove_file(&path) {
411 Ok(_) => {
412 trace!("recursive pin removed");
413 any |= true;
414 }
415 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
416 }
419 Err(e) => return Err(e.into()),
420 }
421
422 if !any {
423 Err(anyhow::anyhow!("not pinned or pinned indirectly"))
424 } else {
425 Ok(())
426 }
427 })
428 .await??;
429
430 Ok(())
431 }
432
/// Streams the pinned cids together with their pin mode, each cid at most once.
///
/// Output order: recursive pins (as the pin files are read), then direct pins, then —
/// only when `requirement.is_indirect_or_any()` holds — the cids referenced by the
/// recursive pins, reported as `PinMode::Indirect`.
///
/// `requirement` is converted into a `PinModeRequirement`; which modes it accepts is
/// decided by its `matches` method (presumably `None` accepts every mode — confirm
/// against `PinModeRequirement`).
async fn list(
    &self,
    requirement: Option<PinMode>,
) -> BoxStream<'static, Result<(Cid, PinMode), Error>> {
    // The pin file listing is opened before the stream is built.
    let cids = self.list_pinfiles().await;

    let path = self.path.join("pins");

    let requirement = PinModeRequirement::from(requirement);

    let st = async_stream::try_stream! {

        // Every cid yielded so far; used to suppress duplicates across all three phases.
        let mut returned: HashSet<Cid> = HashSet::default();

        // Roots of recursive pins, kept only when indirect pins have to be expanded.
        let mut recursive: HashSet<Cid> = HashSet::default();
        // Matching direct pins, buffered until the pin file listing is exhausted.
        let mut direct: HashSet<Cid> = HashSet::default();

        let collect_recursive_for_indirect = requirement.is_indirect_or_any();

        futures::pin_mut!(cids);

        while let Some((cid, mode)) = TryStreamExt::try_next(&mut cids).await? {

            let matches = requirement.matches(&mode);

            if mode == PinMode::Recursive {
                if collect_recursive_for_indirect {
                    recursive.insert(cid);
                }
                // Recursive pins are yielded immediately.
                if matches && returned.insert(cid) {
                    yield (cid, mode);
                }
            } else if mode == PinMode::Direct && matches {
                direct.insert(cid);
            }
        }

        trace!(unique = returned.len(), "completed listing recursive");

        for cid in direct {
            // Skipped when the same cid was already reported as recursive.
            if returned.insert(cid) {
                yield (cid, PinMode::Direct)
            }
        }

        trace!(unique = returned.len(), "completed listing direct");

        if !collect_recursive_for_indirect {
            return;
        }

        // Read the reference lists of the recursive pins, up to four files at a time;
        // results arrive in completion order.
        let mut recursive = futures::stream::iter(recursive.into_iter().map(Ok))
            .map_ok(move |cid| read_recursively_pinned(path.clone(), cid))
            .try_buffer_unordered(4);

        while let Some((_, next_batch)) = TryStreamExt::try_next(&mut recursive).await? {
            for indirect in next_batch {
                // A cid already reported as recursive or direct is not repeated.
                if returned.insert(indirect) {
                    yield (indirect, PinMode::Indirect);
                }
            }

            trace!(unique = returned.len(), "completed batch of indirect");
        }
    };

    Box::pin(st)
}
519
/// Looks up how each of `ids` is pinned, optionally restricted to one pin mode.
///
/// Results keep the order of `ids`. The call fails with "`<cid>` is not pinned" as soon
/// as a cid cannot be matched under `requirement`.
///
/// Mode selection:
/// * `Some(Direct)` / `Some(Recursive)`: only the cid's own pin files are checked.
/// * `Some(Indirect)`: only the reference lists of recursive pins are searched.
/// * `None`: own pin files first, then the reference lists for whatever is left.
async fn query(
    &self,
    ids: Vec<Cid>,
    requirement: Option<PinMode>,
) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
    // One slot per requested id, filled in as pins are found.
    let mut response = Vec::with_capacity(ids.len());
    for _ in 0..ids.len() {
        response.push(None);
    }

    // cid -> index into `response` for ids that still have to be resolved indirectly.
    let mut remaining = HashMap::new();

    let (check_direct, searched_suffix, gather_indirect) = match requirement {
        Some(PinMode::Direct) => (true, Some(PinMode::Direct), false),
        Some(PinMode::Recursive) => (true, Some(PinMode::Recursive), false),
        Some(PinMode::Indirect) => (false, None, true),
        None => (true, None, true),
    };

    let searched_suffix = PinModeRequirement::from(searched_suffix);

    let (mut response, mut remaining) = if check_direct {
        let base = self.path.join("pins");
        // Probing the pin files is synchronous filesystem work, so it runs on the
        // blocking pool.
        tokio::task::spawn_blocking(move || {
            for (i, cid) in ids.into_iter().enumerate() {
                let mut path = pin_path(base.clone(), &cid);

                if let Some(mode) = sync_read_direct_or_recursive(&mut path) {
                    if searched_suffix.matches(&mode) {
                        response[i] = Some((
                            cid,
                            match mode {
                                PinMode::Direct => PinKind::Direct,
                                // NOTE(review): the count is always reported as 0 here;
                                // its meaning is not visible in this file.
                                PinMode::Recursive => PinKind::Recursive(0),
                                // `sync_read_direct_or_recursive` only returns
                                // `Recursive` or `Direct`.
                                _ => unreachable!(),
                            },
                        ));
                        continue;
                    }
                }

                // Not found under the requested mode and no indirect search allowed.
                if !gather_indirect {
                    return Err(anyhow::anyhow!("{} is not pinned", cid));
                }

                // Only the first index of a repeated cid is recorded; later duplicates
                // keep an empty slot, which is dropped by the final `flatten`.
                remaining.entry(cid).or_insert(i);
            }

            Ok((response, remaining))
        })
        .await??
    } else {
        for (i, cid) in ids.into_iter().enumerate() {
            remaining.entry(cid).or_insert(i);
        }
        (response, remaining)
    };

    if !remaining.is_empty() {
        // Entries only end up in `remaining` on paths where `gather_indirect` is true.
        assert!(gather_indirect);

        trace!(
            remaining = remaining.len(),
            "query trying to find remaining indirect pins"
        );

        // Read the reference list of every recursive pin, up to four files at a time.
        let recursives = self
            .list_pinfiles()
            .await
            .try_filter_map(|(cid, mode)| {
                futures::future::ready(if mode == PinMode::Recursive {
                    Ok(Some(cid))
                } else {
                    Ok(None)
                })
            })
            .map_ok(|cid| read_recursively_pinned(self.path.join("pins"), cid))
            .try_buffer_unordered(4);

        futures::pin_mut!(recursives);

        'out: while let Some((referring, references)) =
            TryStreamExt::try_next(&mut recursives).await?
        {
            for cid in references {
                if let Some(index) = remaining.remove(&cid) {
                    response[index] = Some((cid, PinKind::IndirectFrom(referring)));

                    // Stop reading further pin files once everything is resolved.
                    if remaining.is_empty() {
                        break 'out;
                    }
                }
            }
        }
    }

    // Anything still unresolved is not pinned in any way; report one such cid.
    if let Some((cid, _)) = remaining.into_iter().next() {
        return Err(anyhow::anyhow!("{} is not pinned", cid));
    }

    Ok(response.into_iter().flatten().collect())
}
637}
638
639impl FsDataStore {
/// Streams `(cid, mode)` for every pin file found one directory level below `pins`.
///
/// Each sub-directory of `pins` is listed. Files with the `recursive` or `direct`
/// extension are reported as `PinMode::Recursive` / `PinMode::Direct`; anything else is
/// skipped, as are files whose stem `filestem_to_pin_cid` does not turn into a cid.
///
/// A failure to open `pins` itself is delivered as the single item of the stream.
async fn list_pinfiles(
    &self,
) -> impl futures::stream::Stream<Item = Result<(Cid, PinMode), Error>> + 'static {
    let stream = match tokio::fs::read_dir(self.path.join("pins")).await {
        Ok(st) => Either::Left(ReadDirStream::new(st)),
        // Defer the error into the stream so the return type stays a plain stream.
        Err(e) => Either::Right(futures::stream::once(futures::future::ready(Err(e)))),
    };

    stream
        .and_then(|d| async move {
            // Descend into sub-directories; entries of any other type contribute nothing.
            Ok(if d.file_type().await?.is_dir() {
                Either::Left(ReadDirStream::new(fs::read_dir(d.path()).await?))
            } else {
                Either::Right(empty())
            })
        })
        .try_flatten()
        // Up to here the error type is `std::io::Error`.
        .map_err(Error::new)
        .try_filter_map(|d| {
            let name = d.file_name();
            let path: &std::path::Path = name.as_ref();

            // The extension decides the pin mode.
            let mode = if path.extension() == Some("recursive".as_ref()) {
                Some(PinMode::Recursive)
            } else if path.extension() == Some("direct".as_ref()) {
                Some(PinMode::Direct)
            } else {
                None
            };

            // The file stem is handed to `filestem_to_pin_cid` to recover the cid.
            let maybe_tuple = mode.and_then(move |mode| {
                filestem_to_pin_cid(path.file_stem()).map(move |cid| (cid, mode))
            });

            futures::future::ready(Ok(maybe_tuple))
        })
}
681}
682
683async fn read_recursively_pinned(path: PathBuf, cid: Cid) -> Result<(Cid, Vec<Cid>), Error> {
689 let mut path = pin_path(path, &cid);
691 path.set_extension("recursive");
692 let contents = match tokio::fs::read(path).await {
693 Ok(vec) => vec,
694 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
695 return Ok((cid, Vec::new()));
698 }
699 Err(e) => return Err(e.into()),
700 };
701
702 let cids: Vec<&str> = serde_json::from_slice(&contents)?;
703
704 let found = cids
707 .into_iter()
708 .map(Cid::try_from)
709 .collect::<Result<Vec<Cid>, _>>()?;
710
711 trace!(cid = %cid, count = found.len(), "read indirect pins");
712 Ok((cid, found))
713}
714
715async fn read_direct_or_recursive(mut block_path: PathBuf) -> Result<Option<PinMode>, Error> {
716 tokio::task::spawn_blocking(move || Ok(sync_read_direct_or_recursive(&mut block_path))).await?
717}
718
719fn sync_read_direct_or_recursive(block_path: &mut PathBuf) -> Option<PinMode> {
720 for (ext, mode) in &[
722 ("recursive", PinMode::Recursive),
723 ("direct", PinMode::Direct),
724 ] {
725 block_path.set_extension(ext);
726 if block_path.is_file() {
729 return Some(*mode);
730 }
731 }
732 None
733}
734
735fn sync_write_recursive_pin(
736 file: std::fs::File,
737 count: usize,
738 cids: impl Iterator<Item = String>,
739) -> Result<(), Error> {
740 use serde::{ser::SerializeSeq, Serializer};
741 use std::io::{BufWriter, Write};
742 let writer = BufWriter::new(file);
743
744 let mut serializer = serde_json::ser::Serializer::new(writer);
745
746 let mut seq = serializer.serialize_seq(Some(count))?;
747 for cid in cids {
748 seq.serialize_element(&cid)?;
749 }
750 seq.end()?;
751
752 let mut writer = serializer.into_inner();
753 writer.flush()?;
754
755 let file = writer.into_inner()?;
756 file.sync_all()?;
757 Ok(())
758}
759
// NOTE(review): presumably expands to the shared pin store interface test suite as a
// `common_tests` module, using `FsDataStore::new` as the constructor — confirm against
// the definition of `pinstore_interface_tests!`.
#[cfg(test)]
crate::pinstore_interface_tests!(
    common_tests,
    crate::repo::datastore::flatfs::FsDataStore::new
);
765
#[cfg(test)]
mod test {
    use crate::repo::{datastore::flatfs::FsDataStore, DataStore};

    /// Exercises the key/value round trip: absent -> put -> present -> remove -> absent.
    #[tokio::test]
    async fn test_kv_datastore() -> anyhow::Result<()> {
        // Use a directory unique to this run instead of the shared temp directory itself,
        // so leftovers from earlier runs or concurrently running tests cannot interfere
        // and nothing is written straight into the system temp directory.
        let unique = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?
            .as_nanos();
        let tmp = std::env::temp_dir().join(format!(
            "flatfs-kv-test-{}-{}",
            std::process::id(),
            unique
        ));

        let store = FsDataStore::new(tmp.clone());
        let key = [1, 2, 3, 4];
        let value = [5, 6, 7, 8];

        store.init().await?;

        let contains = store.contains(&key).await.unwrap();
        assert!(!contains);
        let get = store.get(&key).await.unwrap_or_default();
        assert_eq!(get, None);
        assert!(store.remove(&key).await.is_err());

        store.put(&key, &value).await.unwrap();
        let contains = store.contains(&key).await.unwrap();
        assert!(contains);
        let get = store.get(&key).await.unwrap();
        assert_eq!(get, Some(value.to_vec()));

        store.remove(&key).await.unwrap();
        let contains = store.contains(&key).await.unwrap();
        assert!(!contains);
        let get = store.get(&key).await.unwrap_or_default();
        assert_eq!(get, None);
        drop(store);

        // Clean up the directories created by `init`.
        tokio::fs::remove_dir_all(&tmp).await?;
        Ok(())
    }
}