rust_ipfs/repo/datastore/
flatfs.rs

//! Persistent filesystem backed pin store and key-value datastore. See [`FsDataStore`] for more
//! information.
2use crate::error::Error;
3use crate::repo::paths::{filestem_to_pin_cid, pin_path};
4use crate::repo::{DataStore, PinKind, PinMode, PinModeRequirement, PinStore, References};
5use async_trait::async_trait;
6use core::convert::TryFrom;
7use futures::stream::{BoxStream, TryStreamExt};
8use futures::StreamExt;
9use ipld_core::cid::Cid;
10use std::collections::{HashMap, HashSet};
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use tokio::fs;
14use tokio::sync::{RwLock, Semaphore};
15use tokio_stream::{empty, wrappers::ReadDirStream};
16use tokio_util::either::Either;
17
/// Filesystem backed store which serves both as a key-value datastore and as a pin store.
///
/// Key-value entries are kept under `<root>/data`, one `.data` file per key, with the
/// `/`-separated key segments mapped onto nested directories.
///
/// Pins are kept under `<root>/pins` in shard directories (the exact location of a pin file is
/// decided by `pin_path`; presumably the same sharding as [`FsBlockStore`] -- TODO confirm).
/// Direct pins are empty `.direct` files, recursive pins are `.recursive` files recording all of
/// their indirect descendants. The pin mode is told apart by the file extension.
///
/// Pin modifications are serialized through a single lock; key-value access uses a separate
/// read-write guard.
#[derive(Debug)]
pub struct FsDataStore {
    /// The root directory of the store; the `data` and `pins` directories live directly under
    /// it.
    path: PathBuf,

    /// Single permit semaphore taken by every pin modification (`insert_*_pin`,
    /// `remove_*_pin`), so there is one pin writer at a time while pin queries run
    /// concurrently without a permit. Reads are assumed not to need the permit as recursive
    /// pin files are written to a temporary file and renamed into place, and consistency of
    /// reads is not a concern right now. For a garbage collection implementation it might be
    /// needed to hold this permit for the duration of the collection, or something similar.
    lock: Arc<Semaphore>,

