apt_swarm/db/
exclusive.rs

1use crate::db::consume::{self, Consume};
2use crate::db::header::BlockHeader;
3use crate::errors::*;
4#[cfg(unix)]
5use advisory_lock::{AdvisoryFileLock, FileLockMode};
6use std::collections::BTreeSet;
7use std::path::{Path, PathBuf};
8use tokio::fs;
9use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncSeek, BufReader};
10
11#[derive(Debug)]
12pub struct Lock {
13    // we only need to hold this, but don't use it for anything
14    #[allow(dead_code)]
15    file: fs::File,
16}
17
18impl Lock {
19    #[cfg(unix)]
20    pub async fn acquire(path: &Path) -> Result<Self> {
21        debug!("Acquiring exclusive lock on directory: {path:?}");
22        let file = fs::File::open(path)
23            .await
24            .with_context(|| anyhow!("Failed to open directory: {path:?}"))?;
25        let file = file.into_std().await;
26        AdvisoryFileLock::try_lock(&file, FileLockMode::Exclusive)
27            .with_context(|| anyhow!("Failed to acquire exclusive lock for: {path:?}"))?;
28        debug!("Successfully acquired exclusive lock");
29        let file = file.into();
30        Ok(Self { file })
31    }
32
33    #[cfg(not(unix))]
34    pub async fn acquire(path: &Path) -> Result<Self> {
35        let path = path.join("lock");
36        debug!("Acquiring exclusive lock on file: {path:?}");
37        let file = fs::OpenOptions::new()
38            .write(true)
39            .create(true)
40            .append(true)
41            // do not allow others to read or modify this file while we have it open
42            .share_mode(0)
43            .open(&path)
44            .await
45            .with_context(|| anyhow!("Failed to acquire exclusive lock for: {path:?}"))?;
46        Ok(Self { file })
47    }
48}
49
50#[derive(Debug)]
51pub struct Exclusive {
52    // we only need to hold this, but don't use it for anything
53    #[allow(dead_code)]
54    lock: Lock,
55    verified_shards: BTreeSet<PathBuf>,
56}
57
58impl Exclusive {
59    pub async fn acquire(path: &Path) -> Result<Self> {
60        let lock = Lock::acquire(path).await?;
61        Ok(Exclusive {
62            lock,
63            verified_shards: BTreeSet::new(),
64        })
65    }
66
67    #[cfg(test)]
68    pub fn dummy() -> Result<Self> {
69        let file = tempfile::tempfile()?;
70        Ok(Exclusive {
71            lock: Lock {
72                file: fs::File::from_std(file),
73            },
74            verified_shards: BTreeSet::new(),
75        })
76    }
77
78    async fn verify_next_block<R: AsyncRead + AsyncSeek + Unpin + Send>(
79        path: &Path,
80        mut reader: R,
81    ) -> Result<u64> {
82        let (header, n) = BlockHeader::parse(&mut reader)
83            .await
84            .with_context(|| anyhow!("Failed to read block header: {path:?}"))?;
85
86        // skip over data, verify the expected number of bytes is present
87        consume::CheckedSkipValue::consume(&mut reader, &header)
88            .await
89            .with_context(|| anyhow!("Failed to process block: {path:?}"))?;
90        trace!("Successfully verified block data is present");
91
92        Ok(n as u64 + header.data_length)
93    }
94
95    pub async fn ensure_tail_integrity<P: AsRef<Path>>(
96        &mut self,
97        path: P,
98        file: &mut fs::File,
99    ) -> Result<()> {
100        let path = path.as_ref();
101        if !self.verified_shards.contains(path) {
102            debug!("Verifying tail integrity of on-disk file: {path:?}");
103            let mut last_valid_offset = 0;
104            let mut reader = BufReader::new(&mut *file);
105            loop {
106                // check if more data is available
107                if reader
108                    .fill_buf()
109                    .await
110                    .with_context(|| anyhow!("Failed to check for end of file: {path:?}"))?
111                    .is_empty()
112                {
113                    // reached EOF
114                    break;
115                }
116
117                // verify next block is fully present on disk
118                match Self::verify_next_block(path, &mut reader).await {
119                    Ok(n) => {
120                        last_valid_offset += n;
121                    }
122                    Err(err) => {
123                        warn!("File contains partial block, truncating to end of last valid block (offset={last_valid_offset}): {err:#}");
124                        file.set_len(last_valid_offset).await.with_context(|| {
125                            anyhow!("Failed to truncate file to last valid offset: {path:?}")
126                        })?;
127                        break;
128                    }
129                }
130            }
131
132            self.verified_shards.insert(path.to_owned());
133            debug!("Verified tail integrity of on-disk file: {path:?}");
134        }
135
136        Ok(())
137    }
138}
139
140#[cfg(test)]
141mod tests {
142    use super::*;
143    use tokio::io::{self, AsyncSeekExt, AsyncWriteExt};
144
145    fn init() {
146        let _ = env_logger::builder().is_test(true).try_init();
147    }
148
149    fn tempfile() -> Result<fs::File> {
150        Ok(fs::File::from_std(tempfile::tempfile()?))
151    }
152
153    async fn file_to_buf(file: &mut fs::File) -> Result<Vec<u8>> {
154        file.rewind().await?;
155        let mut buf = Vec::new();
156        io::copy(file, &mut buf)
157            .await
158            .context("Failed to read to buffer")?;
159        Ok(buf)
160    }
161
162    #[tokio::test]
163    async fn test_lock_directory() {
164        init();
165        let dir = tempfile::tempdir().unwrap();
166        let _lock = Lock::acquire(dir.path()).await.unwrap();
167        let err = Lock::acquire(dir.path()).await.err().unwrap().to_string();
168        let (err, _) = err.split_once(": ").unwrap();
169        assert_eq!(err, "Failed to acquire exclusive lock for");
170    }
171
172    #[tokio::test]
173    async fn test_release_lock() {
174        init();
175        let dir = tempfile::tempdir().unwrap();
176        {
177            let _lock = Lock::acquire(dir.path()).await.unwrap();
178            let err = Lock::acquire(dir.path()).await.err().unwrap().to_string();
179            let (err, _) = err.split_once(": ").unwrap();
180            assert_eq!(err, "Failed to acquire exclusive lock for");
181        }
182        let _lock = Lock::acquire(dir.path()).await.unwrap();
183    }
184
185    fn bytes_block1() -> Vec<u8> {
186        let mut bytes = Vec::new();
187        bytes.extend(39u16.to_be_bytes());
188        bytes.extend(b"sha256:");
189        bytes.extend([
190            0xe8, 0x47, 0x12, 0x23, 0x87, 0x09, 0x39, 0x8f, 0x6d, 0x34, 0x9d, 0xc2, 0x25, 0x0b,
191            0x0e, 0xfc, 0xa4, 0xb7, 0x2d, 0x8c, 0x2b, 0xfb, 0x7b, 0x74, 0x33, 0x9d, 0x30, 0xba,
192            0x94, 0x05, 0x6b, 0x14,
193        ]);
194        bytes.extend(1337u64.to_be_bytes());
195        bytes.extend(4u64.to_be_bytes());
196        bytes.extend(b"ohai");
197        bytes
198    }
199
200    fn bytes_block2() -> Vec<u8> {
201        let mut bytes = Vec::new();
202        bytes.extend(39u16.to_be_bytes());
203        bytes.extend(b"sha256:");
204        bytes.extend([
205            0xa8, 0xf0, 0xaf, 0x3c, 0x68, 0xac, 0xb0, 0x82, 0xa4, 0x65, 0xc9, 0x68, 0x0e, 0x79,
206            0x02, 0x61, 0x55, 0xcb, 0x56, 0x69, 0x2d, 0xa7, 0x36, 0x4d, 0xf7, 0x37, 0xc4, 0xe4,
207            0x75, 0xb7, 0x3a, 0x3a,
208        ]);
209        bytes.extend(1337u64.to_be_bytes());
210        bytes.extend(20u64.to_be_bytes());
211        bytes.extend(b"hello world, it's me");
212        bytes
213    }
214
215    #[tokio::test]
216    async fn test_tail_integrity_one_block() {
217        init();
218
219        // write data
220        let mut file = tempfile().unwrap();
221        file.write_all(&bytes_block1()).await.unwrap();
222        file.rewind().await.unwrap();
223
224        // verify
225        let mut exclusive = Exclusive::dummy().unwrap();
226        exclusive
227            .ensure_tail_integrity("/tmp/apt-swarm/sha256:xx", &mut file)
228            .await
229            .unwrap();
230
231        let buf = file_to_buf(&mut file).await.unwrap();
232        assert_eq!(
233            buf,
234            &[
235                0, 39, 115, 104, 97, 50, 53, 54, 58, 232, 71, 18, 35, 135, 9, 57, 143, 109, 52,
236                157, 194, 37, 11, 14, 252, 164, 183, 45, 140, 43, 251, 123, 116, 51, 157, 48, 186,
237                148, 5, 107, 20, 0, 0, 0, 0, 0, 0, 5, 57, 0, 0, 0, 0, 0, 0, 0, 4, 111, 104, 97,
238                105,
239            ]
240        );
241    }
242
243    #[tokio::test]
244    async fn test_tail_integrity_two_blocks() {
245        init();
246
247        // write data
248        let mut file = tempfile().unwrap();
249        file.write_all(&bytes_block1()).await.unwrap();
250        file.write_all(&bytes_block2()).await.unwrap();
251        file.rewind().await.unwrap();
252
253        // verify
254        let mut exclusive = Exclusive::dummy().unwrap();
255        exclusive
256            .ensure_tail_integrity("/tmp/apt-swarm/sha256:xx", &mut file)
257            .await
258            .unwrap();
259
260        let buf = file_to_buf(&mut file).await.unwrap();
261        assert_eq!(
262            buf,
263            &[
264                0, 39, 115, 104, 97, 50, 53, 54, 58, 232, 71, 18, 35, 135, 9, 57, 143, 109, 52,
265                157, 194, 37, 11, 14, 252, 164, 183, 45, 140, 43, 251, 123, 116, 51, 157, 48, 186,
266                148, 5, 107, 20, 0, 0, 0, 0, 0, 0, 5, 57, 0, 0, 0, 0, 0, 0, 0, 4, 111, 104, 97,
267                105, 0, 39, 115, 104, 97, 50, 53, 54, 58, 168, 240, 175, 60, 104, 172, 176, 130,
268                164, 101, 201, 104, 14, 121, 2, 97, 85, 203, 86, 105, 45, 167, 54, 77, 247, 55,
269                196, 228, 117, 183, 58, 58, 0, 0, 0, 0, 0, 0, 5, 57, 0, 0, 0, 0, 0, 0, 0, 20, 104,
270                101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 44, 32, 105, 116, 39, 115, 32,
271                109, 101,
272            ]
273        );
274    }
275
276    #[tokio::test]
277    async fn test_tail_integrity_empty() {
278        init();
279
280        let mut file = tempfile().unwrap();
281        let mut exclusive = Exclusive::dummy().unwrap();
282        exclusive
283            .ensure_tail_integrity("/tmp/apt-swarm/sha256:xx", &mut file)
284            .await
285            .unwrap();
286        assert_eq!(
287            exclusive.verified_shards,
288            ["/tmp/apt-swarm/sha256:xx".into()].into_iter().collect()
289        );
290
291        let buf = file_to_buf(&mut file).await.unwrap();
292        assert_eq!(buf, b"");
293    }
294
295    #[tokio::test]
296    async fn test_tail_integrity_first_block_truncated() {
297        init();
298
299        // write data (test with partial block header)
300        let mut file = tempfile().unwrap();
301        file.write_all(&bytes_block1()[..43]).await.unwrap();
302        file.rewind().await.unwrap();
303
304        // verify
305        let mut exclusive = Exclusive::dummy().unwrap();
306        exclusive
307            .ensure_tail_integrity("/tmp/apt-swarm/sha256:xx", &mut file)
308            .await
309            .unwrap();
310
311        let buf = file_to_buf(&mut file).await.unwrap();
312        assert_eq!(buf, b"");
313    }
314
315    #[tokio::test]
316    async fn test_tail_integrity_second_block_truncated() {
317        init();
318
319        // write data (test with partial block data)
320        let mut file = tempfile().unwrap();
321        file.write_all(&bytes_block1()).await.unwrap();
322        file.write_all(&bytes_block2()[..58]).await.unwrap();
323        file.rewind().await.unwrap();
324
325        // verify
326        let mut exclusive = Exclusive::dummy().unwrap();
327        exclusive
328            .ensure_tail_integrity("/tmp/apt-swarm/sha256:xx", &mut file)
329            .await
330            .unwrap();
331
332        let buf = file_to_buf(&mut file).await.unwrap();
333        assert_eq!(
334            buf,
335            &[
336                0, 39, 115, 104, 97, 50, 53, 54, 58, 232, 71, 18, 35, 135, 9, 57, 143, 109, 52,
337                157, 194, 37, 11, 14, 252, 164, 183, 45, 140, 43, 251, 123, 116, 51, 157, 48, 186,
338                148, 5, 107, 20, 0, 0, 0, 0, 0, 0, 5, 57, 0, 0, 0, 0, 0, 0, 0, 4, 111, 104, 97,
339                105,
340            ]
341        );
342    }
343}