// apt_swarm/db/disk.rs

1#[cfg(unix)]
2use super::unix::DatabaseUnixClient;
3use super::{
4    compression, consume,
5    consume::Consume as _,
6    exclusive::Exclusive,
7    header::{BlockHeader, CryptoHash},
8    DatabaseClient, DatabaseHandle,
9};
10use crate::config::Config;
11use crate::db;
12use crate::errors::*;
13use crate::signed::Signed;
14use crate::sync;
15use async_trait::async_trait;
16use bstr::BStr;
17use futures::{Stream, StreamExt};
18use sequoia_openpgp::Fingerprint;
19use std::borrow::Cow;
20use std::io::ErrorKind;
21use std::path::{Path, PathBuf};
22use tokio::fs;
23use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, SeekFrom};
24
/// Number of hash characters (after the `<algo>:` prefix) used as the shard
/// file name, see `derive_shard_name`
pub const SHARD_ID_SIZE: usize = 2;
26
/// Writers should open the database in exclusive mode
/// Readers can operate on a database that's being written to
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum AccessMode {
    /// Acquire exclusive access to the database; required for `insert`
    Exclusive,
    /// Read-only access; may operate concurrently with a writer
    Relaxed,
}
34
/// Check whether the folder named `folder` can contain keys matching `full_prefix`.
///
/// Returns `None` when the folder is ruled out. Otherwise returns the
/// remainder of the prefix that still needs to match the files inside the
/// folder (with the `/` separator between folder and file stripped).
fn folder_matches_prefix<'a>(folder: &str, full_prefix: &'a [u8]) -> Option<&'a [u8]> {
    // only the first `folder.len()` bytes of the prefix can be matched by
    // the folder name itself
    let (head, tail) = full_prefix
        .split_at_checked(folder.len())
        .unwrap_or((full_prefix, &[]));

    if !folder.as_bytes().starts_with(head) {
        return None;
    }

    if head == full_prefix {
        // the whole prefix was consumed by the folder name
        Some(tail)
    } else {
        // the prefix extends beyond the folder name, so the next byte must
        // be the `/` separator
        tail.strip_prefix(b"/")
    }
}
49
/// Check whether a shard file named `file` may contain keys matching `prefix`.
fn file_matches_prefix(file: &str, prefix: &[u8]) -> bool {
    // on windows we need to do extra normalization because `:` is illegal
    #[cfg(not(unix))]
    use bstr::ByteSlice;
    #[cfg(not(unix))]
    let file = file.replace(':', "_");
    #[cfg(not(unix))]
    let prefix = prefix.replace(b":", b"_");
    #[cfg(not(unix))]
    let prefix = &prefix;

    // the file name can only match the first `file.len()` bytes of the
    // prefix; anything beyond that is matched against the blocks inside
    let relevant = match prefix.split_at_checked(file.len()) {
        Some((head, _tail)) => head,
        None => prefix,
    };
    file.as_bytes().starts_with(relevant)
}
68
69fn derive_shard_name<'a>(key: &str, hash_str: &'a str) -> Result<Cow<'a, str>> {
70    let idx = hash_str
71        .find(':')
72        .with_context(|| anyhow!("Missing hash id in key: {key:?}"))?;
73
74    let (shard, _) = hash_str
75        .split_at_checked(idx + 1 + SHARD_ID_SIZE)
76        .with_context(|| anyhow!("Key is too short: {key:?}"))?;
77
78    // perform extra normalization if necessary
79    if cfg!(unix) {
80        Ok(Cow::Borrowed(shard))
81    } else {
82        let shard = shard.replace(':', "_");
83        Ok(Cow::Owned(shard))
84    }
85}
86
/// Handle to the on-disk database, a two-level directory layout of
/// `<fingerprint>/<shard>` append-only block files.
#[derive(Debug)]
pub struct Database {
    // root directory of the database
    path: PathBuf,
    // present when opened with `AccessMode::Exclusive`; required for inserts
    exclusive: Option<Exclusive>,
}
92
#[async_trait]
impl DatabaseClient for Database {
    /// Normalize the signed document to clear-signed form, hash it, and
    /// store it under `<fingerprint>/<hash>`. Returns the derived key.
    async fn add_release(&mut self, fp: &Fingerprint, signed: &Signed) -> Result<String> {
        let normalized = signed.to_clear_signed()?;
        let hash = CryptoHash::calculate(&normalized);

        let (key, _new) = self.insert(fp, hash, &normalized).await?;
        Ok(key)
    }

