#[cfg(unix)]
use super::unix::DatabaseUnixClient;
use super::{
    compression, consume,
    consume::Consume as _,
    exclusive::Exclusive,
    header::{BlockHeader, CryptoHash},
    DatabaseClient, DatabaseHandle,
};
use crate::config::Config;
use crate::db;
use crate::errors::*;
use crate::signed::Signed;
use crate::sync;
use async_trait::async_trait;
use bstr::BStr;
use futures::{Stream, StreamExt};
use sequoia_openpgp::Fingerprint;
use std::borrow::Cow;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, SeekFrom};

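/// Number of hash characters (after the `<algo>:` id) used to pick the shard file a document is stored in.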
pub const SHARD_ID_SIZE: usize = 2;

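/// How the database is opened: `Exclusive` acquires the on-disk writer lock (needed for direct
/// inserts), `Relaxed` opens the store without it.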
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum AccessMode {
    Exclusive,
    Relaxed,
}

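/// Checks whether the fingerprint folder `folder` can contain keys starting with `full_prefix`.
/// Returns the remainder of the prefix that applies to files inside the folder, or `None` if the
/// folder cannot match.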
fn folder_matches_prefix<'a>(folder: &str, full_prefix: &'a [u8]) -> Option<&'a [u8]> {
    let (prefix, suffix) = full_prefix
        .split_at_checked(folder.len())
        .unwrap_or((full_prefix, &[]));
    if BStr::new(folder.as_bytes()).starts_with(prefix) {
        if prefix != full_prefix {
            suffix.strip_prefix(b"/")
        } else {
            Some(suffix)
        }
    } else {
        None
    }
}

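/// Checks whether the shard file `file` can contain keys starting with `prefix`.
/// On non-unix targets the `:` of the hash id is stored as `_` in file names, so both sides are
/// normalized before comparing.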
fn file_matches_prefix(file: &str, prefix: &[u8]) -> bool {
    #[cfg(not(unix))]
    use bstr::ByteSlice;
    #[cfg(not(unix))]
    let file = file.replace(':', "_");
    #[cfg(not(unix))]
    let prefix = prefix.replace(b":", b"_");
    #[cfg(not(unix))]
    let prefix = &prefix;

    let prefix = prefix
        .split_at_checked(file.len())
        .map(|(prefix, _)| prefix)
        .unwrap_or(prefix);
    BStr::new(file.as_bytes()).starts_with(prefix)
}

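/// Derives the shard file name for a document: the hash id plus the first `SHARD_ID_SIZE`
/// characters of the digest (e.g. `sha256:ff`). On non-unix targets the `:` is replaced with `_`.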
fn derive_shard_name<'a>(key: &str, hash_str: &'a str) -> Result<Cow<'a, str>> {
    let idx = hash_str
        .find(':')
        .with_context(|| anyhow!("Missing hash id in key: {key:?}"))?;

    let (shard, _) = hash_str
        .split_at_checked(idx + 1 + SHARD_ID_SIZE)
        .with_context(|| anyhow!("Key is too short: {key:?}"))?;

    if cfg!(unix) {
        Ok(Cow::Borrowed(shard))
    } else {
        let shard = shard.replace(':', "_");
        Ok(Cow::Owned(shard))
    }
}

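/// Filesystem-backed document store. `exclusive` holds the writer lock when the database was
/// opened in `AccessMode::Exclusive`; without it, direct inserts are rejected.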
#[derive(Debug)]
pub struct Database {
    path: PathBuf,
    exclusive: Option<Exclusive>,
}

#[async_trait]
impl DatabaseClient for Database {
    async fn add_release(&mut self, fp: &Fingerprint, signed: &Signed) -> Result<String> {
        let normalized = signed.to_clear_signed()?;
        let hash = CryptoHash::calculate(&normalized);

        let (key, _new) = self.insert(fp, hash, &normalized).await?;
        Ok(key)
    }

    async fn index_from_scan(&mut self, query: &sync::TreeQuery) -> Result<(String, usize)> {
        sync::index_from_scan(self, query).await
    }

    async fn spill(&self, prefix: &[u8]) -> Result<Vec<(db::Key, db::Value)>> {
        let mut out = Vec::new();
        let stream = self.scan_values(prefix);
        tokio::pin!(stream);
        while let Some(item) = stream.next().await {
            let (hash, data) = item.context("Failed to read from database (spill)")?;
            out.push((hash, data));
        }
        Ok(out)
    }

    async fn get_value(&self, key: &[u8]) -> Result<db::Value> {
        let value = self.get(key).await?;
        let value = value.context("Key not found in database")?;
        Ok(value)
    }

    async fn count(&mut self, prefix: &[u8]) -> Result<u64> {
        let count = self.scan_keys(prefix).count().await;
        Ok(count as u64)
    }
}

impl Database {
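    /// Opens a database handle. On unix, non-exclusive opens first try to reach the database
    /// over its unix socket and fall back to opening the files directly.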
    pub async fn open(config: &Config, mode: AccessMode) -> Result<DatabaseHandle> {
        #[cfg(unix)]
        if mode != AccessMode::Exclusive {
            let sock_path = config.db_socket_path()?;
            if let Ok(client) = DatabaseUnixClient::connect(&sock_path).await {
                return Ok(DatabaseHandle::Unix(client));
            }
        }

        Ok(DatabaseHandle::Direct(
            Self::open_directly(config, mode).await?,
        ))
    }

    pub async fn open_directly(config: &Config, mode: AccessMode) -> Result<Self> {
        let path = config.database_path()?;
        let db = Self::open_at(path, mode).await?;
        Ok(db)
    }

    pub async fn open_at(path: PathBuf, mode: AccessMode) -> Result<Self> {
        debug!("Opening database at {path:?}");

        fs::create_dir_all(&path)
            .await
            .with_context(|| anyhow!("Failed to create directory: {path:?}"))?;

        let exclusive = if mode == AccessMode::Exclusive {
            let exclusive = Exclusive::acquire(&path).await?;
            Some(exclusive)
        } else {
            None
        };

        Ok(Database { path, exclusive })
    }

    pub async fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<db::Value>> {
        let stream = self.scan_values(key.as_ref());
        tokio::pin!(stream);
        let Some(entry) = stream.next().await else {
            return Ok(None);
        };
        let entry = entry.context("Failed to read from database (get)")?;
        Ok(Some(entry.1))
    }

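    /// Appends a document to the store. Documents are grouped into one directory per fingerprint
    /// and sharded into append-only files by hash prefix; each record is a block header followed
    /// by the compressed payload. Returns the document key and whether it was newly inserted.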
    pub async fn insert(
        &mut self,
        fp: &Fingerprint,
        hash: CryptoHash,
        value: &[u8],
    ) -> Result<(String, bool)> {
        let fp_str = format!("{fp:X}");
        let hash_str = hash.as_str();
        let key = format!("{fp_str}/{hash_str}");

        if self.count(key.as_bytes()).await? > 0 {
            info!("Skipping document, already present: {key:?}");
            return Ok((key, false));
        }
        info!("Adding document to database: {key:?}");

        let shard = derive_shard_name(&key, hash_str)?;

        let folder = self.path.join(fp_str);
        fs::create_dir_all(&folder)
            .await
            .with_context(|| anyhow!("Failed to create folder: {folder:?}"))?;
        let path = folder.join(&*shard);

        let mut file = fs::OpenOptions::new()
            .read(true)
            .write(true)
            .append(true)
            .create(true)
            .open(&path)
            .await
            .with_context(|| anyhow!("Failed to open file: {path:?}"))?;

        let Some(exclusive) = &mut self.exclusive else {
            bail!("Tried to perform insert on readonly database");
        };
        exclusive
            .ensure_tail_integrity(&path, &mut file)
            .await
            .context("Failed to verify tail integrity")?;

        file.seek(SeekFrom::End(0))
            .await
            .with_context(|| anyhow!("Failed to seek to end of file: {path:?}"))?;

        let compressed = compression::compress(value)
            .await
            .with_context(|| anyhow!("Failed to compress block data: {path:?}"))?;
        let header = BlockHeader::new(hash, value.len(), compressed.len());

        header
            .write(&mut file)
            .await
            .context("Failed to write block header")?;
        file.write_all(&compressed)
            .await
            .context("Failed to write block data")?;

        Ok((key, true))
    }

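    /// Lists a directory's entries sorted by name. A missing directory yields an empty list.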
    async fn read_directory_sorted(path: &Path) -> Result<Vec<(PathBuf, String)>> {
        let mut dir = match fs::read_dir(path).await {
            Ok(dir) => dir,
            Err(err) if err.kind() == ErrorKind::NotFound => return Ok(vec![]),
            Err(err) => {
                return Err(err).with_context(|| anyhow!("Failed to read directory: {path:?}"));
            }
        };

        let mut out = Vec::new();
        while let Some(entry) = dir
            .next_entry()
            .await
            .with_context(|| anyhow!("Failed to read next directory entry: {path:?}"))?
        {
            let path = entry.path();

            let filename = entry
                .file_name()
                .into_string()
                .map_err(|err| anyhow!("Found invalid directory entry name: {err:?}"))?;

            out.push((path, filename));
        }

        out.sort();
        Ok(out)
    }

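    /// Reads one shard file block by block. Blocks whose hash starts with `partitioned_prefix`
    /// are decoded with `C`; all other blocks are skipped. Returns the matching `(key, item)`
    /// pairs sorted by key.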
    async fn read_shard<C: consume::Consume>(
        path: &Path,
        folder_name: &str,
        partitioned_prefix: &[u8],
    ) -> Result<Vec<(db::Key, C::Item)>> {
        let file = fs::File::open(path)
            .await
            .with_context(|| anyhow!("Failed to open database file: {path:?}"))?;

        let mut out = Vec::new();
        let mut reader = BufReader::new(file);

        loop {
            if reader
                .fill_buf()
                .await
                .with_context(|| anyhow!("Failed to check for end of file: {path:?}"))?
                .is_empty()
            {
                break;
            }

            let (header, _n) = BlockHeader::parse(&mut reader)
                .await
                .with_context(|| anyhow!("Failed to read block header: {path:?}"))?;

            if header.hash.0.as_bytes().starts_with(partitioned_prefix) {
                let data = C::consume(&mut reader, &header)
                    .await
                    .with_context(|| anyhow!("Failed to process block: {path:?}"))?;

                let key = format!("{}/{}", folder_name, header.hash.0);
                out.push((key.into_bytes(), data));
            } else {
                consume::FastSkipValue::consume(&mut reader, &header)
                    .await
                    .with_context(|| anyhow!("Failed to process block: {path:?}"))?;
            }
        }

        out.sort();
        Ok(out)
    }

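    /// Streams the keys of all documents whose key starts with `prefix`, skipping block payloads.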
    pub fn scan_keys<'a>(&'a self, prefix: &'a [u8]) -> impl Stream<Item = Result<db::Key>> + 'a {
        self.scan_prefix::<consume::FastSkipValue>(prefix)
            .map(|item| item.map(|(key, _value)| key))
    }

    pub fn scan_values<'a>(
        &'a self,
        prefix: &'a [u8],
    ) -> impl Stream<Item = Result<(db::Key, db::Value)>> + 'a {
        self.scan_prefix::<consume::ReadValue>(prefix)
    }

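    /// Streams all `(key, item)` pairs whose key starts with `prefix`, walking fingerprint
    /// folders and shard files in sorted order and only reading the shards that can match.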
    fn scan_prefix<'a, C: consume::Consume>(
        &'a self,
        prefix: &'a [u8],
    ) -> impl Stream<Item = Result<(db::Key, C::Item)>> + 'a {
        async_stream::try_stream! {
            for (folder_path, folder_name) in Self::read_directory_sorted(&self.path).await? {
                if !folder_path.is_dir() {
                    warn!("Found unexpected file in storage folder: {folder_path:?}");
                    continue;
                }

                let Some(partitioned_prefix) = folder_matches_prefix(&folder_name, prefix)
                else {
                    continue;
                };

                for (path, filename) in Self::read_directory_sorted(&folder_path).await? {
                    if !file_matches_prefix(&filename, partitioned_prefix) {
                        continue;
                    }

                    for item in Self::read_shard::<C>(&path, &folder_name, partitioned_prefix).await? {
                        yield item;
                    }
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_folder_matches_prefix() {
        assert_eq!(
            folder_matches_prefix("ED541312A33F1128F10B1C6C54404762BBB6E853", b""),
            Some(&b""[..])
        );
        assert_eq!(
            folder_matches_prefix("ED541312A33F1128F10B1C6C54404762BBB6E853", b"E"),
            Some(&b""[..])
        );
        assert_eq!(
            folder_matches_prefix("ED541312A33F1128F10B1C6C54404762BBB6E853", b"EF"),
            None
        );
        assert_eq!(
            folder_matches_prefix("ED541312A33F1128F10B1C6C54404762BBB6E853", b"ED541312"),
            Some(&b""[..])
        );
        assert_eq!(
            folder_matches_prefix(
                "ED541312A33F1128F10B1C6C54404762BBB6E853",
                b"ED541312A33F1128F10B1C6C54404762BBB6E853"
            ),
            Some(&b""[..])
        );
        assert_eq!(
            folder_matches_prefix(
                "ED541312A33F1128F10B1C6C54404762BBB6E853",
                b"ED541312A33F1128F10B1C6C54404762BBB6E853/"
            ),
            Some(&b""[..])
        );
        assert_eq!(
            folder_matches_prefix(
                "ED541312A33F1128F10B1C6C54404762BBB6E853",
                b"ED541312A33F1128F10B1C6C54404762BBB6E853/sha256:"
            ),
            Some(&b"sha256:"[..])
        );
        assert_eq!(folder_matches_prefix(
            "ED541312A33F1128F10B1C6C54404762BBB6E853",
            b"ED541312A33F1128F10B1C6C54404762BBB6E853/sha256:ffe924d86aa74fdfe8b8d4b8cd9623c5df7aef626a7aada3416dc83e44e7939d"
        ), Some(&b"sha256:ffe924d86aa74fdfe8b8d4b8cd9623c5df7aef626a7aada3416dc83e44e7939d"[..]));
    }

    #[test]
    fn test_folder_matches_prefix_bad_inputs() {
        assert_eq!(
            folder_matches_prefix(
                "ED541312A33F1128F10B1C6C54404762BBB6E853",
                b"ED541312A33F1128F10B1C6C54404762BBB6E853//"
            ),
            Some(&b"/"[..])
        );
        assert_eq!(
            folder_matches_prefix(
                "ED541312A33F1128F10B1C6C54404762BBB6E853",
                b"ED541312A33F1128F10B1C6C54404762BBB6E8533"
            ),
            None
        );
        assert_eq!(
            folder_matches_prefix(
                "ED541312A33F1128F10B1C6C54404762BBB6E853",
                b"ED541312A33F1128F10B1C6C54404762BBB6E85333"
            ),
            None
        );
        assert_eq!(
            folder_matches_prefix(
                "ED541312A33F1128F10B1C6C54404762BBB6E853",
                b"ED541312A33F1128F10B1C6C54404762BBB6E8533/"
            ),
            None
        );
    }

    #[test]
    fn test_file_matches_prefix() {
        assert!(file_matches_prefix("sha256:ff", b""));
        assert!(file_matches_prefix("sha256:ff", b"sha"));
        assert!(file_matches_prefix("sha256:ff", b"sha256:"));
        assert!(file_matches_prefix("sha256:ff", b"sha256:f"));
        assert!(file_matches_prefix("sha256:ff", b"sha256:ffe"));
        assert!(file_matches_prefix(
            "sha256:ff",
            b"sha256:ffe924d86aa74fdfe8b8d4b8cd9623c5df7aef626a7aada34"
        ));
        assert!(!file_matches_prefix("sha256:ff", b"sha256:e"));
        assert!(!file_matches_prefix("sha256:ff", b"sha256:fe"));
        assert!(!file_matches_prefix(
            "sha512:ff",
            b"sha256:ffe924d86aa74fdfe8b8d4b8cd9623c5df7aef626a7aada34"
        ));
        assert!(!file_matches_prefix(
            "sha256:ff",
            b"sha512:ffe924d86aa74fdfe8b8d4b8cd9623c5df7aef626a7aada34"
        ));
    }
}