1use crate::db::{Database, DatabaseClient};
2use crate::errors::*;
3use crate::keyring::Keyring;
4use crate::p2p::peerdb;
5use crate::signed::Signed;
6use bstr::BStr;
7use futures::StreamExt;
8use indexmap::{IndexMap, IndexSet};
9use sequoia_openpgp::Fingerprint;
10use sha2::{Digest, Sha256};
11use std::borrow::Cow;
12use std::collections::{BTreeMap, VecDeque};
13use std::fmt;
14use std::str;
15use std::str::FromStr;
16use std::time::Duration;
17use tokio::io;
18use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
19use tokio::time;
20
21pub const MAX_LINE_LENGTH: u64 = 512;
22
23pub const SYNC_INDEX_TIMEOUT: Duration = Duration::from_secs(120);
24pub const SYNC_READ_TIMEOUT: Duration = Duration::from_secs(30);
25
26pub const BATCH_INDEX_MAX_SIZE: usize = 16;
28
29pub const SPILL_THRESHOLD: usize = 1;
31
32const PEX_MAX_SUCCESS_AGE: Duration = Duration::from_secs(3600 * 24 * 5);
34
35#[derive(Debug, Clone)]
36pub enum Query {
37 Tree(TreeQuery),
38 Pex,
39 Unknown(String),
40}
41
42impl Query {
43 pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
44 let line = bytes.strip_suffix(b"\n").unwrap_or(bytes);
45 let line = str::from_utf8(line).context("Query contains invalid utf8")?;
46 let query = line
47 .parse()
48 .with_context(|| anyhow!("Failed to parse input as query: {line:?}"))?;
49 Ok(query)
50 }
51}
52
53impl FromStr for Query {
54 type Err = Error;
55
56 fn from_str(query: &str) -> Result<Self> {
57 if let Some(cmd) = query.strip_prefix("//") {
58 if cmd == "pex" {
59 Ok(Query::Pex)
60 } else {
61 Ok(Query::Unknown(cmd.to_string()))
62 }
63 } else {
64 let query = query
65 .parse()
66 .context("Failed to parse input as tree-query")?;
67 Ok(Query::Tree(query))
68 }
69 }
70}
71
72#[derive(Debug, Clone)]
73pub struct TreeQuery {
74 pub fp: Fingerprint,
75 pub hash_algo: String,
76 pub prefix: Option<String>,
77}
78
79impl TreeQuery {
80 pub async fn write_to<W: AsyncWrite + Unpin>(&self, mut tx: W) -> Result<()> {
81 let mut out = format!("{:X} {}", self.fp, self.hash_algo);
82 if let Some(prefix) = &self.prefix {
83 out.push(' ');
84 out.push_str(prefix);
85 }
86 out.push('\n');
87 tx.write_all(out.as_bytes()).await?;
88 Ok(())
89 }
90
91 pub fn increment(&mut self) -> bool {
93 if let Some(prefix) = &mut self.prefix {
94 if prefix.ends_with('f') {
95 prefix.pop();
96 true
97 } else if let Some(c) = prefix.pop() {
98 let c = match c {
99 '0'..='8' | 'a'..='e' => (c as u8 + 1) as char,
100 '9' => 'a',
101 _ => c,
102 };
103 prefix.push(c);
104 false
105 } else {
106 false
108 }
109 } else {
110 debug!("Peers are already in sync, nothing to do here");
111 self.prefix = Some(String::new());
112 false
113 }
114 }
115
116 pub fn enter(&mut self) {
118 if let Some(prefix) = &mut self.prefix {
119 prefix.push('0');
120 } else {
121 self.prefix = Some("0".to_string());
122 }
123 }
124}
125
126impl fmt::Display for TreeQuery {
127 fn fmt(&self, w: &mut fmt::Formatter) -> fmt::Result {
128 let prefix = self.prefix.as_deref().unwrap_or("");
129 write!(w, "{:X}/{}:{}", self.fp, self.hash_algo, prefix)?;
130 Ok(())
131 }
132}
133
134impl FromStr for TreeQuery {
135 type Err = Error;
136
137 fn from_str(s: &str) -> Result<Self> {
138 let mut s = s.split(' ');
139 let fp = Fingerprint::from_str(s.next().context("Missing fingerprint")?)
140 .context("Invalid fingerprint")?;
141
142 let hash_algo = s.next().context("Missing hash algo")?;
143 if hash_algo != "sha256" {
144 bail!("Only sha256 is supported at the moment");
145 }
146
147 let prefix = s.next().map(String::from);
148
149 if let Some(garbage) = s.next() {
150 bail!("Detected trailing data, rejecting as invalid: {garbage:?}");
151 }
152
153 Ok(TreeQuery {
154 fp,
155 hash_algo: hash_algo.to_string(),
156 prefix,
157 })
158 }
159}
160
161#[derive(Debug, Default)]
162pub struct BatchIndex {
163 index: IndexMap<String, (String, usize)>,
164}
165
166impl BatchIndex {
167 pub fn new() -> Self {
168 BatchIndex::default()
169 }
170
171 pub fn add(&mut self, index: String, prefix: String, count: usize) -> Result<()> {
172 if self.index.len() < BATCH_INDEX_MAX_SIZE {
173 self.index.insert(prefix, (index, count));
174 Ok(())
175 } else {
176 bail!(
177 "Batch index is already at max capacity: {:?}",
178 self.index.len()
179 )
180 }
181 }
182
183 pub async fn write_to<W: AsyncWrite + Unpin>(&self, mut sink: W) -> Result<()> {
184 for (prefix, (index, count)) in &self.index {
185 sink.write_all(format!("{index} {prefix} {count}\n").as_bytes())
186 .await?;
187 }
188 Ok(())
189 }
190
191 pub fn parse_line(&mut self, line: &[u8]) -> Result<()> {
192 let line = line.strip_suffix(b"\n").unwrap_or(line);
193 let line = str::from_utf8(line).context("Response contains invalid utf8")?;
194
195 let mut s = line.split(' ');
196 let index = s.next().context("Missing index from response")?;
197 let prefix = s.next().context("Failed to get prefix for index")?;
198 let count = s.next().context("Failed to get number of children")?;
199 let count = count
200 .parse()
201 .context("Number of children is not a number")?;
202
203 self.add(index.to_string(), prefix.to_string(), count)
204 }
205
206 pub fn get(&self, key: &str) -> Option<&(String, usize)> {
207 self.index.get(key)
208 }
209
210 pub fn keys(&self) -> indexmap::map::Keys<String, (String, usize)> {
211 self.index.keys()
212 }
213
214 pub fn clear(&mut self) {
215 self.index.clear();
216 }
217}
218
219pub async fn index_from_scan(db: &Database, query: &TreeQuery) -> Result<(String, usize)> {
220 let prefix = query.to_string();
221
222 let mut counter = 0;
223 let mut hasher = Sha256::new();
224
225 let stream = db.scan_keys(prefix.as_bytes());
226 tokio::pin!(stream);
227 while let Some(item) = stream.next().await {
228 let hash = item.context("Failed to read from database (index_from_scan)")?;
229 hasher.update(&hash);
230 hasher.update(b"\n");
231 counter += 1;
232 }
233
234 let result = hasher.finalize();
235 Ok((format!("sha256:{result:x}"), counter))
236}
237
238pub async fn sync_yield<
239 D: DatabaseClient + Sync + Send,
240 R: AsyncRead + Unpin,
241 W: AsyncWrite + Unpin,
242>(
243 db: &mut D,
244 peerdb: Option<peerdb::Client>,
245 rx: R,
246 mut tx: W,
247 timeout: Option<Duration>,
248) -> Result<()> {
249 let mut rx = io::BufReader::new(rx);
250 loop {
251 let mut line = Vec::new();
252 let mut rrx = (&mut rx).take(MAX_LINE_LENGTH);
253 let read = rrx.read_until(b'\n', &mut line);
254
255 let n = if let Some(timeout) = timeout {
256 if let Ok(n) = time::timeout(timeout, read).await {
257 n
258 } else {
259 break;
260 }
261 } else {
262 read.await
263 }?;
264
265 if n == 0 {
266 break;
267 }
268
269 if !line.ends_with(b"\n") {
270 bail!(
271 "Client sent invalid request, exceeding size limit: {:?}",
272 BStr::new(&line)
273 );
274 }
275
276 let query = Query::from_bytes(&line)?;
277 trace!("Received query: {:?}", query);
278 match query {
279 Query::Tree(mut query) => {
280 let (index, total) = db.batch_index_from_scan(&mut query).await?;
281
282 if total > 0 && total <= SPILL_THRESHOLD {
283 let prefix = query.to_string();
284 for (hash, data) in db.spill(prefix.as_bytes()).await? {
285 trace!("Sending data packet to client: {:?}", BStr::new(&hash));
286 tx.write_all(format!(":{:x}\n", data.len()).as_bytes())
287 .await?;
288 tx.write_all(&data).await?;
289 }
290 tx.write_all(b":0\n").await?;
291 } else {
292 index.write_to(&mut tx).await?;
293 tx.write_all(b"\n").await?;
294 }
295 }
296 Query::Pex => {
297 let mut buf = String::new();
298 if let Some(peerdb) = &peerdb {
299 for addr in peerdb.sample(Some(PEX_MAX_SUCCESS_AGE)).await? {
300 buf += &format!("{addr}\n");
301 }
302 }
303 tx.write_all(format!(":{}\n", buf.len()).as_bytes()).await?;
304 tx.write_all(buf.as_bytes()).await?;
305 }
306 Query::Unknown(data) => {
307 debug!("Received unknown command from network: {data:?}");
308 tx.write_all(b":0\n").await?;
309 }
310 }
311 }
312
313 Ok(())
314}
315
316#[derive(Debug, Default)]
317pub struct SyncQueue {
318 queues: BTreeMap<usize, VecDeque<Option<String>>>,
319}
320
321impl SyncQueue {
322 pub fn push(&mut self, key: Option<String>) {
323 let len = key.as_ref().map(|s| s.len()).unwrap_or(0);
324 let queue = self.queues.entry(len).or_default();
325 queue.push_back(key);
326 }
327
328 pub fn pop_next(&mut self) -> Option<Option<String>> {
329 loop {
330 let mut entry = self.queues.last_entry()?;
331 let queue = entry.get_mut();
332 if let Some(item) = queue.pop_front() {
333 return Some(item);
334 } else {
335 entry.remove_entry();
336 }
337 }
338 }
339}
340
341pub async fn sync_pull_key<
342 D: DatabaseClient + Sync + Send,
343 R: AsyncRead + Unpin,
344 W: AsyncWrite + Unpin,
345>(
346 db: &mut D,
347 keyring: &Keyring,
348 fp: &Fingerprint,
349 dry_run: bool,
350 mut tx: W,
351 rx: &mut io::BufReader<R>,
352) -> Result<()> {
353 let mut query = TreeQuery {
354 fp: fp.clone(),
355 hash_algo: "sha256".to_string(),
356 prefix: None,
357 };
358
359 let mut queue = SyncQueue::default();
360 queue.push(None);
361
362 while let Some(item) = queue.pop_next() {
363 query.prefix = item;
364 info!("Requesting index for: {:?}", query.to_string());
365 query.write_to(&mut tx).await?;
366
367 let (our_index, _our_count) = db.batch_index_from_scan(&mut query).await?;
368 trace!("Our index: {our_index:?}");
369
370 let mut line = Vec::new();
371 let mut their_index = BatchIndex::new();
372
373 loop {
374 line.clear();
375
376 let mut rrx = rx.take(MAX_LINE_LENGTH);
377 let read = rrx.read_until(b'\n', &mut line);
378 let n = time::timeout(SYNC_INDEX_TIMEOUT, read)
379 .await
380 .context("Request for index timed out")?
381 .context("Failed to receive response from peer")?;
382
383 if n == 0 {
384 bail!("Reached unexpected eof while enumerating service");
385 }
386
387 if !line.ends_with(b"\n") {
388 bail!(
389 "Server sent invalid line, exceeding size limit: {:?}",
390 BStr::new(&line)
391 );
392 }
393
394 if line == b"\n" {
395 let keys = their_index
396 .keys()
397 .chain(our_index.keys())
398 .collect::<IndexSet<_>>();
399
400 for key in keys {
401 match (their_index.get(key), our_index.get(key)) {
402 (Some(theirs), Some(ours)) => {
403 trace!("Comparing index shards for key={key:?}, theirs={theirs:?}, ours={ours:?}");
404
405 if theirs.1 == 0 {
406 trace!(
407 "No children in this shard (key={key:?}), moving to next one"
408 );
409 } else if theirs == ours {
410 trace!("These shards are already in sync (key={key:?}), moving to next one");
411 } else {
412 trace!("Data to be found here (key={key:?}), trying to enumerate");
413 queue.push(Some(key.to_owned()));
414 }
415 }
416 _ => bail!("Some index shards are omitted, this is currently unsupported"),
417 }
418 }
419
420 their_index.clear();
421 break;
422 } else if let Some(line) = line.strip_prefix(b":") {
423 let line = line.strip_suffix(b"\n").unwrap_or(line);
424 let line = str::from_utf8(line).context("Length tag has invalid utf8")?;
425 trace!("Received len tag: {:?}", line);
426 let len = usize::from_str_radix(line, 16)
427 .with_context(|| anyhow!("Length tag is invalid number: {line:?}"))?;
428
429 if len == 0 {
430 trace!("Received all releases from shard, moving to next one");
431 while query.increment() {
432 trace!("Reached last entry in shard, returning to parent");
433 }
434 break;
435 }
436
437 info!("Reading data packet from remote: {len:?} bytes");
439
440 let mut remaining = len;
441 let mut buf = vec![0u8; len];
442 while remaining > 0 {
443 let read = rx.read(&mut buf[len - remaining..]);
444
445 let n = time::timeout(SYNC_READ_TIMEOUT, read)
446 .await
447 .context("Read from remote timed out")?
448 .context("Failed to receive data from peer")?;
449
450 if n == 0 {
451 bail!("Unexpected end of file");
452 }
453
454 remaining -= n;
455 trace!("Read {}/{} bytes from remote", len - remaining, len);
456 }
457 trace!("Finished reading data packet: {:?}", buf.len());
458
459 let mut bytes = &buf[..];
460 while !bytes.is_empty() {
461 let (signed, remaining) =
462 Signed::from_bytes(bytes).context("Failed to parse release file")?;
463
464 for (fp, variant) in signed.canonicalize(Some(keyring))? {
465 let fp = fp.context(
466 "Signature can't be imported because the signature is unverified",
467 )?;
468 if dry_run {
469 debug!("Skipping insert due to dry-run");
470 } else {
471 db.add_release(&fp, &variant).await?;
472 }
473 }
474
475 bytes = remaining;
476 }
477 } else {
478 their_index
479 .parse_line(&line)
480 .with_context(|| anyhow!("Failed to parse line of batch index: {line:?}"))?;
481 }
482 }
483 }
484
485 Ok(())
486}
487
488pub async fn sync_pull<
489 D: DatabaseClient + Sync + Send,
490 R: AsyncRead + Unpin,
491 W: AsyncWrite + Unpin,
492>(
493 db: &mut D,
494 keyring: &Keyring,
495 selected_keys: &[Fingerprint],
496 dry_run: bool,
497 mut tx: W,
498 rx: R,
499) -> Result<()> {
500 let selected_keys = if !selected_keys.is_empty() {
501 Cow::Borrowed(selected_keys)
502 } else {
503 Cow::Owned(keyring.all_fingerprints())
504 };
505
506 let mut rx = io::BufReader::new(rx);
507 for fp in selected_keys.iter() {
508 sync_pull_key(db, keyring, fp, dry_run, &mut tx, &mut rx).await?;
509 }
510
511 Ok(())
512}
513
514#[cfg(test)]
515mod tests {
516 use super::*;
517 use crate::db::AccessMode;
518 use futures::TryStreamExt;
519
520 fn init() {
521 let _ = env_logger::builder().is_test(true).try_init();
522 }
523
524 async fn open_temp_dbs() -> Result<(tempfile::TempDir, Database, Database)> {
525 let dir = tempfile::tempdir()?;
526 let db_a = Database::open_at(dir.path().join("a"), AccessMode::Exclusive).await?;
527 let db_b = Database::open_at(dir.path().join("b"), AccessMode::Exclusive).await?;
528 Ok((dir, db_a, db_b))
529 }
530
531 async fn run_sync(keyring: &Keyring, db_a: &mut Database, db_b: &mut Database) -> Result<()> {
532 let (client, server) = tokio::io::duplex(64);
533 let (client_rx, client_tx) = tokio::io::split(client);
534 let (server_rx, server_tx) = tokio::io::split(server);
535 let task_yield = sync_yield(db_a, None, server_rx, server_tx, None);
536 let task_pull = sync_pull(db_b, keyring, &[], false, client_tx, client_rx);
537
538 tokio::select! {
539 ret = task_pull => ret?,
540 ret = task_yield => bail!("Yield task was not expected to return: {ret:?}"),
541 }
542
543 Ok(())
544 }
545
546 #[tokio::test]
547 async fn test_sync_both_empty() -> Result<()> {
548 init();
549
550 let keyring =
551 Keyring::new(include_bytes!("../contrib/signal-desktop-keyring.gpg")).unwrap();
552 let (_, mut db_a, mut db_b) = open_temp_dbs().await.unwrap();
553 run_sync(&keyring, &mut db_a, &mut db_b).await.unwrap();
554
555 Ok(())
556 }
557
558 #[tokio::test]
559 async fn test_sync_full() -> Result<()> {
560 init();
561
562 let keyring =
563 Keyring::new(include_bytes!("../contrib/signal-desktop-keyring.gpg")).unwrap();
564 let (_, mut db_a, mut db_b) = open_temp_dbs().await.unwrap();
565
566 let data = [
567 b"-----BEGIN PGP SIGNED MESSAGE-----
568
569Origin: . xenial
570Label: . xenial
571Suite: xenial
572Codename: xenial
573Date: Thu, 23 Feb 2023 01:55:04 UTC
574Architectures: amd64
575Components: main
576Description: Generated by aptly
577MD5Sum:
578 cdb20787f1556bb38ae3b6017ef51327 132984 main/binary-amd64/Packages
579 001fc41d6c21eb85a43a13133584cbae 21567 main/binary-amd64/Packages.gz
580 3fb4f1a0169c3b2fff2c43c2a2277b51 17923 main/binary-amd64/Packages.bz2
581 c911b1bc4adf556f6fbd17c0c9cd8315 4794 main/Contents-amd64.gz
582 d8c35b55bc8e48e267b9ccdaf383976d 85 main/binary-amd64/Release
583SHA1:
584 455673b692a697ad3ada91a875096365f8da1524 132984 main/binary-amd64/Packages
585 e6a940039dcfc7f93f4a5501f15e75d1427b9464 21567 main/binary-amd64/Packages.gz
586 b8447294816bb063c10d0ba35f48dd5d1980f795 17923 main/binary-amd64/Packages.bz2
587 b6fd643edc8846c0914b44f3182dfc086877d944 4794 main/Contents-amd64.gz
588 992cb9cd8a0af2d9ad81d2b45342656d41157202 85 main/binary-amd64/Release
589SHA256:
590 989c22244106e44d789400d4da33d2ed64228ce94f48d1c2c37493118c992384 132984 main/binary-amd64/Packages
591 c46198172d00d4e01388832b61186a888da47e2c119c1e9dd7378fea206b1237 21567 main/binary-amd64/Packages.gz
592 b368e24d5c137448095f8940e3b371bff83e3e56159df6c58d4be83732a85554 17923 main/binary-amd64/Packages.bz2
593 bb347cbc00e02d73fef513965f1cd9f9e73100cd34097c43fdd1414668ec8ed8 4794 main/Contents-amd64.gz
594 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9 85 main/binary-amd64/Release
595-----BEGIN PGP SIGNATURE-----
596
597wsFcBAEBCAAGBQJj9sd6AAoJENmAoXRX9vsGOQ0P/3S63ctKl7QyxmRQ4UVJl70S
598hTxA90FbWp236nrEWw4EO/eVWiR/VbgFPacp/dyBpSmtTFl5cpOeyf2SYj5qfg5L
599cemYgUbaxRl+PBFGm7A14y82Ym0MUzF9cNWVK8bDXH9BKSljKKerXr4giOwjTkgh
600z2LoLxnrbhGkIWnSNiT0YvQrkxkSC5BjOInRiy/4Dr7LFAX/7KBzyPVwiDPxWQca
601dwtmI6EoZQP+zHDTR6RwnYOB7oME8aYIruwF9Vhu/unfdC4LpbNJDGL7VwQKUp8h
602ICupSwnRmHPV2raNBq58K6OunGvFO0oFaYUIQqbvGzu/5859YWhrdd7gBd9Fj4zI
603Ff7fHC+ZigCNCk7op4LykJ/3uJF8NvFlNxiagO+1tRko3V4tNbeSrXEKDhr5RQJz
604p/VdL1TXI/pVIobxbF5D/Lo8dCs5LjJsJ5rFlPgzjlREFn0hwKcDwB7M+rbPhuHV
6051R3lgdhW01ZghwOdTMiX1cShQwE7bvGtskn2WIHyIhEawpotGNpBFG2K5TdxfXA1
606m+wu4PLxfxOSb+VoQlH1enyDcR7m7XNtt692l++6nw3rq6Wv2zNc9DHRE+HNavJg
607zwlfH3L9OOoGfPMfRxrKqFzcob2gnKjptlHt3XpUx5ZwS4hcKB2lETT9ORVxe1NI
608rK5KKL67o5aLviVqo98l
609=MI63
610-----END PGP SIGNATURE-----
611",
612b"-----BEGIN PGP SIGNED MESSAGE-----
613
614Origin: . xenial
615Label: . xenial
616Suite: xenial
617Codename: xenial
618Date: Wed, 15 Feb 2023 23:18:08 UTC
619Architectures: amd64
620Components: main
621Description: Generated by aptly
622MD5Sum:
623 bea5a0f7c99209504f22d8faf10125fd 132287 main/binary-amd64/Packages
624 1aa4c130945a3a076a9f16546ca17a83 21467 main/binary-amd64/Packages.gz
625 f18c7f0779161104fed2aec72d9a44e2 17922 main/binary-amd64/Packages.bz2
626 fdb168fb0b8f575585d917ca0ffd98bb 4783 main/Contents-amd64.gz
627 d8c35b55bc8e48e267b9ccdaf383976d 85 main/binary-amd64/Release
628SHA1:
629 d01c164d99cf7c867b5f115770f58e4916d7a15f 132287 main/binary-amd64/Packages
630 41e78c4c558567f4936d9952eb928a32911cc56c 21467 main/binary-amd64/Packages.gz
631 8837bf2f3e2bad7c73712d003c1510c6171c53ee 17922 main/binary-amd64/Packages.bz2
632 e800a4c83e8d9ef564f5869ad838962550799c5e 4783 main/Contents-amd64.gz
633 992cb9cd8a0af2d9ad81d2b45342656d41157202 85 main/binary-amd64/Release
634SHA256:
635 9f9178b66c4d1d31d7b2b741f0835c2140552cac68861beaf2ecc55f0364c620 132287 main/binary-amd64/Packages
636 57386742060a0913236bafd5e8eb3b3334284e0d2ab8362a7c22c78175e9d89b 21467 main/binary-amd64/Packages.gz
637 ec097b64b5e3a39760a9b5ea6b02e91d0401994464dd0ce3de2a0a26a62230e2 17922 main/binary-amd64/Packages.bz2
638 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa 4783 main/Contents-amd64.gz
639 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9 85 main/binary-amd64/Release
640-----BEGIN PGP SIGNATURE-----
641
642wsFcBAABCAAGBQJj7WgwAAoJENmAoXRX9vsG69AQAJmvducnhHqCXQIsqjXrDMjU
643QUAw56MRunn7rHTFpJY0ZPLgQ5gVBibouNZ9x78wuJ784Sl+MIHC7RWdQYBEbWQ5
644haKjiI00BzDeXx4sUet1E+Ce5dhjK/UvoZIOy+ed5nv/HM7QFrvoxdADSDnYGy2o
645djFUVWR5kzkb5Tv7bcjJQWWf6JvY1Z12CgsG85ECYv2PE+tGgQjSwbxRDvFFzY1O
646Xy1EkjT+YDG6hy5CiKSZL7qPsjsLHeuRvat3oSlBWiFRnSuLOlsDozqzYMFqNx93
647GPQiFNiYEmkxDxiKLOcds7+Plz2FjQdQwv2msllJ4jA9PxYRiEbfH14/ELk+/snE
64866XID9dv91JbrwaI3NOoJZZmN+QYZ7WaAj3Uxl3cYnCGuIIt6z4KB2CYeyRa3f3K
649HbPq0mBchPPmavaQEfaNDQ+dzMuazR0VMoKfHGEp44r+XU+JH/lNzlxgQEMgVv43
6500B++zb4MYgheGUhu7Xdgd6XSQdZGxt4GieXLwIAXA0nmAFlZB7EAJcyHqz0hVo6m
651Q/m8Ja8hBw6lmyM5uCduF61BhnQDfuDQetLgGzrvOp3m2qfTag3QGtEijwhH8L2O
6523xuMqMjtJutTa557go0p+PLjAhMVQ0S7z+3aLn/368qnqlxSflDCPe4GMcaXmOCz
653RdMJMk9txqB8GM5F2sO3
654=gtrA
655-----END PGP SIGNATURE-----
656",
657b"-----BEGIN PGP SIGNED MESSAGE-----
658
659Origin: . xenial
660Label: . xenial
661Suite: xenial
662Codename: xenial
663Date: Fri, 10 Feb 2023 21:24:49 UTC
664Architectures: amd64
665Components: main
666Description: Generated by aptly
667MD5Sum:
668 1044a9316b629fb7ea4b964ecaf1ccf3 21255 main/binary-amd64/Packages.gz
669 6ee4dcbdb0c0e98e416b94f542f6cc1b 17584 main/binary-amd64/Packages.bz2
670 fdb168fb0b8f575585d917ca0ffd98bb 4783 main/Contents-amd64.gz
671 d8c35b55bc8e48e267b9ccdaf383976d 85 main/binary-amd64/Release
672 b2f1f73fabd4acfaca43d05bee1debca 130864 main/binary-amd64/Packages
673SHA1:
674 1bcb7cd08c94a3519b2dce77f3f5f5e16c312067 21255 main/binary-amd64/Packages.gz
675 61c0f6b35c7bd3a5f59094511ccd593dcb2b8c96 17584 main/binary-amd64/Packages.bz2
676 e800a4c83e8d9ef564f5869ad838962550799c5e 4783 main/Contents-amd64.gz
677 992cb9cd8a0af2d9ad81d2b45342656d41157202 85 main/binary-amd64/Release
678 4a7c86cb1c0caa36c92e1af844ebf0b7e2bf4cea 130864 main/binary-amd64/Packages
679SHA256:
680 481c1bf74f609fbf71eed01da98a05cbe884acc2efd6d0e2c1c65f9e72ddc2e6 21255 main/binary-amd64/Packages.gz
681 d9f8cc2cc5b2aa854c509caf96d9e1457e6cb0fd55597ac49408a96afb8a727b 17584 main/binary-amd64/Packages.bz2
682 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa 4783 main/Contents-amd64.gz
683 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9 85 main/binary-amd64/Release
684 d0bec4d8f926383f3d61dc79b8d5d352f71adcf8befb59e8e02aeabe8c19eeba 130864 main/binary-amd64/Packages
685-----BEGIN PGP SIGNATURE-----
686
687wsFcBAEBCAAGBQJj5rYjAAoJENmAoXRX9vsG3wUP/2X7ufCo5nJkyHhzOtTEI4Pq
688rz6P94r2S/OA7v99mVkKNyOYZ8hKMNccYumvkWaXBF+WkLemCPeJxaBbRUrulu3c
689GXNPHht8dusQYIxS2VQVYbHgXfwQ+Y3+P1wVLPNT+9Ka0POkUT4YiM1G8Zx3fwTq
690zUeCpV1TKgkrVQ4CF5DX8i9tcVmYUq8B+BouwQAFJxElM1cuYqGybG19H/od77nH
691tkv3n43P0TCZ9KR48ZXWXF+6v26SRse2YkergbNOtJwRfdMHzvc8d/nb7T/Iv3jM
692WmqUs6Ob4EioUTWYwi2H3y+LnzAPeSVEklfCS61LzlyFGelpxHGuTjaaMtCI2Bkb
693f3XyNjeVwUYmnGWrBMCI38CUnY0J0oXLrVUxYZoT0O9SSO2bpql64T2Flqn10Djk
694W8j7V9a5gNO69PkNEHWUylwolFvF/H8Zmc6QZbnnbFSpC4pMEeRhoI1v1CqPSMn8
695APOGWa1xHN9hj9g4AZfXvO56BDveo9lbNOmFs2EAmBEEj2hCiroRtuCxxDmwerq3
696MLtCJIkir3JdbefexXcbIoP5+tjl573nvKU+Kb4KhCJTBDEY6+6qZKTSBDESKTvq
697T2L60YwfXZsj6WCS9roTz9llmze3YjURbHNZpf4BO3zONwNNeqFZw3qYWNCyzRS+
698R4AjBHbzlyIGpU5BGNn3
699=KMXz
700-----END PGP SIGNATURE-----
701"
702];
703 for data in data {
704 let (signed, _) = Signed::from_bytes(data).unwrap();
705 db_a.add_release(
706 &"DBA36B5181D0C816F630E889D980A17457F6FB06".parse()?,
707 &signed,
708 )
709 .await
710 .unwrap();
711 }
712
713 let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
714 let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
715 assert_eq!(keys_a.len(), 3);
716 assert_eq!(keys_b.len(), 0);
717
718 run_sync(&keyring, &mut db_a, &mut db_b).await.unwrap();
719
720 let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
721 let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
722 assert_eq!(keys_a.len(), 3);
723 assert_eq!(keys_b.len(), 3);
724
725 Ok(())
726 }
727
728 #[tokio::test]
729 async fn test_sync_from_partial() -> Result<()> {
730 init();
731
732 let keyring =
733 Keyring::new(include_bytes!("../contrib/signal-desktop-keyring.gpg")).unwrap();
734 let (_, mut db_a, mut db_b) = open_temp_dbs().await.unwrap();
735
736 let data = [
737 b"-----BEGIN PGP SIGNED MESSAGE-----
738
739Origin: . xenial
740Label: . xenial
741Suite: xenial
742Codename: xenial
743Date: Thu, 23 Feb 2023 01:55:04 UTC
744Architectures: amd64
745Components: main
746Description: Generated by aptly
747MD5Sum:
748 cdb20787f1556bb38ae3b6017ef51327 132984 main/binary-amd64/Packages
749 001fc41d6c21eb85a43a13133584cbae 21567 main/binary-amd64/Packages.gz
750 3fb4f1a0169c3b2fff2c43c2a2277b51 17923 main/binary-amd64/Packages.bz2
751 c911b1bc4adf556f6fbd17c0c9cd8315 4794 main/Contents-amd64.gz
752 d8c35b55bc8e48e267b9ccdaf383976d 85 main/binary-amd64/Release
753SHA1:
754 455673b692a697ad3ada91a875096365f8da1524 132984 main/binary-amd64/Packages
755 e6a940039dcfc7f93f4a5501f15e75d1427b9464 21567 main/binary-amd64/Packages.gz
756 b8447294816bb063c10d0ba35f48dd5d1980f795 17923 main/binary-amd64/Packages.bz2
757 b6fd643edc8846c0914b44f3182dfc086877d944 4794 main/Contents-amd64.gz
758 992cb9cd8a0af2d9ad81d2b45342656d41157202 85 main/binary-amd64/Release
759SHA256:
760 989c22244106e44d789400d4da33d2ed64228ce94f48d1c2c37493118c992384 132984 main/binary-amd64/Packages
761 c46198172d00d4e01388832b61186a888da47e2c119c1e9dd7378fea206b1237 21567 main/binary-amd64/Packages.gz
762 b368e24d5c137448095f8940e3b371bff83e3e56159df6c58d4be83732a85554 17923 main/binary-amd64/Packages.bz2
763 bb347cbc00e02d73fef513965f1cd9f9e73100cd34097c43fdd1414668ec8ed8 4794 main/Contents-amd64.gz
764 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9 85 main/binary-amd64/Release
765-----BEGIN PGP SIGNATURE-----
766
767wsFcBAEBCAAGBQJj9sd6AAoJENmAoXRX9vsGOQ0P/3S63ctKl7QyxmRQ4UVJl70S
768hTxA90FbWp236nrEWw4EO/eVWiR/VbgFPacp/dyBpSmtTFl5cpOeyf2SYj5qfg5L
769cemYgUbaxRl+PBFGm7A14y82Ym0MUzF9cNWVK8bDXH9BKSljKKerXr4giOwjTkgh
770z2LoLxnrbhGkIWnSNiT0YvQrkxkSC5BjOInRiy/4Dr7LFAX/7KBzyPVwiDPxWQca
771dwtmI6EoZQP+zHDTR6RwnYOB7oME8aYIruwF9Vhu/unfdC4LpbNJDGL7VwQKUp8h
772ICupSwnRmHPV2raNBq58K6OunGvFO0oFaYUIQqbvGzu/5859YWhrdd7gBd9Fj4zI
773Ff7fHC+ZigCNCk7op4LykJ/3uJF8NvFlNxiagO+1tRko3V4tNbeSrXEKDhr5RQJz
774p/VdL1TXI/pVIobxbF5D/Lo8dCs5LjJsJ5rFlPgzjlREFn0hwKcDwB7M+rbPhuHV
7751R3lgdhW01ZghwOdTMiX1cShQwE7bvGtskn2WIHyIhEawpotGNpBFG2K5TdxfXA1
776m+wu4PLxfxOSb+VoQlH1enyDcR7m7XNtt692l++6nw3rq6Wv2zNc9DHRE+HNavJg
777zwlfH3L9OOoGfPMfRxrKqFzcob2gnKjptlHt3XpUx5ZwS4hcKB2lETT9ORVxe1NI
778rK5KKL67o5aLviVqo98l
779=MI63
780-----END PGP SIGNATURE-----
781",
782b"-----BEGIN PGP SIGNED MESSAGE-----
783
784Origin: . xenial
785Label: . xenial
786Suite: xenial
787Codename: xenial
788Date: Wed, 15 Feb 2023 23:18:08 UTC
789Architectures: amd64
790Components: main
791Description: Generated by aptly
792MD5Sum:
793 bea5a0f7c99209504f22d8faf10125fd 132287 main/binary-amd64/Packages
794 1aa4c130945a3a076a9f16546ca17a83 21467 main/binary-amd64/Packages.gz
795 f18c7f0779161104fed2aec72d9a44e2 17922 main/binary-amd64/Packages.bz2
796 fdb168fb0b8f575585d917ca0ffd98bb 4783 main/Contents-amd64.gz
797 d8c35b55bc8e48e267b9ccdaf383976d 85 main/binary-amd64/Release
798SHA1:
799 d01c164d99cf7c867b5f115770f58e4916d7a15f 132287 main/binary-amd64/Packages
800 41e78c4c558567f4936d9952eb928a32911cc56c 21467 main/binary-amd64/Packages.gz
801 8837bf2f3e2bad7c73712d003c1510c6171c53ee 17922 main/binary-amd64/Packages.bz2
802 e800a4c83e8d9ef564f5869ad838962550799c5e 4783 main/Contents-amd64.gz
803 992cb9cd8a0af2d9ad81d2b45342656d41157202 85 main/binary-amd64/Release
804SHA256:
805 9f9178b66c4d1d31d7b2b741f0835c2140552cac68861beaf2ecc55f0364c620 132287 main/binary-amd64/Packages
806 57386742060a0913236bafd5e8eb3b3334284e0d2ab8362a7c22c78175e9d89b 21467 main/binary-amd64/Packages.gz
807 ec097b64b5e3a39760a9b5ea6b02e91d0401994464dd0ce3de2a0a26a62230e2 17922 main/binary-amd64/Packages.bz2
808 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa 4783 main/Contents-amd64.gz
809 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9 85 main/binary-amd64/Release
810-----BEGIN PGP SIGNATURE-----
811
812wsFcBAABCAAGBQJj7WgwAAoJENmAoXRX9vsG69AQAJmvducnhHqCXQIsqjXrDMjU
813QUAw56MRunn7rHTFpJY0ZPLgQ5gVBibouNZ9x78wuJ784Sl+MIHC7RWdQYBEbWQ5
814haKjiI00BzDeXx4sUet1E+Ce5dhjK/UvoZIOy+ed5nv/HM7QFrvoxdADSDnYGy2o
815djFUVWR5kzkb5Tv7bcjJQWWf6JvY1Z12CgsG85ECYv2PE+tGgQjSwbxRDvFFzY1O
816Xy1EkjT+YDG6hy5CiKSZL7qPsjsLHeuRvat3oSlBWiFRnSuLOlsDozqzYMFqNx93
817GPQiFNiYEmkxDxiKLOcds7+Plz2FjQdQwv2msllJ4jA9PxYRiEbfH14/ELk+/snE
81866XID9dv91JbrwaI3NOoJZZmN+QYZ7WaAj3Uxl3cYnCGuIIt6z4KB2CYeyRa3f3K
819HbPq0mBchPPmavaQEfaNDQ+dzMuazR0VMoKfHGEp44r+XU+JH/lNzlxgQEMgVv43
8200B++zb4MYgheGUhu7Xdgd6XSQdZGxt4GieXLwIAXA0nmAFlZB7EAJcyHqz0hVo6m
821Q/m8Ja8hBw6lmyM5uCduF61BhnQDfuDQetLgGzrvOp3m2qfTag3QGtEijwhH8L2O
8223xuMqMjtJutTa557go0p+PLjAhMVQ0S7z+3aLn/368qnqlxSflDCPe4GMcaXmOCz
823RdMJMk9txqB8GM5F2sO3
824=gtrA
825-----END PGP SIGNATURE-----
826",
827b"-----BEGIN PGP SIGNED MESSAGE-----
828
829Origin: . xenial
830Label: . xenial
831Suite: xenial
832Codename: xenial
833Date: Fri, 10 Feb 2023 21:24:49 UTC
834Architectures: amd64
835Components: main
836Description: Generated by aptly
837MD5Sum:
838 1044a9316b629fb7ea4b964ecaf1ccf3 21255 main/binary-amd64/Packages.gz
839 6ee4dcbdb0c0e98e416b94f542f6cc1b 17584 main/binary-amd64/Packages.bz2
840 fdb168fb0b8f575585d917ca0ffd98bb 4783 main/Contents-amd64.gz
841 d8c35b55bc8e48e267b9ccdaf383976d 85 main/binary-amd64/Release
842 b2f1f73fabd4acfaca43d05bee1debca 130864 main/binary-amd64/Packages
843SHA1:
844 1bcb7cd08c94a3519b2dce77f3f5f5e16c312067 21255 main/binary-amd64/Packages.gz
845 61c0f6b35c7bd3a5f59094511ccd593dcb2b8c96 17584 main/binary-amd64/Packages.bz2
846 e800a4c83e8d9ef564f5869ad838962550799c5e 4783 main/Contents-amd64.gz
847 992cb9cd8a0af2d9ad81d2b45342656d41157202 85 main/binary-amd64/Release
848 4a7c86cb1c0caa36c92e1af844ebf0b7e2bf4cea 130864 main/binary-amd64/Packages
849SHA256:
850 481c1bf74f609fbf71eed01da98a05cbe884acc2efd6d0e2c1c65f9e72ddc2e6 21255 main/binary-amd64/Packages.gz
851 d9f8cc2cc5b2aa854c509caf96d9e1457e6cb0fd55597ac49408a96afb8a727b 17584 main/binary-amd64/Packages.bz2
852 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa 4783 main/Contents-amd64.gz
853 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9 85 main/binary-amd64/Release
854 d0bec4d8f926383f3d61dc79b8d5d352f71adcf8befb59e8e02aeabe8c19eeba 130864 main/binary-amd64/Packages
855-----BEGIN PGP SIGNATURE-----
856
857wsFcBAEBCAAGBQJj5rYjAAoJENmAoXRX9vsG3wUP/2X7ufCo5nJkyHhzOtTEI4Pq
858rz6P94r2S/OA7v99mVkKNyOYZ8hKMNccYumvkWaXBF+WkLemCPeJxaBbRUrulu3c
859GXNPHht8dusQYIxS2VQVYbHgXfwQ+Y3+P1wVLPNT+9Ka0POkUT4YiM1G8Zx3fwTq
860zUeCpV1TKgkrVQ4CF5DX8i9tcVmYUq8B+BouwQAFJxElM1cuYqGybG19H/od77nH
861tkv3n43P0TCZ9KR48ZXWXF+6v26SRse2YkergbNOtJwRfdMHzvc8d/nb7T/Iv3jM
862WmqUs6Ob4EioUTWYwi2H3y+LnzAPeSVEklfCS61LzlyFGelpxHGuTjaaMtCI2Bkb
863f3XyNjeVwUYmnGWrBMCI38CUnY0J0oXLrVUxYZoT0O9SSO2bpql64T2Flqn10Djk
864W8j7V9a5gNO69PkNEHWUylwolFvF/H8Zmc6QZbnnbFSpC4pMEeRhoI1v1CqPSMn8
865APOGWa1xHN9hj9g4AZfXvO56BDveo9lbNOmFs2EAmBEEj2hCiroRtuCxxDmwerq3
866MLtCJIkir3JdbefexXcbIoP5+tjl573nvKU+Kb4KhCJTBDEY6+6qZKTSBDESKTvq
867T2L60YwfXZsj6WCS9roTz9llmze3YjURbHNZpf4BO3zONwNNeqFZw3qYWNCyzRS+
868R4AjBHbzlyIGpU5BGNn3
869=KMXz
870-----END PGP SIGNATURE-----
871"
872];
873 for data in data {
874 let (signed, _) = Signed::from_bytes(data).unwrap();
875 db_a.add_release(
876 &"DBA36B5181D0C816F630E889D980A17457F6FB06".parse()?,
877 &signed,
878 )
879 .await
880 .unwrap();
881 }
882
883 let (signed, _) = Signed::from_bytes(b"-----BEGIN PGP SIGNED MESSAGE-----
884
885Origin: . xenial
886Label: . xenial
887Suite: xenial
888Codename: xenial
889Date: Wed, 15 Feb 2023 23:18:08 UTC
890Architectures: amd64
891Components: main
892Description: Generated by aptly
893MD5Sum:
894 bea5a0f7c99209504f22d8faf10125fd 132287 main/binary-amd64/Packages
895 1aa4c130945a3a076a9f16546ca17a83 21467 main/binary-amd64/Packages.gz
896 f18c7f0779161104fed2aec72d9a44e2 17922 main/binary-amd64/Packages.bz2
897 fdb168fb0b8f575585d917ca0ffd98bb 4783 main/Contents-amd64.gz
898 d8c35b55bc8e48e267b9ccdaf383976d 85 main/binary-amd64/Release
899SHA1:
900 d01c164d99cf7c867b5f115770f58e4916d7a15f 132287 main/binary-amd64/Packages
901 41e78c4c558567f4936d9952eb928a32911cc56c 21467 main/binary-amd64/Packages.gz
902 8837bf2f3e2bad7c73712d003c1510c6171c53ee 17922 main/binary-amd64/Packages.bz2
903 e800a4c83e8d9ef564f5869ad838962550799c5e 4783 main/Contents-amd64.gz
904 992cb9cd8a0af2d9ad81d2b45342656d41157202 85 main/binary-amd64/Release
905SHA256:
906 9f9178b66c4d1d31d7b2b741f0835c2140552cac68861beaf2ecc55f0364c620 132287 main/binary-amd64/Packages
907 57386742060a0913236bafd5e8eb3b3334284e0d2ab8362a7c22c78175e9d89b 21467 main/binary-amd64/Packages.gz
908 ec097b64b5e3a39760a9b5ea6b02e91d0401994464dd0ce3de2a0a26a62230e2 17922 main/binary-amd64/Packages.bz2
909 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa 4783 main/Contents-amd64.gz
910 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9 85 main/binary-amd64/Release
911-----BEGIN PGP SIGNATURE-----
912
913wsFcBAABCAAGBQJj7WgwAAoJENmAoXRX9vsG69AQAJmvducnhHqCXQIsqjXrDMjU
914QUAw56MRunn7rHTFpJY0ZPLgQ5gVBibouNZ9x78wuJ784Sl+MIHC7RWdQYBEbWQ5
915haKjiI00BzDeXx4sUet1E+Ce5dhjK/UvoZIOy+ed5nv/HM7QFrvoxdADSDnYGy2o
916djFUVWR5kzkb5Tv7bcjJQWWf6JvY1Z12CgsG85ECYv2PE+tGgQjSwbxRDvFFzY1O
917Xy1EkjT+YDG6hy5CiKSZL7qPsjsLHeuRvat3oSlBWiFRnSuLOlsDozqzYMFqNx93
918GPQiFNiYEmkxDxiKLOcds7+Plz2FjQdQwv2msllJ4jA9PxYRiEbfH14/ELk+/snE
91966XID9dv91JbrwaI3NOoJZZmN+QYZ7WaAj3Uxl3cYnCGuIIt6z4KB2CYeyRa3f3K
920HbPq0mBchPPmavaQEfaNDQ+dzMuazR0VMoKfHGEp44r+XU+JH/lNzlxgQEMgVv43
9210B++zb4MYgheGUhu7Xdgd6XSQdZGxt4GieXLwIAXA0nmAFlZB7EAJcyHqz0hVo6m
922Q/m8Ja8hBw6lmyM5uCduF61BhnQDfuDQetLgGzrvOp3m2qfTag3QGtEijwhH8L2O
9233xuMqMjtJutTa557go0p+PLjAhMVQ0S7z+3aLn/368qnqlxSflDCPe4GMcaXmOCz
924RdMJMk9txqB8GM5F2sO3
925=gtrA
926-----END PGP SIGNATURE-----
927").unwrap();
928
929 db_b.add_release(
930 &"DBA36B5181D0C816F630E889D980A17457F6FB06".parse()?,
931 &signed,
932 )
933 .await
934 .unwrap();
935
936 let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
937 let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
938 assert_eq!(keys_a.len(), 3);
939 assert_eq!(keys_b.len(), 1);
940
941 run_sync(&keyring, &mut db_a, &mut db_b).await.unwrap();
942
943 let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
944 let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
945 assert_eq!(keys_a.len(), 3);
946 assert_eq!(keys_b.len(), 3);
947
948 Ok(())
949 }
950}