    /// Guard for the key-value side: `contains` and `get` take it shared, `put` and `remove`
    /// take it exclusively. `iter` does not take it.
    ds_guard: Arc<RwLock<()>>,
}
38
39impl FsDataStore {
40    pub fn new(root: PathBuf) -> Self {
41        FsDataStore {
42            path: root,
43            ds_guard: Arc::default(),
44            lock: Arc::new(Semaphore::new(1)),
45        }
46    }
47
48    // Instead of having the file be the key itself, we would split the key into segements with all but the last representing as a directory
49    // with the final item being a file.
50    fn key(&self, key: &[u8]) -> Option<(String, String)> {
51        let key = String::from_utf8_lossy(key);
52        let mut key_segments = key.split('/').collect::<Vec<_>>();
53
54        let key_val = key_segments
55            .pop()
56            .map(PathBuf::from)
57            .map(|path| path.with_extension("data"))
58            .map(|path| path.to_string_lossy().to_string())?;
59
60        let key_path_raw = key_segments.join("/");
61
62        let key_path = match key_path_raw.starts_with('/') {
63            true => key_path_raw[1..].to_string(),
64            false => key_path_raw,
65        };
66
67        Some((key_path, key_val))
68    }
69
70    async fn write(&self, key: &[u8], val: &[u8]) -> std::io::Result<()> {
71        let data_path = self.path.join("data");
72        if !data_path.is_dir() {
73            tokio::fs::create_dir_all(&data_path).await?;
74        }
75
76        let (path, key) = self
77            .key(key)
78            .ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
79
80        let path = data_path.join(path);
81
82        if !path.is_dir() {
83            tokio::fs::create_dir_all(&path).await?;
84        }
85
86        let path = path.join(key);
87
88        if path.is_dir() {
89            // The only reason why this would be a directory is if the key didnt exist, is invalid, or the item was
90            // actually a directory in which case we return an error here.
91            return Err(std::io::ErrorKind::Other.into());
92        }
93
94        tokio::fs::write(path, val).await
95    }
96
97    fn _contains(&self, key: &[u8]) -> bool {
98        let data_path = self.path.join("data");
99        let Some((path, key)) = self.key(key) else {
100            return false;
101        };
102        let path = data_path.join(path);
103        let path = path.join(key);
104        path.is_file()
105    }
106
107    async fn delete(&self, key: &[u8]) -> std::io::Result<()> {
108        let data_path = self.path.join("data");
109        let (path, key) = self
110            .key(key)
111            .ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
112        let path = data_path.join(path);
113        let path = path.join(key);
114        tokio::fs::remove_file(path).await
115    }
116
117    async fn read(&self, key: &[u8]) -> std::io::Result<Option<Vec<u8>>> {
118        let data_path = self.path.join("data");
119        let (path, key) = self
120            .key(key)
121            .ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
122        let path = data_path.join(path);
123        let path = path.join(key);
124        if path.is_dir() {
125            return Ok(None);
126        }
127        tokio::fs::read(path).await.map(Some)
128    }
129}
130
131fn build_kv<R: AsRef<Path>, P: AsRef<Path>>(
132    data_path: R,
133    path: P,
134) -> BoxStream<'static, (Vec<u8>, Vec<u8>)> {
135    let data_path = data_path.as_ref().to_path_buf();
136    let path = path.as_ref().to_path_buf();
137    let st = async_stream::stream! {
138        if path.is_file() {
139            return;
140        }
141        let Ok(dir) = tokio::fs::read_dir(path).await else {
142            return;
143        };
144
145        let st =
146            ReadDirStream::new(dir).filter_map(|result| futures::future::ready(result.ok()));
147
148        for await entry in st {
149            let path = entry.path();
150            if path.is_dir() {
151                for await item in build_kv(&data_path, &path) {
152                    yield item;
153                }
154            } else {
155                let root_str = data_path.to_string_lossy().to_string();
156                let path_str = path.to_string_lossy().to_string();
157                let raw_key = &path_str[root_str.len()..];
158                if raw_key.is_empty() {
159                    continue;
160                }
161
162                let Some(key) = raw_key.get(0..raw_key.len() - 5) else {
163                    continue;
164                };
165
166                if let Ok(bytes) = tokio::fs::read(path).await {
167                    let key = key.as_bytes().to_vec();
168                    yield (key, bytes)
169                }
170            }
171        }
172    };
173
174    st.boxed()
175}
176
177/// The column operations are all unimplemented pending at least downscoping of the
178/// DataStore trait itself.
179#[async_trait]
180impl DataStore for FsDataStore {
181    async fn init(&self) -> Result<(), Error> {
182        // Although `pins` directory is created when inserting a data, is it not created when there are any attempts at listing the pins (thus causing to fail)
183        tokio::fs::create_dir_all(&self.path.join("pins")).await?;
184        tokio::fs::create_dir_all(&self.path.join("data")).await?;
185        Ok(())
186    }
187
188    async fn contains(&self, key: &[u8]) -> Result<bool, Error> {
189        let _g = self.ds_guard.read().await;
190        Ok(self._contains(key))
191    }
192
193    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
194        let _g = self.ds_guard.read().await;
195        self.read(key).await.map_err(Error::from)
196    }
197
198    async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
199        let _g = self.ds_guard.write().await;
200        self.write(key, value).await.map_err(Error::from)
201    }
202
203    async fn remove(&self, key: &[u8]) -> Result<(), Error> {
204        let _g = self.ds_guard.write().await;
205        self.delete(key).await.map_err(Error::from)
206    }
207
208    async fn iter(&self) -> BoxStream<'static, (Vec<u8>, Vec<u8>)> {
209        let data_path = self.path.join("data");
210        build_kv(&data_path, &data_path)
211    }
212}
213
// Filesystem backed implementation of `PinStore` (the trait is imported from `crate::repo`):
// pins are files under `<root>/pins`, with the `.direct` / `.recursive` extension telling the
// pin mode.
216#[async_trait]
217impl PinStore for FsDataStore {
218    async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
219        let path = pin_path(self.path.join("pins"), cid);
220
221        if read_direct_or_recursive(path).await?.is_some() {
222            return Ok(true);
223        }
224
225        let st = self.list_pinfiles().await.try_filter_map(|(cid, mode)| {
226            futures::future::ready(if mode == PinMode::Recursive {
227                Ok(Some(cid))
228            } else {
229                Ok(None)
230            })
231        });
232
233        futures::pin_mut!(st);
234
235        while let Some(recursive) = TryStreamExt::try_next(&mut st).await? {
236            // TODO: it might be much better to just deserialize the vec one by one and comparing while
237            // going
238            let (_, references) =
239                read_recursively_pinned(self.path.join("pins"), recursive).await?;
240
241            // if we always wrote down the cids in some order we might be able to binary search?
242            if references.into_iter().any(move |x| x == *cid) {
243                return Ok(true);
244            }
245        }
246
247        Ok(false)
248    }
249
250    async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> {
251        let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
252
253        let mut path = pin_path(self.path.join("pins"), target);
254
255        let span = tracing::Span::current();
256
257        tokio::task::spawn_blocking(move || {
258            // move the permit to the blocking thread to ensure we keep it as long as needed
259            let _permit = permit;
260            let _entered = span.enter();
261
262            std::fs::create_dir_all(path.parent().expect("shard parent has to exist"))?;
263            path.set_extension("recursive");
264            if path.is_file() {
265                return Err(anyhow::anyhow!("already pinned recursively"));
266            }
267
268            path.set_extension("direct");
269            let f = std::fs::File::create(path)?;
270            f.sync_all()?;
271            Ok(())
272        })
273        .await??;
274
275        Ok(())
276    }
277
278    async fn insert_recursive_pin(
279        &self,
280        target: &Cid,
281        referenced: References<'_>,
282    ) -> Result<(), Error> {
283        let set = referenced
284            .try_collect::<std::collections::BTreeSet<_>>()
285            .await?;
286
287        let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
288
289        let mut path = pin_path(self.path.join("pins"), target);
290
291        let span = tracing::Span::current();
292
293        tokio::task::spawn_blocking(move || {
294            let _permit = permit; // again move to the threadpool thread
295            let _entered = span.enter();
296
297            std::fs::create_dir_all(path.parent().expect("shard parent has to exist"))?;
298            let count = set.len();
299            let cids = set.into_iter().map(|cid| cid.to_string());
300
301            path.set_extension("recursive_temp");
302
303            let file = std::fs::File::create(&path)?;
304
305            match sync_write_recursive_pin(file, count, cids) {
306                Ok(_) => {
307                    let final_path = path.with_extension("recursive");
308                    std::fs::rename(&path, final_path)?
309                }
310                Err(e) => {
311                    let removed = std::fs::remove_file(&path);
312
313                    match removed {
314                        Ok(_) => debug!("cleaned up ok after botched recursive pin write"),
315                        Err(e) => warn!("failed to cleanup temporary file: {}", e),
316                    }
317
318                    return Err(e);
319                }
320            }
321
322            // if we got this far, we have now written and renamed the recursive_temp into place.
323            // now we just need to remove the direct pin, if it exists
324
325            path.set_extension("direct");
326
327            match std::fs::remove_file(&path) {
328                Ok(_) => { /* good */ }
329                Err(e) if e.kind() == std::io::ErrorKind::NotFound => { /* good as well */ }
330                Err(e) => {
331                    warn!(
332                        "failed to remove direct pin when adding recursive {:?}: {}",
333                        path, e
334                    );
335                }
336            }
337
338            Ok::<_, Error>(())
339        })
340        .await??;
341
342        Ok(())
343    }
344
345    async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> {
346        let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
347
348        let mut path = pin_path(self.path.join("pins"), target);
349
350        let span = tracing::Span::current();
351
352        tokio::task::spawn_blocking(move || {
353            let _permit = permit; // move in to threadpool thread
354            let _entered = span.enter();
355
356            path.set_extension("recursive");
357
358            if path.is_file() {
359                return Err(anyhow::anyhow!("is pinned recursively"));
360            }
361
362            path.set_extension("direct");
363
364            match std::fs::remove_file(&path) {
365                Ok(_) => {
366                    trace!("direct pin removed");
367                    Ok(())
368                }
369                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
370                    Err(anyhow::anyhow!("not pinned or pinned indirectly"))
371                }
372                Err(e) => Err(e.into()),
373            }
374        })
375        .await??;
376
377        Ok(())
378    }
379
380    async fn remove_recursive_pin(&self, target: &Cid, _: References<'_>) -> Result<(), Error> {
381        let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
382
383        let mut path = pin_path(self.path.join("pins"), target);
384
385        let span = tracing::Span::current();
386
387        tokio::task::spawn_blocking(move || {
388            let _permit = permit; // move into threadpool thread
389            let _entered = span.enter();
390
391            path.set_extension("direct");
392
393            let mut any = false;
394
395            match std::fs::remove_file(&path) {
396                Ok(_) => {
397                    trace!("direct pin removed");
398                    any |= true;
399                }
400                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
401                    // nevermind, we are just trying to remove the direct as it should go, if it
402                    // was left by mistake
403                }
404                // Error::new instead of e.into() to help out the type inference
405                Err(e) => return Err(Error::new(e)),
406            }
407
408            path.set_extension("recursive");
409
410            match std::fs::remove_file(&path) {
411                Ok(_) => {
412                    trace!("recursive pin removed");
413                    any |= true;
414                }
415                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
416                    // we may have removed only the direct pin, but if we cleaned out a direct pin
417                    // this would have been a success
418                }
419                Err(e) => return Err(e.into()),
420            }
421
422            if !any {
423                Err(anyhow::anyhow!("not pinned or pinned indirectly"))
424            } else {
425                Ok(())
426            }
427        })
428        .await??;
429
430        Ok(())
431    }
432
    /// Lists the pins, optionally restricted to those matching `requirement`.
    ///
    /// The returned stream yields in priority order: recursive pins as they are found while
    /// scanning the pin files, then the direct pins, and finally -- only when
    /// `requirement.is_indirect_or_any()` -- the cids referenced by the recursive pins as
    /// `PinMode::Indirect`. A cid is yielded at most once, with the first mode it was
    /// encountered under. The stream ends at the first error.
    async fn list(
        &self,
        requirement: Option<PinMode>,
    ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> {
        // no locking, dirty reads are probably good enough until gc
        let cids = self.list_pinfiles().await;

        let path = self.path.join("pins");

        let requirement = PinModeRequirement::from(requirement);

        // depending on what was queried we must iterate through the results in the order of
        // recursive, direct and indirect.
        //
        // if only one kind is required, we must return only those, which may or may not be
        // easier than doing all of the work. this implementation follows:
        //
        // https://github.com/ipfs/go-ipfs/blob/2ae5c52f4f0f074864ea252e90e72e8d5999caba/core/coreapi/pin.go#L222
        let st = async_stream::try_stream! {

            // keep track of all returned not to give out duplicate cids
            let mut returned: HashSet<Cid> = HashSet::default();

            // the set of recursive will be interesting after all others
            let mut recursive: HashSet<Cid> = HashSet::default();
            let mut direct: HashSet<Cid> = HashSet::default();

            let collect_recursive_for_indirect = requirement.is_indirect_or_any();

            futures::pin_mut!(cids);

            while let Some((cid, mode)) = TryStreamExt::try_next(&mut cids).await? {

                let matches = requirement.matches(&mode);

                if mode == PinMode::Recursive {
                    // remembered even when not yielded: the indirect pins are read from the
                    // files of the recursive pins
                    if collect_recursive_for_indirect {
                        recursive.insert(cid);
                    }
                    if matches && returned.insert(cid) {
                        // the recursive pins can always be returned right away since they have
                        // the highest priority in this listing or output
                        yield (cid, mode);
                    }
                } else if mode == PinMode::Direct && matches {
                    // held back until every pin file has been seen so that all of the
                    // recursive pins get yielded first
                    direct.insert(cid);
                }
            }

            trace!(unique = returned.len(), "completed listing recursive");

            // now that the recursive are done, next up in priority order are direct. the set
            // of directly pinned and recursively pinned should be disjoint, but probably there
            // are times when 100% accurate results are not possible... Nor needed.
            for cid in direct {
                if returned.insert(cid) {
                    yield (cid, PinMode::Direct)
                }
            }

            trace!(unique = returned.len(), "completed listing direct");

            if !collect_recursive_for_indirect {
                // we didn't collect the recursive to list the indirect so, done.
                return;
            }

            // the threadpool passing adds probably some messaging latency, maybe run small
            // amount in parallel?
            let mut recursive = futures::stream::iter(recursive.into_iter().map(Ok))
                .map_ok(move |cid| read_recursively_pinned(path.clone(), cid))
                .try_buffer_unordered(4);

            while let Some((_, next_batch)) = TryStreamExt::try_next(&mut recursive).await? {
                for indirect in next_batch {
                    if returned.insert(indirect) {
                        yield (indirect, PinMode::Indirect);
                    }
                }

                trace!(unique = returned.len(), "completed batch of indirect");
            }
        };

        Box::pin(st)
    }
519
    /// Finds out how each of `ids` is pinned, optionally requiring a specific `PinMode`.
    ///
    /// Direct and recursive pins are detected from the existence of the pin files. The ids
    /// which were not resolved that way are, unless the requirement rules indirect pins out,
    /// looked up from the references of every recursive pin and reported as
    /// `PinKind::IndirectFrom` the first recursive pin found referring to them.
    ///
    /// The response follows the order of `ids`. An id which appears multiple times in `ids`
    /// and turns out to be indirectly pinned is answered only once, at its first position.
    ///
    /// # Errors
    ///
    /// Fails with "<cid> is not pinned" when any of the ids is not pinned in the required way,
    /// or with the underlying error when reading the pin files fails.
    async fn query(
        &self,
        ids: Vec<Cid>,
        requirement: Option<PinMode>,
    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
        // response vec gets written to whenever we find out what the pin is
        let mut response = Vec::with_capacity(ids.len());
        for _ in 0..ids.len() {
            response.push(None);
        }

        // cid => index of its first occurrence in `ids`, for the ids still to be resolved
        let mut remaining = HashMap::new();

        // check_direct: look for the pin files of the ids themselves
        // searched_suffix: which of the pin files are acceptable as an answer
        // gather_indirect: search the recursive pins for the ids left unresolved
        let (check_direct, searched_suffix, gather_indirect) = match requirement {
            Some(PinMode::Direct) => (true, Some(PinMode::Direct), false),
            Some(PinMode::Recursive) => (true, Some(PinMode::Recursive), false),
            Some(PinMode::Indirect) => (false, None, true),
            None => (true, None, true),
        };

        let searched_suffix = PinModeRequirement::from(searched_suffix);

        let (mut response, mut remaining) = if check_direct {
            // find the recursive and direct ones by just seeing if the files exist
            let base = self.path.join("pins");
            tokio::task::spawn_blocking(move || {
                for (i, cid) in ids.into_iter().enumerate() {
                    let mut path = pin_path(base.clone(), &cid);

                    if let Some(mode) = sync_read_direct_or_recursive(&mut path) {
                        if searched_suffix.matches(&mode) {
                            response[i] = Some((
                                cid,
                                match mode {
                                    PinMode::Direct => PinKind::Direct,
                                    // FIXME: eech that recursive count is now out of place
                                    PinMode::Recursive => PinKind::Recursive(0),
                                    // FIXME: this is also quite unfortunate, should make an enum
                                    // of two?
                                    // sync_read_direct_or_recursive only ever returns Direct or
                                    // Recursive
                                    _ => unreachable!(),
                                },
                            ));
                            continue;
                        }
                    }

                    if !gather_indirect {
                        // if we are only trying to find recursive or direct, we clearly have not
                        // found what we were looking for
                        return Err(anyhow::anyhow!("{} is not pinned", cid));
                    }

                    // use entry api to discard duplicate cids in input
                    remaining.entry(cid).or_insert(i);
                }

                Ok((response, remaining))
            })
            .await??
        } else {
            // only indirect pins were asked for: nothing to look up from the pin files of the
            // ids themselves
            for (i, cid) in ids.into_iter().enumerate() {
                remaining.entry(cid).or_insert(i);
            }
            (response, remaining)
        };

        // now remaining must have all of the cids => first_index mappings which were not found to
        // be recursive or direct.

        if !remaining.is_empty() {
            assert!(gather_indirect);

            trace!(
                remaining = remaining.len(),
                "query trying to find remaining indirect pins"
            );

            let recursives = self
                .list_pinfiles()
                .await
                .try_filter_map(|(cid, mode)| {
                    futures::future::ready(if mode == PinMode::Recursive {
                        Ok(Some(cid))
                    } else {
                        Ok(None)
                    })
                })
                .map_ok(|cid| read_recursively_pinned(self.path.join("pins"), cid))
                .try_buffer_unordered(4);

            futures::pin_mut!(recursives);

            'out: while let Some((referring, references)) =
                TryStreamExt::try_next(&mut recursives).await?
            {
                // FIXME: maybe binary search?
                for cid in references {
                    if let Some(index) = remaining.remove(&cid) {
                        response[index] = Some((cid, PinKind::IndirectFrom(referring)));

                        if remaining.is_empty() {
                            break 'out;
                        }
                    }
                }
            }
        }

        if let Some((cid, _)) = remaining.into_iter().next() {
            // the error can be for any of these
            return Err(anyhow::anyhow!("{} is not pinned", cid));
        }

        // the input can of course contain duplicate cids so handle them by just giving responses
        // for the first of the duplicates
        Ok(response.into_iter().flatten().collect())
    }
637}
638
639impl FsDataStore {
640    async fn list_pinfiles(
641        &self,
642    ) -> impl futures::stream::Stream<Item = Result<(Cid, PinMode), Error>> + 'static {
643        let stream = match tokio::fs::read_dir(self.path.join("pins")).await {
644            Ok(st) => Either::Left(ReadDirStream::new(st)),
645            // make this into a stream which will only yield the initial error
646            Err(e) => Either::Right(futures::stream::once(futures::future::ready(Err(e)))),
647        };
648
649        stream
650            .and_then(|d| async move {
651                // map over the shard directories
652                Ok(if d.file_type().await?.is_dir() {
653                    Either::Left(ReadDirStream::new(fs::read_dir(d.path()).await?))
654                } else {
655                    Either::Right(empty())
656                })
657            })
658            // flatten each
659            .try_flatten()
660            .map_err(Error::new)
661            // convert the paths ending in ".data" into cid
662            .try_filter_map(|d| {
663                let name = d.file_name();
664                let path: &std::path::Path = name.as_ref();
665
666                let mode = if path.extension() == Some("recursive".as_ref()) {
667                    Some(PinMode::Recursive)
668                } else if path.extension() == Some("direct".as_ref()) {
669                    Some(PinMode::Direct)
670                } else {
671                    None
672                };
673
674                let maybe_tuple = mode.and_then(move |mode| {
675                    filestem_to_pin_cid(path.file_stem()).map(move |cid| (cid, mode))
676                });
677
678                futures::future::ready(Ok(maybe_tuple))
679            })
680    }
681}
682
683/// Reads our serialized format for recusive pins, which is JSON array of stringified Cids.
684///
685/// On file not found error returns an empty Vec as if nothing had happened. This is because we
686/// do "atomic writes" and file removals are expected to be atomic, but reads don't synchronize on
687/// writes, so while iterating it's possible that recursive pin is removed.
688async fn read_recursively_pinned(path: PathBuf, cid: Cid) -> Result<(Cid, Vec<Cid>), Error> {
689    // our fancy format is a Vec<Cid> as json
690    let mut path = pin_path(path, &cid);
691    path.set_extension("recursive");
692    let contents = match tokio::fs::read(path).await {
693        Ok(vec) => vec,
694        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
695            // per method comment, return empty Vec; the pins may have seemed to be present earlier
696            // but no longer are.
697            return Ok((cid, Vec::new()));
698        }
699        Err(e) => return Err(e.into()),
700    };
701
702    let cids: Vec<&str> = serde_json::from_slice(&contents)?;
703
704    // returning a stream which is updated 8kB at time or such might be better, but this should
705    // scale quite up as well.
706    let found = cids
707        .into_iter()
708        .map(Cid::try_from)
709        .collect::<Result<Vec<Cid>, _>>()?;
710
711    trace!(cid = %cid, count = found.len(), "read indirect pins");
712    Ok((cid, found))
713}
714
715async fn read_direct_or_recursive(mut block_path: PathBuf) -> Result<Option<PinMode>, Error> {
716    tokio::task::spawn_blocking(move || Ok(sync_read_direct_or_recursive(&mut block_path))).await?
717}
718
719fn sync_read_direct_or_recursive(block_path: &mut PathBuf) -> Option<PinMode> {
720    // important to first check the recursive then only the direct; the latter might be a left over
721    for (ext, mode) in &[
722        ("recursive", PinMode::Recursive),
723        ("direct", PinMode::Direct),
724    ] {
725        block_path.set_extension(ext);
726        // Path::is_file calls fstat and coerces errors to false; this might be enough, as
727        // we are holding the lock
728        if block_path.is_file() {
729            return Some(*mode);
730        }
731    }
732    None
733}
734
735fn sync_write_recursive_pin(
736    file: std::fs::File,
737    count: usize,
738    cids: impl Iterator<Item = String>,
739) -> Result<(), Error> {
740    use serde::{ser::SerializeSeq, Serializer};
741    use std::io::{BufWriter, Write};
742    let writer = BufWriter::new(file);
743
744    let mut serializer = serde_json::ser::Serializer::new(writer);
745
746    let mut seq = serializer.serialize_seq(Some(count))?;
747    for cid in cids {
748        seq.serialize_element(&cid)?;
749    }
750    seq.end()?;
751
752    let mut writer = serializer.into_inner();
753    writer.flush()?;
754
755    let file = writer.into_inner()?;
756    file.sync_all()?;
757    Ok(())
758}
759
// Runs the shared pin store tests against `FsDataStore`.
// NOTE(review): `pinstore_interface_tests!` is defined elsewhere in the crate; presumably it
// expands to a `common_tests` module which builds the store under test through the given
// constructor -- confirm against the macro definition.
#[cfg(test)]
crate::pinstore_interface_tests!(
    common_tests,
    crate::repo::datastore::flatfs::FsDataStore::new
);
765
#[cfg(test)]
mod test {
    use crate::repo::{datastore::flatfs::FsDataStore, DataStore};

