1use crate::db::consume::{self, Consume};
2use crate::db::header::BlockHeader;
3use crate::errors::*;
4#[cfg(unix)]
5use advisory_lock::{AdvisoryFileLock, FileLockMode};
6use std::collections::BTreeSet;
7use std::path::{Path, PathBuf};
8use tokio::fs;
9use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncSeek, BufReader};
10
11#[derive(Debug)]
12pub struct Lock {
13 #[allow(dead_code)]
15 file: fs::File,
16}
17
18impl Lock {
19 #[cfg(unix)]
20 pub async fn acquire(path: &Path) -> Result<Self> {
21 debug!("Acquiring exclusive lock on directory: {path:?}");
22 let file = fs::File::open(path)
23 .await
24 .with_context(|| anyhow!("Failed to open directory: {path:?}"))?;
25 let file = file.into_std().await;
26 AdvisoryFileLock::try_lock(&file, FileLockMode::Exclusive)
27 .with_context(|| anyhow!("Failed to acquire exclusive lock for: {path:?}"))?;
28 debug!("Successfully acquired exclusive lock");
29 let file = file.into();
30 Ok(Self { file })
31 }
32
33 #[cfg(not(unix))]
34 pub async fn acquire(path: &Path) -> Result<Self> {
35 let path = path.join("lock");
36 debug!("Acquiring exclusive lock on file: {path:?}");
37 let file = fs::OpenOptions::new()
38 .write(true)
39 .create(true)
40 .append(true)
41 .share_mode(0)
43 .open(&path)
44 .await
45 .with_context(|| anyhow!("Failed to acquire exclusive lock for: {path:?}"))?;
46 Ok(Self { file })
47 }
48}
49
50#[derive(Debug)]
51pub struct Exclusive {
52 #[allow(dead_code)]
54 lock: Lock,
55 verified_shards: BTreeSet<PathBuf>,
56}
57
58impl Exclusive {
59 pub async fn acquire(path: &Path) -> Result<Self> {
60 let lock = Lock::acquire(path).await?;
61 Ok(Exclusive {
62 lock,
63 verified_shards: BTreeSet::new(),
64 })
65 }
66
67 #[cfg(test)]
68 pub fn dummy() -> Result<Self> {
69 let file = tempfile::tempfile()?;
70 Ok(Exclusive {
71 lock: Lock {
72 file: fs::File::from_std(file),
73 },
74 verified_shards: BTreeSet::new(),
75 })
76 }
77
78 async fn verify_next_block<R: AsyncRead + AsyncSeek + Unpin + Send>(
79 path: &Path,
80 mut reader: R,
81 ) -> Result<u64> {
82 let (header, n) = BlockHeader::parse(&mut reader)
83 .await
84 .with_context(|| anyhow!("Failed to read block header: {path:?}"))?;
85
86 consume::CheckedSkipValue::consume(&mut reader, &header)
88 .await
89 .with_context(|| anyhow!("Failed to process block: {path:?}"))?;
90 trace!("Successfully verified block data is present");
91
92 Ok(n as u64 + header.data_length)
93 }
94
95 pub async fn ensure_tail_integrity<P: AsRef<Path>>(
96 &mut self,
97 path: P,
98 file: &mut fs::File,
99 ) -> Result<()> {
100 let path = path.as_ref();
101 if !self.verified_shards.contains(path) {
102 debug!("Verifying tail integrity of on-disk file: {path:?}");
103 let mut last_valid_offset = 0;
104 let mut reader = BufReader::new(&mut *file);
105 loop {
106 if reader
108 .fill_buf()
109 .await
110 .with_context(|| anyhow!("Failed to check for end of file: {path:?}"))?
111 .is_empty()
112 {
113 break;
115 }
116
117 match Self::verify_next_block(path, &mut reader).await {
119 Ok(n) => {
120 last_valid_offset += n;
121 }
122 Err(err) => {
123 warn!("File contains partial block, truncating to end of last valid block (offset={last_valid_offset}): {err:#}");
124 file.set_len(last_valid_offset).await.with_context(|| {
125 anyhow!("Failed to truncate file to last valid offset: {path:?}")
126 })?;
127 break;
128 }
129 }
130 }
131
132 self.verified_shards.insert(path.to_owned());
133 debug!("Verified tail integrity of on-disk file: {path:?}");
134 }
135
136 Ok(())
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{self, AsyncSeekExt, AsyncWriteExt};

    /// Path handed to `ensure_tail_integrity`; it only serves as a label.
    const SHARD_PATH: &str = "/tmp/apt-swarm/sha256:xx";

    /// Expected on-disk bytes of the first test block.
    const BLOCK1_ON_DISK: &[u8] = &[
        0, 39, 115, 104, 97, 50, 53, 54, 58, 232, 71, 18, 35, 135, 9, 57, 143, 109, 52, 157, 194,
        37, 11, 14, 252, 164, 183, 45, 140, 43, 251, 123, 116, 51, 157, 48, 186, 148, 5, 107, 20,
        0, 0, 0, 0, 0, 0, 5, 57, 0, 0, 0, 0, 0, 0, 0, 4, 111, 104, 97, 105,
    ];

    /// Expected on-disk bytes of the second test block.
    const BLOCK2_ON_DISK: &[u8] = &[
        0, 39, 115, 104, 97, 50, 53, 54, 58, 168, 240, 175, 60, 104, 172, 176, 130, 164, 101, 201,
        104, 14, 121, 2, 97, 85, 203, 86, 105, 45, 167, 54, 77, 247, 55, 196, 228, 117, 183, 58,
        58, 0, 0, 0, 0, 0, 0, 5, 57, 0, 0, 0, 0, 0, 0, 0, 20, 104, 101, 108, 108, 111, 32, 119,
        111, 114, 108, 100, 44, 32, 105, 116, 39, 115, 32, 109, 101,
    ];

    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    /// Anonymous temporary file wrapped as a tokio file.
    fn tempfile() -> Result<fs::File> {
        let file = tempfile::tempfile()?;
        Ok(fs::File::from_std(file))
    }

    /// Reads the whole file from the start into memory.
    async fn file_to_buf(file: &mut fs::File) -> Result<Vec<u8>> {
        file.rewind().await?;
        let mut contents = Vec::new();
        io::copy(file, &mut contents)
            .await
            .context("Failed to read to buffer")?;
        Ok(contents)
    }

    /// Asserts that a further lock attempt on `dir` fails with the
    /// lock-acquisition error.
    async fn assert_already_locked(dir: &Path) {
        let err = Lock::acquire(dir).await.err().unwrap().to_string();
        let (err, _) = err.split_once(": ").unwrap();
        assert_eq!(err, "Failed to acquire exclusive lock for");
    }

    /// Writes `chunks` one after another into a fresh file, rewinds, runs the
    /// tail integrity check and returns what the file contains afterwards.
    async fn contents_after_check(chunks: &[&[u8]]) -> Vec<u8> {
        let mut file = tempfile().unwrap();
        for chunk in chunks {
            file.write_all(chunk).await.unwrap();
        }
        file.rewind().await.unwrap();

        let mut exclusive = Exclusive::dummy().unwrap();
        exclusive
            .ensure_tail_integrity(SHARD_PATH, &mut file)
            .await
            .unwrap();

        file_to_buf(&mut file).await.unwrap()
    }

    #[tokio::test]
    async fn test_lock_directory() {
        init();
        let dir = tempfile::tempdir().unwrap();
        let _lock = Lock::acquire(dir.path()).await.unwrap();
        assert_already_locked(dir.path()).await;
    }

    #[tokio::test]
    async fn test_release_lock() {
        init();
        let dir = tempfile::tempdir().unwrap();
        {
            let _lock = Lock::acquire(dir.path()).await.unwrap();
            assert_already_locked(dir.path()).await;
        }
        // The guard above is gone, so locking must succeed again.
        let _lock = Lock::acquire(dir.path()).await.unwrap();
    }

    fn bytes_block1() -> Vec<u8> {
        let mut bytes = Vec::new();
        bytes.extend(39u16.to_be_bytes());
        bytes.extend(b"sha256:");
        bytes.extend([
            0xe8, 0x47, 0x12, 0x23, 0x87, 0x09, 0x39, 0x8f, 0x6d, 0x34, 0x9d, 0xc2, 0x25, 0x0b,
            0x0e, 0xfc, 0xa4, 0xb7, 0x2d, 0x8c, 0x2b, 0xfb, 0x7b, 0x74, 0x33, 0x9d, 0x30, 0xba,
            0x94, 0x05, 0x6b, 0x14,
        ]);
        bytes.extend(1337u64.to_be_bytes());
        bytes.extend(4u64.to_be_bytes());
        bytes.extend(b"ohai");
        bytes
    }

    fn bytes_block2() -> Vec<u8> {
        let mut bytes = Vec::new();
        bytes.extend(39u16.to_be_bytes());
        bytes.extend(b"sha256:");
        bytes.extend([
            0xa8, 0xf0, 0xaf, 0x3c, 0x68, 0xac, 0xb0, 0x82, 0xa4, 0x65, 0xc9, 0x68, 0x0e, 0x79,
            0x02, 0x61, 0x55, 0xcb, 0x56, 0x69, 0x2d, 0xa7, 0x36, 0x4d, 0xf7, 0x37, 0xc4, 0xe4,
            0x75, 0xb7, 0x3a, 0x3a,
        ]);
        bytes.extend(1337u64.to_be_bytes());
        bytes.extend(20u64.to_be_bytes());
        bytes.extend(b"hello world, it's me");
        bytes
    }

    #[tokio::test]
    async fn test_tail_integrity_one_block() {
        init();

        let block1 = bytes_block1();
        let buf = contents_after_check(&[&block1[..]]).await;
        assert_eq!(buf, BLOCK1_ON_DISK);
    }

    #[tokio::test]
    async fn test_tail_integrity_two_blocks() {
        init();

        let block1 = bytes_block1();
        let block2 = bytes_block2();
        let buf = contents_after_check(&[&block1[..], &block2[..]]).await;

        let expected = [BLOCK1_ON_DISK, BLOCK2_ON_DISK].concat();
        assert_eq!(buf, expected);
    }

    #[tokio::test]
    async fn test_tail_integrity_empty() {
        init();

        let mut file = tempfile().unwrap();
        let mut exclusive = Exclusive::dummy().unwrap();
        exclusive
            .ensure_tail_integrity(SHARD_PATH, &mut file)
            .await
            .unwrap();
        assert_eq!(
            exclusive.verified_shards,
            BTreeSet::from([PathBuf::from(SHARD_PATH)])
        );

        let buf = file_to_buf(&mut file).await.unwrap();
        assert_eq!(buf, b"");
    }

    #[tokio::test]
    async fn test_tail_integrity_first_block_truncated() {
        init();

        // Only the first 43 bytes of the block are written.
        let block1 = bytes_block1();
        let buf = contents_after_check(&[&block1[..43]]).await;
        assert_eq!(buf, b"");
    }

    #[tokio::test]
    async fn test_tail_integrity_second_block_truncated() {
        init();

        // A complete first block followed by the first 58 bytes of the second.
        let block1 = bytes_block1();
        let block2 = bytes_block2();
        let buf = contents_after_check(&[&block1[..], &block2[..58]]).await;
        assert_eq!(buf, BLOCK1_ON_DISK);
    }
}