    /// Delegates to the shared scan-based indexing in the `sync` module.
    async fn index_from_scan(&mut self, query: &sync::TreeQuery) -> Result<(String, usize)> {
        sync::index_from_scan(self, query).await
    }

    /// Collect all `(key, value)` pairs matching `prefix` into memory.
    async fn spill(&self, prefix: &[u8]) -> Result<Vec<(db::Key, db::Value)>> {
        let mut out = Vec::new();
        let stream = self.scan_values(prefix);
        tokio::pin!(stream);
        while let Some(item) = stream.next().await {
            let (hash, data) = item.context("Failed to read from database (spill)")?;
            out.push((hash, data));
        }
        Ok(out)
    }

    /// Like `Database::get`, but a missing key is an error instead of `None`.
    async fn get_value(&self, key: &[u8]) -> Result<db::Value> {
        let value = self.get(key).await?;
        let value = value.context("Key not found in database")?;
        Ok(value)
    }

    /// Count the keys starting with `prefix`; uses `scan_keys`, which skips
    /// over block payloads instead of reading them.
    async fn count(&mut self, prefix: &[u8]) -> Result<u64> {
        let count = self.scan_keys(prefix).count().await;
        Ok(count as u64)
    }
}
129
130impl Database {
131    pub async fn open(config: &Config, mode: AccessMode) -> Result<DatabaseHandle> {
132        #[cfg(unix)]
133        if mode != AccessMode::Exclusive {
134            let sock_path = config.db_socket_path()?;
135            if let Ok(client) = DatabaseUnixClient::connect(&sock_path).await {
136                return Ok(DatabaseHandle::Unix(client));
137            }
138        }
139
140        Ok(DatabaseHandle::Direct(
141            Self::open_directly(config, mode).await?,
142        ))
143    }
144
145    pub async fn open_directly(config: &Config, mode: AccessMode) -> Result<Self> {
146        let path = config.database_path()?;
147        let db = Self::open_at(path, mode).await?;
148        Ok(db)
149    }
150
151    pub async fn open_at(path: PathBuf, mode: AccessMode) -> Result<Self> {
152        debug!("Opening database at {path:?}");
153
154        fs::create_dir_all(&path)
155            .await
156            .with_context(|| anyhow!("Failed to create directory: {path:?}"))?;
157
158        let exclusive = if mode == AccessMode::Exclusive {
159            let exclusive = Exclusive::acquire(&path).await?;
160            Some(exclusive)
161        } else {
162            None
163        };
164
165        Ok(Database { path, exclusive })
166    }
167
168    pub async fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<db::Value>> {
169        let stream = self.scan_values(key.as_ref());
170        tokio::pin!(stream);
171        let Some(entry) = stream.next().await else {
172            return Ok(None);
173        };
174        let entry = entry.context("Failed to read from database (get)")?;
175        Ok(Some(entry.1))
176    }
177
178    pub async fn insert(
179        &mut self,
180        fp: &Fingerprint,
181        hash: CryptoHash,
182        value: &[u8],
183    ) -> Result<(String, bool)> {
184        // check if write is necessary
185        let fp_str = format!("{fp:X}");
186        let hash_str = hash.as_str();
187        let key = format!("{fp_str}/{hash_str}");
188
189        if self.count(key.as_bytes()).await? > 0 {
190            info!("Skipping document, already present: {key:?}");
191            return Ok((key, false));
192        }
193        info!("Adding document to database: {key:?}");
194
195        // determine file to write to
196        let shard = derive_shard_name(&key, hash_str)?;
197
198        let folder = self.path.join(fp_str);
199        fs::create_dir_all(&folder)
200            .await
201            .with_context(|| anyhow!("Failed to create folder: {folder:?}"))?;
202        let path = folder.join(&*shard);
203
204        // open file
205        let mut file = fs::OpenOptions::new()
206            .read(true)
207            .write(true)
208            .append(true)
209            .create(true)
210            .open(&path)
211            .await
212            .with_context(|| anyhow!("Failed to open file: {path:?}"))?;
213
214        // ensure file is in clean state
215        let Some(exclusive) = &mut self.exclusive else {
216            bail!("Tried to perform insert on readonly database");
217        };
218        exclusive
219            .ensure_tail_integrity(&path, &mut file)
220            .await
221            .context("Failed to verify tail integrity")?;
222
223        // seek to end of file for appending data
224        file.seek(SeekFrom::End(0))
225            .await
226            .with_context(|| anyhow!("Failed to seek to end of file: {path:?}"))?;
227
228        // prepare header and data
229        let compressed = compression::compress(value)
230            .await
231            .with_context(|| anyhow!("Failed to compress block data: {path:?}"))?;
232        let header = BlockHeader::new(hash, value.len(), compressed.len());
233
234        // write to file
235        header
236            .write(&mut file)
237            .await
238            .context("Failed to write block header")?;
239        file.write_all(&compressed)
240            .await
241            .context("Failed to write block data")?;
242
243        Ok((key, true))
244    }
245
246    async fn read_directory_sorted(path: &Path) -> Result<Vec<(PathBuf, String)>> {
247        let mut dir = match fs::read_dir(path).await {
248            Ok(dir) => dir,
249            Err(err) if err.kind() == ErrorKind::NotFound => return Ok(vec![]),
250            Err(err) => {
251                return Err(err).with_context(|| anyhow!("Failed to read directory: {path:?}"));
252            }
253        };
254
255        let mut out = Vec::new();
256        while let Some(entry) = dir
257            .next_entry()
258            .await
259            .with_context(|| anyhow!("Failed to read next directory entry: {path:?}"))?
260        {
261            let path = entry.path();
262
263            let filename = entry
264                .file_name()
265                .into_string()
266                .map_err(|err| anyhow!("Found invalid directory entry name: {err:?}"))?;
267
268            out.push((path, filename));
269        }
270
271        out.sort();
272        Ok(out)
273    }
274
275    async fn read_shard<C: consume::Consume>(
276        path: &Path,
277        folder_name: &str,
278        partitioned_prefix: &[u8],
279    ) -> Result<Vec<(db::Key, C::Item)>> {
280        let file = fs::File::open(path)
281            .await
282            .with_context(|| anyhow!("Failed to open database file: {path:?}"))?;
283
284        let mut out = Vec::new();
285        let mut reader = BufReader::new(file);
286
287        loop {
288            // check if more data is available
289            if reader
290                .fill_buf()
291                .await
292                .with_context(|| anyhow!("Failed to check for end of file: {path:?}"))?
293                .is_empty()
294            {
295                // reached EOF
296                break;
297            }
298
299            let (header, _n) = BlockHeader::parse(&mut reader)
300                .await
301                .with_context(|| anyhow!("Failed to read block header: {path:?}"))?;
302
303            if header.hash.0.as_bytes().starts_with(partitioned_prefix) {
304                // header is eligible, add to list
305                let data = C::consume(&mut reader, &header)
306                    .await
307                    .with_context(|| anyhow!("Failed to process block: {path:?}"))?;
308
309                let key = format!("{}/{}", folder_name, header.hash.0);
310                out.push((key.into_bytes(), data));
311            } else {
312                // does not match prefix, skip over it
313                consume::FastSkipValue::consume(&mut reader, &header)
314                    .await
315                    .with_context(|| anyhow!("Failed to process block: {path:?}"))?;
316            }
317        }
318
319        out.sort();
320        Ok(out)
321    }
322
323    pub fn scan_keys<'a>(&'a self, prefix: &'a [u8]) -> impl Stream<Item = Result<db::Key>> + 'a {
324        self.scan_prefix::<consume::FastSkipValue>(prefix)
325            .map(|item| item.map(|(key, _value)| key))
326    }
327
328    pub fn scan_values<'a>(
329        &'a self,
330        prefix: &'a [u8],
331    ) -> impl Stream<Item = Result<(db::Key, db::Value)>> + 'a {
332        self.scan_prefix::<consume::ReadValue>(prefix)
333    }
334
335    fn scan_prefix<'a, C: consume::Consume>(
336        &'a self,
337        prefix: &'a [u8],
338    ) -> impl Stream<Item = Result<(db::Key, C::Item)>> + 'a {
339        async_stream::try_stream! {
340            for (folder_path, folder_name) in Self::read_directory_sorted(&self.path).await? {
341                if !folder_path.is_dir() {
342                    warn!("Found unexpected file in storage folder: {folder_path:?}");
343                    continue;
344                }
345
346                let Some(partitioned_prefix) = folder_matches_prefix(&folder_name, prefix)
347                else {
348                    continue;
349                };
350
351                for (path, filename) in Self::read_directory_sorted(&folder_path).await? {
352                    if !file_matches_prefix(&filename, partitioned_prefix) {
353                        continue;
354                    }
355
356                    for item in Self::read_shard::<C>(&path, &folder_name, partitioned_prefix).await? {
357                        yield item;
358                    }
359                }
360            }
361        }
362    }
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    // fingerprint used as the folder name throughout these tests
    const FOLDER: &str = "ED541312A33F1128F10B1C6C54404762BBB6E853";

    #[test]
    fn test_folder_folder_matches_prefix() {
        // (prefix, expected remaining prefix)
        let cases: &[(&[u8], Option<&[u8]>)] = &[
            (&b""[..], Some(&b""[..])),
            (&b"E"[..], Some(&b""[..])),
            (&b"EF"[..], None),
            (&b"ED541312"[..], Some(&b""[..])),
            (&b"ED541312A33F1128F10B1C6C54404762BBB6E853"[..], Some(&b""[..])),
            (&b"ED541312A33F1128F10B1C6C54404762BBB6E853/"[..], Some(&b""[..])),
            (
                &b"ED541312A33F1128F10B1C6C54404762BBB6E853/sha256:"[..],
                Some(&b"sha256:"[..]),
            ),
            (
                &b"ED541312A33F1128F10B1C6C54404762BBB6E853/sha256:ffe924d86aa74fdfe8b8d4b8cd9623c5df7aef626a7aada3416dc83e44e7939d"[..],
                Some(&b"sha256:ffe924d86aa74fdfe8b8d4b8cd9623c5df7aef626a7aada3416dc83e44e7939d"[..]),
            ),
        ];
        for &(prefix, expected) in cases {
            assert_eq!(folder_matches_prefix(FOLDER, prefix), expected, "prefix={prefix:?}");
        }
    }

    #[test]
    fn test_folder_folder_matches_prefix_bad_inputs() {
        let cases: &[(&[u8], Option<&[u8]>)] = &[
            // double separator: only one `/` is stripped
            (&b"ED541312A33F1128F10B1C6C54404762BBB6E853//"[..], Some(&b"/"[..])),
            // prefix extends past the folder name without a separator
            (&b"ED541312A33F1128F10B1C6C54404762BBB6E8533"[..], None),
            (&b"ED541312A33F1128F10B1C6C54404762BBB6E85333"[..], None),
            (&b"ED541312A33F1128F10B1C6C54404762BBB6E8533/"[..], None),
        ];
        for &(prefix, expected) in cases {
            assert_eq!(folder_matches_prefix(FOLDER, prefix), expected, "prefix={prefix:?}");
        }
    }

    #[test]
    fn test_file_matches_prefix() {
        // (file name, prefix, expected)
        let cases: &[(&str, &[u8], bool)] = &[
            ("sha256:ff", &b""[..], true),
            ("sha256:ff", &b"sha"[..], true),
            ("sha256:ff", &b"sha256:"[..], true),
            ("sha256:ff", &b"sha256:f"[..], true),
            ("sha256:ff", &b"sha256:ffe"[..], true),
            (
                "sha256:ff",
                &b"sha256:ffe924d86aa74fdfe8b8d4b8cd9623c5df7aef626a7aada34"[..],
                true,
            ),
            ("sha256:ff", &b"sha256:e"[..], false),
            ("sha256:ff", &b"sha256:fe"[..], false),
            (
                "sha512:ff",
                &b"sha256:ffe924d86aa74fdfe8b8d4b8cd9623c5df7aef626a7aada34"[..],
                false,
            ),
            (
                "sha256:ff",
                &b"sha512:ffe924d86aa74fdfe8b8d4b8cd9623c5df7aef626a7aada34"[..],
                false,
            ),
        ];
        for &(file, prefix, expected) in cases {
            assert_eq!(
                file_matches_prefix(file, prefix),
                expected,
                "file={file:?} prefix={prefix:?}"
            );
        }
    }
}