    /// Walks a single key through the key-value operations: absent, stored, read back and
    /// removed again.
    #[tokio::test]
    async fn test_kv_datastore() -> anyhow::Result<()> {
        // A directory of our own: with the bare system temp dir as the root the `data` and
        // `pins` directories would be shared with everything else using it, and would be left
        // behind after the test.
        let tmp = std::env::temp_dir().join(format!(
            "flatfs-kv-datastore-test-{}",
            std::process::id()
        ));
        // best-effort removal of what an earlier, aborted run with the same pid left behind
        let _ = std::fs::remove_dir_all(&tmp);

        let store = FsDataStore::new(tmp.clone());
        let key = [1, 2, 3, 4];
        let value = [5, 6, 7, 8];

        store.init().await?;

        let contains = store.contains(&key).await.unwrap();
        assert!(!contains);
        let get = store.get(&key).await.unwrap_or_default();
        assert_eq!(get, None);
        assert!(store.remove(&key).await.is_err());

        store.put(&key, &value).await.unwrap();
        let contains = store.contains(&key).await.unwrap();
        assert!(contains);
        let get = store.get(&key).await.unwrap();
        assert_eq!(get, Some(value.to_vec()));

        store.remove(&key).await.unwrap();
        let contains = store.contains(&key).await.unwrap();
        assert!(!contains);
        let get = store.get(&key).await.unwrap_or_default();
        assert_eq!(get, None);
        drop(store);

        std::fs::remove_dir_all(&tmp)?;
        Ok(())
    }
}
799}