1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
//! Persistent fs backed repo.
//!
//! Consists of [`FsDataStore`] and [`FsBlockStore`].

use crate::error::Error;
use async_trait::async_trait;
use parking_lot::Mutex;
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Semaphore;

use super::{BlockRm, BlockRmError, Column, DataStore, Lock, LockError, RepoCid};

/// The PinStore implementation for FsDataStore
mod pinstore;

/// The FsBlockStore implementation
mod blocks;
pub use blocks::FsBlockStore;

/// Path mangling done for pins and blocks
mod paths;
use paths::{block_path, filestem_to_block_cid, filestem_to_pin_cid, pin_path};

/// [`FsDataStore`] uses the filesystem as a lockable key-value store. It maintains a sharded,
/// two-level storage layout similar to that of [`FsBlockStore`]. Direct pins are stored as empty
/// files, while recursive pins record all of their indirect descendants. Pin files are told
/// apart by their file extensions.
///
/// A single lock is used when modifying.
///
/// For the [`crate::repo::PinStore`] implementation see `fs/pinstore.rs`.
#[derive(Debug)]
pub struct FsDataStore {
    /// The base directory under which we have a sharded directory structure, and the individual
    /// blocks are stored under the shard. See unixfs/examples/cat.rs for a read example.
    path: PathBuf,

    /// Write lock. Starts out as a simple, conservative solution which allows concurrent queries
    /// but only a single writer. It is assumed that reads do not require a permit, as non-empty
    /// writes are done through tempfiles and the consistency regarding reads is not a concern
    /// right now. A garbage collection implementation might need to hold this permit for the
    /// duration of the garbage collection, or something similar.
    lock: Arc<Semaphore>,
}

impl FsDataStore {
    /// Creates a data store rooted at `root`.
    ///
    /// Only the value is constructed here; the write lock starts out with a single permit,
    /// i.e. at most one writer at a time.
    pub fn new(root: PathBuf) -> Self {
        let lock = Arc::new(Semaphore::new(1));
        Self { path: root, lock }
    }
}

/// The column operations are all unimplemented pending at least downscoping of the
/// DataStore trait itself.
#[async_trait]
impl DataStore for FsDataStore {
    async fn init(&self) -> Result<(), Error> {
        // Although `pins` directory is created when inserting a data, is it not created when there are any attempts at listing the pins (thus causing to fail)
        tokio::fs::create_dir_all(&self.path.join("pins")).await?;
        Ok(())
    }

    async fn open(&self) -> Result<(), Error> {
        Ok(())
    }

    async fn contains(&self, _col: Column, _key: &[u8]) -> Result<bool, Error> {
        Err(anyhow::anyhow!("not implemented"))
    }

    async fn get(&self, _col: Column, _key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
        Err(anyhow::anyhow!("not implemented"))
    }

    async fn put(&self, _col: Column, _key: &[u8], _value: &[u8]) -> Result<(), Error> {
        Err(anyhow::anyhow!("not implemented"))
    }

    async fn remove(&self, _col: Column, _key: &[u8]) -> Result<(), Error> {
        Err(anyhow::anyhow!("not implemented"))
    }

    async fn wipe(&self) {}
}

/// A lock backed by a file at a fixed path. See the `Lock` implementation for how the lock is
/// taken.
#[derive(Debug)]
pub struct FsLock {
    /// Handle to the opened lock file. Starts out as `None`; `try_exclusive` stores the handle
    /// here once the exclusive lock has been acquired.
    file: Mutex<Option<File>>,
    /// Path of the lock file.
    path: PathBuf,
    /// Records whether the exclusive lock has been taken through this value.
    state: Mutex<State>,
}

/// Lock state tracked by `FsLock`.
#[derive(Debug)]
enum State {
    /// Initial state of a newly created `FsLock`.
    Unlocked,
    /// Set by `try_exclusive` after the exclusive file lock has been acquired.
    Exclusive,
}

impl FsLock {
    /// Creates an unlocked `FsLock` for the lock file at `path`.
    ///
    /// The file itself is not opened here; that happens when the lock is attempted.
    pub fn new(path: PathBuf) -> Self {
        let file = Mutex::new(None);
        let state = Mutex::new(State::Unlocked);
        Self { file, path, state }
    }
}

impl Lock for FsLock {
    /// Tries to lock the lock file for exclusive use.
    ///
    /// The lock file is opened for reading and writing, and is created if it does not exist
    /// yet. On success the state is set to `State::Exclusive` and the open file handle is
    /// stored in `self`.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock file cannot be opened or created, or if
    /// `try_lock_exclusive` fails on it, e.g. because the file is already locked elsewhere.
    /// Both failures are `std::io::Error`s converted into `LockError` by `?`.
    fn try_exclusive(&self) -> Result<(), LockError> {
        use fs2::FileExt;
        use std::fs::OpenOptions;

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            // An already existing lock file is reused as is. This is the default, but spelling
            // it out documents the intent next to `create(true)`.
            .truncate(false)
            .open(&self.path)?;

        file.try_lock_exclusive()?;

        // Keep the handle in `self` instead of dropping it at the end of this call.
        // NOTE(review): presumably the file lock is tied to the lifetime of the handle --
        // confirm against the fs2 documentation.
        *self.state.lock() = State::Exclusive;
        *self.file.lock() = Some(file);

        Ok(())
    }
}

// Instantiates the pin store interface tests with `FsDataStore::new` as the constructor. The
// macro is defined elsewhere in the crate; `common_tests` is presumably the name given to the
// generated test module.
#[cfg(test)]
crate::pinstore_interface_tests!(common_tests, crate::repo::fs::FsDataStore::new);

#[cfg(test)]
mod tests {
    use super::{FsLock, Lock};

    /// While one `FsLock` holds the exclusive lock, a second `FsLock` on the same path must
    /// fail to acquire it.
    #[test]
    fn creates_an_exclusive_repo_lock() {
        // The system temp dir is shared between processes. A fixed file name would let
        // concurrently running test processes contend for the very same lock file and fail
        // spuriously, so make the name unique per process.
        let lockfile_name = format!("repo_lock_{}", std::process::id());
        let lockfile_path = std::env::temp_dir().join(lockfile_name);

        // `lock` stays alive until the end of the test, so the lock is held during the second
        // attempt below.
        let lock = FsLock::new(lockfile_path.clone());
        let result = lock.try_exclusive();
        assert!(result.is_ok(), "the first lock attempt should succeed");

        let failing_lock = FsLock::new(lockfile_path.clone());
        let result = failing_lock.try_exclusive();
        assert!(
            result.is_err(),
            "the second lock attempt on the same file should fail"
        );

        // Clean-up.
        std::fs::remove_file(&lockfile_path).unwrap();
    }
}