cacache_sync/
index.rs

1use std::collections::HashSet;
2use std::fs::{self, OpenOptions};
3use std::hash::{Hash, Hasher};
4use std::io::{ErrorKind, Write};
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use digest::Digest;
9use either::{Left, Right};
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use sha1::Sha1;
13use sha2::Sha256;
14use ssri::Integrity;
15use walkdir::WalkDir;
16
17use crate::errors::{Internal, InternalResult, Result};
18use crate::put::WriteOpts;
19
/// Version tag of the index layout. It is interpolated into the
/// `index-v{INDEX_VERSION}` directory name that holds every bucket file
/// under the cache root.
const INDEX_VERSION: &str = "5";
21
22/// Represents a cache index entry, which points to content.
23#[derive(PartialEq, Debug)]
24pub struct Metadata {
25    /// Key this entry is stored under.
26    pub key: String,
27    /// Integrity hash for the stored data. Acts as a key into {cache}/content.
28    pub integrity: Integrity,
29    /// Timestamp in unix milliseconds when this entry was written.
30    pub time: u128,
31    /// Size of data associated with this entry.
32    pub size: usize,
33    /// Arbitrary JSON  associated with this entry.
34    pub metadata: Value,
35}
36
37#[derive(Deserialize, Serialize, Debug)]
38struct SerializableMetadata {
39    key: String,
40    integrity: Option<String>,
41    time: u128,
42    size: usize,
43    metadata: Value,
44}
45
46impl PartialEq for SerializableMetadata {
47    fn eq(&self, other: &Self) -> bool {
48        self.key == other.key
49    }
50}
51
52impl Eq for SerializableMetadata {}
53
54impl Hash for SerializableMetadata {
55    fn hash<H: Hasher>(&self, state: &mut H) {
56        self.key.hash(state);
57    }
58}
59
60pub fn insert(cache: &Path, key: &str, opts: WriteOpts) -> Result<Integrity> {
61    let bucket = bucket_path(cache, key);
62    fs::create_dir_all(bucket.parent().unwrap()).with_context(|| {
63        format!(
64            "Failed to create index bucket directory: {:?}",
65            bucket.parent().unwrap()
66        )
67    })?;
68    let stringified = serde_json::to_string(&SerializableMetadata {
69        key: key.to_owned(),
70        integrity: opts.sri.clone().map(|x| x.to_string()),
71        time: opts.time.unwrap_or_else(now),
72        size: opts.size.unwrap_or(0),
73        metadata: opts.metadata.unwrap_or(serde_json::Value::Null),
74    })
75    .with_context(|| format!("Failed to serialize entry with key `{}`", key))?;
76
77    let mut buck = OpenOptions::new()
78        .create(true)
79        .append(true)
80        .open(&bucket)
81        .with_context(|| format!("Failed to create or open index bucket at {:?}", bucket))?;
82
83    let out = format!("\n{}\t{}", hash_entry(&stringified), stringified);
84    buck.write_all(out.as_bytes())
85        .with_context(|| format!("Failed to write to index bucket at {:?}", bucket))?;
86    buck.flush()
87        .with_context(|| format!("Failed to flush bucket at {:?}", bucket))?;
88    Ok(opts
89        .sri
90        .or_else(|| "sha1-deadbeef".parse::<Integrity>().ok())
91        .unwrap())
92}
93
94pub fn find(cache: &Path, key: &str) -> Result<Option<Metadata>> {
95    let bucket = bucket_path(cache, key);
96    Ok(bucket_entries(&bucket)
97        .with_context(|| format!("Failed to read index bucket entries from {:?}", bucket))?
98        .into_iter()
99        .fold(None, |acc, entry| {
100            if entry.key == key {
101                if let Some(integrity) = entry.integrity {
102                    let integrity: Integrity = match integrity.parse() {
103                        Ok(sri) => sri,
104                        _ => return acc,
105                    };
106                    Some(Metadata {
107                        key: entry.key,
108                        integrity,
109                        size: entry.size,
110                        time: entry.time,
111                        metadata: entry.metadata,
112                    })
113                } else {
114                    None
115                }
116            } else {
117                acc
118            }
119        }))
120}
121
122pub fn delete(cache: &Path, key: &str) -> Result<()> {
123    insert(
124        cache,
125        key,
126        WriteOpts {
127            algorithm: None,
128            size: None,
129            sri: None,
130            time: None,
131            metadata: None,
132        },
133    )
134    .map(|_| ())
135}
136
137pub fn ls(cache: &Path) -> impl Iterator<Item = Result<Metadata>> {
138    WalkDir::new(cache.join(format!("index-v{}", INDEX_VERSION)))
139        .into_iter()
140        .map(|bucket| {
141            let bucket = bucket.to_internal()?;
142
143            if bucket.file_type().is_dir() {
144                return Ok(Vec::new());
145            }
146
147            Ok(bucket_entries(bucket.path())?
148                .into_iter()
149                .collect::<HashSet<SerializableMetadata>>()
150                .into_iter()
151                .filter_map(|se| {
152                    if let Some(i) = se.integrity {
153                        Some(Metadata {
154                            key: se.key,
155                            integrity: i.parse().unwrap(),
156                            time: se.time,
157                            size: se.size,
158                            metadata: se.metadata,
159                        })
160                    } else {
161                        None
162                    }
163                })
164                .collect())
165        })
166        .flat_map(|res| match res {
167            Ok(it) => Left(it.into_iter().map(Ok)),
168            Err(err) => Right(std::iter::once(Err(err))),
169        })
170}
171
172fn bucket_path(cache: &Path, key: &str) -> PathBuf {
173    let hashed = hash_key(key);
174    cache
175        .join(format!("index-v{}", INDEX_VERSION))
176        .join(&hashed[0..2])
177        .join(&hashed[2..4])
178        .join(&hashed[4..])
179}
180
181fn hash_key(key: &str) -> String {
182    let mut hasher = Sha1::new();
183    hasher.update(key);
184    hex::encode(hasher.finalize())
185}
186
187fn hash_entry(key: &str) -> String {
188    let mut hasher = Sha256::new();
189    hasher.update(key);
190    hex::encode(hasher.finalize())
191}
192
/// Returns the current system time as milliseconds since the unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the unix epoch.
fn now() -> u128 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_millis()
}
199
200fn bucket_entries(bucket: &Path) -> InternalResult<Vec<SerializableMetadata>> {
201    use std::io::{BufRead, BufReader};
202    fs::File::open(bucket)
203        .map(|file| {
204            BufReader::new(file)
205                .lines()
206                .filter_map(std::result::Result::ok)
207                .filter_map(|entry| {
208                    let entry_str = match entry.split('\t').collect::<Vec<&str>>()[..] {
209                        [hash, entry_str] if hash_entry(entry_str) == hash => entry_str,
210                        // Something's wrong with the entry. Abort.
211                        _ => return None,
212                    };
213                    serde_json::from_str::<SerializableMetadata>(entry_str).ok()
214                })
215                .collect()
216        })
217        .or_else(|err| {
218            if err.kind() == ErrorKind::NotFound {
219                Ok(Vec::new())
220            } else {
221                Err(err).to_internal()?
222            }
223        })
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Timestamp written into every fixture entry.
    const MOCK_TIME: u128 = 1_234_567;

    /// Exact bucket contents expected after one insert of key `hello` with
    /// integrity `sha1-deadbeef` at `MOCK_TIME`: newline, entry hash, tab, JSON.
    const MOCK_ENTRY: &str = "\n251d18a2b33264ea8655695fd23c88bd874cdea2c3dc9d8f9b7596717ad30fec\t{\"key\":\"hello\",\"integrity\":\"sha1-deadbeef\",\"time\":1234567,\"size\":0,\"metadata\":null}";

    /// Integrity value shared by all fixtures.
    fn mock_sri() -> Integrity {
        "sha1-deadbeef".parse().unwrap()
    }

    /// Write options carrying the fixture integrity and timestamp.
    fn mock_opts() -> WriteOpts {
        WriteOpts::new().integrity(mock_sri()).time(MOCK_TIME)
    }

    /// The entry `find` is expected to return for the `hello` fixture.
    fn hello_metadata() -> Metadata {
        Metadata {
            key: String::from("hello"),
            integrity: mock_sri(),
            time: MOCK_TIME,
            size: 0,
            metadata: json!(null),
        }
    }

    #[test]
    fn insert_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path();
        insert(dir, "hello", mock_opts()).unwrap();
        let written = std::fs::read_to_string(bucket_path(dir, "hello")).unwrap();
        assert_eq!(written, MOCK_ENTRY);
    }

    #[test]
    fn find_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path();
        // Write the bucket by hand so `find` is exercised independently of `insert`.
        let bucket = bucket_path(dir, "hello");
        fs::create_dir_all(bucket.parent().unwrap()).unwrap();
        fs::write(bucket, MOCK_ENTRY).unwrap();
        let found = find(dir, "hello").unwrap().unwrap();
        assert_eq!(found, hello_metadata());
    }

    #[test]
    fn find_none() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path();
        assert_eq!(find(dir, "hello").unwrap(), None);
    }

    #[test]
    fn delete_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path();
        insert(dir, "hello", mock_opts()).unwrap();
        delete(dir, "hello").unwrap();
        assert_eq!(find(dir, "hello").unwrap(), None);
    }

    #[test]
    fn round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path();
        insert(dir, "hello", mock_opts()).unwrap();
        let found = find(dir, "hello").unwrap().unwrap();
        assert_eq!(found, hello_metadata());
    }

    #[test]
    fn ls_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path();
        insert(dir, "hello", mock_opts()).unwrap();
        insert(dir, "world", mock_opts()).unwrap();

        let mut keys = ls(dir)
            .map(|entry| entry.map(|meta| meta.key))
            .collect::<Result<Vec<_>>>()
            .unwrap();
        // `ls` yields entries in no particular order.
        keys.sort();
        assert_eq!(keys, vec![String::from("hello"), String::from("world")]);
    }
}