apt_swarm/
sync.rs

1use crate::db::{Database, DatabaseClient};
2use crate::errors::*;
3use crate::keyring::Keyring;
4use crate::p2p::peerdb;
5use crate::signed::Signed;
6use bstr::BStr;
7use futures::StreamExt;
8use indexmap::{IndexMap, IndexSet};
9use sequoia_openpgp::Fingerprint;
10use sha2::{Digest, Sha256};
11use std::borrow::Cow;
12use std::collections::{BTreeMap, VecDeque};
13use std::fmt;
14use std::str;
15use std::str::FromStr;
16use std::time::Duration;
17use tokio::io;
18use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
19use tokio::time;
20
/// Maximum number of bytes accepted for a single protocol line (query or index line),
/// including the terminating newline.
pub const MAX_LINE_LENGTH: u64 = 512;

/// How long the pulling side waits for each line of an index response.
pub const SYNC_INDEX_TIMEOUT: Duration = Duration::from_secs(2 * 60);
/// How long the pulling side waits for each chunk of a data packet.
pub const SYNC_READ_TIMEOUT: Duration = Duration::from_secs(30);

/// Capacity of a batch index: we expect one entry per hex digit (0-f).
pub const BATCH_INDEX_MAX_SIZE: usize = 0x10;

/// If a shard holds at least one entry, but no more than this many, the serving side
/// sends a dump of the data instead of an index.
pub const SPILL_THRESHOLD: usize = 1;

/// Stop announcing peers we couldn't handshake with recently (5 days).
const PEX_MAX_SUCCESS_AGE: Duration = Duration::from_secs(5 * 24 * 60 * 60);
34
35#[derive(Debug, Clone)]
36pub enum Query {
37    Tree(TreeQuery),
38    Pex,
39    Unknown(String),
40}
41
42impl Query {
43    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
44        let line = bytes.strip_suffix(b"\n").unwrap_or(bytes);
45        let line = str::from_utf8(line).context("Query contains invalid utf8")?;
46        let query = line
47            .parse()
48            .with_context(|| anyhow!("Failed to parse input as query: {line:?}"))?;
49        Ok(query)
50    }
51}
52
53impl FromStr for Query {
54    type Err = Error;
55
56    fn from_str(query: &str) -> Result<Self> {
57        if let Some(cmd) = query.strip_prefix("//") {
58            if cmd == "pex" {
59                Ok(Query::Pex)
60            } else {
61                Ok(Query::Unknown(cmd.to_string()))
62            }
63        } else {
64            let query = query
65                .parse()
66                .context("Failed to parse input as tree-query")?;
67            Ok(Query::Tree(query))
68        }
69    }
70}
71
72#[derive(Debug, Clone)]
73pub struct TreeQuery {
74    pub fp: Fingerprint,
75    pub hash_algo: String,
76    pub prefix: Option<String>,
77}
78
79impl TreeQuery {
80    pub async fn write_to<W: AsyncWrite + Unpin>(&self, mut tx: W) -> Result<()> {
81        let mut out = format!("{:X} {}", self.fp, self.hash_algo);
82        if let Some(prefix) = &self.prefix {
83            out.push(' ');
84            out.push_str(prefix);
85        }
86        out.push('\n');
87        tx.write_all(out.as_bytes()).await?;
88        Ok(())
89    }
90
91    /// Switch to the next shard
92    pub fn increment(&mut self) -> bool {
93        if let Some(prefix) = &mut self.prefix {
94            if prefix.ends_with('f') {
95                prefix.pop();
96                true
97            } else if let Some(c) = prefix.pop() {
98                let c = match c {
99                    '0'..='8' | 'a'..='e' => (c as u8 + 1) as char,
100                    '9' => 'a',
101                    _ => c,
102                };
103                prefix.push(c);
104                false
105            } else {
106                // prefix is empty, keyspace has been traversed
107                false
108            }
109        } else {
110            debug!("Peers are already in sync, nothing to do here");
111            self.prefix = Some(String::new());
112            false
113        }
114    }
115
116    /// Traverse into the first entry in this shard
117    pub fn enter(&mut self) {
118        if let Some(prefix) = &mut self.prefix {
119            prefix.push('0');
120        } else {
121            self.prefix = Some("0".to_string());
122        }
123    }
124}
125
126impl fmt::Display for TreeQuery {
127    fn fmt(&self, w: &mut fmt::Formatter) -> fmt::Result {
128        let prefix = self.prefix.as_deref().unwrap_or("");
129        write!(w, "{:X}/{}:{}", self.fp, self.hash_algo, prefix)?;
130        Ok(())
131    }
132}
133
134impl FromStr for TreeQuery {
135    type Err = Error;
136
137    fn from_str(s: &str) -> Result<Self> {
138        let mut s = s.split(' ');
139        let fp = Fingerprint::from_str(s.next().context("Missing fingerprint")?)
140            .context("Invalid fingerprint")?;
141
142        let hash_algo = s.next().context("Missing hash algo")?;
143        if hash_algo != "sha256" {
144            bail!("Only sha256 is supported at the moment");
145        }
146
147        let prefix = s.next().map(String::from);
148
149        if let Some(garbage) = s.next() {
150            bail!("Detected trailing data, rejecting as invalid: {garbage:?}");
151        }
152
153        Ok(TreeQuery {
154            fp,
155            hash_algo: hash_algo.to_string(),
156            prefix,
157        })
158    }
159}
160
161#[derive(Debug, Default)]
162pub struct BatchIndex {
163    index: IndexMap<String, (String, usize)>,
164}
165
166impl BatchIndex {
167    pub fn new() -> Self {
168        BatchIndex::default()
169    }
170
171    pub fn add(&mut self, index: String, prefix: String, count: usize) -> Result<()> {
172        if self.index.len() < BATCH_INDEX_MAX_SIZE {
173            self.index.insert(prefix, (index, count));
174            Ok(())
175        } else {
176            bail!(
177                "Batch index is already at max capacity: {:?}",
178                self.index.len()
179            )
180        }
181    }
182
183    pub async fn write_to<W: AsyncWrite + Unpin>(&self, mut sink: W) -> Result<()> {
184        for (prefix, (index, count)) in &self.index {
185            sink.write_all(format!("{index} {prefix} {count}\n").as_bytes())
186                .await?;
187        }
188        Ok(())
189    }
190
191    pub fn parse_line(&mut self, line: &[u8]) -> Result<()> {
192        let line = line.strip_suffix(b"\n").unwrap_or(line);
193        let line = str::from_utf8(line).context("Response contains invalid utf8")?;
194
195        let mut s = line.split(' ');
196        let index = s.next().context("Missing index from response")?;
197        let prefix = s.next().context("Failed to get prefix for index")?;
198        let count = s.next().context("Failed to get number of children")?;
199        let count = count
200            .parse()
201            .context("Number of children is not a number")?;
202
203        self.add(index.to_string(), prefix.to_string(), count)
204    }
205
206    pub fn get(&self, key: &str) -> Option<&(String, usize)> {
207        self.index.get(key)
208    }
209
210    pub fn keys(&self) -> indexmap::map::Keys<String, (String, usize)> {
211        self.index.keys()
212    }
213
214    pub fn clear(&mut self) {
215        self.index.clear();
216    }
217}
218
219pub async fn index_from_scan(db: &Database, query: &TreeQuery) -> Result<(String, usize)> {
220    let prefix = query.to_string();
221
222    let mut counter = 0;
223    let mut hasher = Sha256::new();
224
225    let stream = db.scan_keys(prefix.as_bytes());
226    tokio::pin!(stream);
227    while let Some(item) = stream.next().await {
228        let hash = item.context("Failed to read from database (index_from_scan)")?;
229        hasher.update(&hash);
230        hasher.update(b"\n");
231        counter += 1;
232    }
233
234    let result = hasher.finalize();
235    Ok((format!("sha256:{result:x}"), counter))
236}
237
238pub async fn sync_yield<
239    D: DatabaseClient + Sync + Send,
240    R: AsyncRead + Unpin,
241    W: AsyncWrite + Unpin,
242>(
243    db: &mut D,
244    peerdb: Option<peerdb::Client>,
245    rx: R,
246    mut tx: W,
247    timeout: Option<Duration>,
248) -> Result<()> {
249    let mut rx = io::BufReader::new(rx);
250    loop {
251        let mut line = Vec::new();
252        let mut rrx = (&mut rx).take(MAX_LINE_LENGTH);
253        let read = rrx.read_until(b'\n', &mut line);
254
255        let n = if let Some(timeout) = timeout {
256            if let Ok(n) = time::timeout(timeout, read).await {
257                n
258            } else {
259                break;
260            }
261        } else {
262            read.await
263        }?;
264
265        if n == 0 {
266            break;
267        }
268
269        if !line.ends_with(b"\n") {
270            bail!(
271                "Client sent invalid request, exceeding size limit: {:?}",
272                BStr::new(&line)
273            );
274        }
275
276        let query = Query::from_bytes(&line)?;
277        trace!("Received query: {:?}", query);
278        match query {
279            Query::Tree(mut query) => {
280                let (index, total) = db.batch_index_from_scan(&mut query).await?;
281
282                if total > 0 && total <= SPILL_THRESHOLD {
283                    let prefix = query.to_string();
284                    for (hash, data) in db.spill(prefix.as_bytes()).await? {
285                        trace!("Sending data packet to client: {:?}", BStr::new(&hash));
286                        tx.write_all(format!(":{:x}\n", data.len()).as_bytes())
287                            .await?;
288                        tx.write_all(&data).await?;
289                    }
290                    tx.write_all(b":0\n").await?;
291                } else {
292                    index.write_to(&mut tx).await?;
293                    tx.write_all(b"\n").await?;
294                }
295            }
296            Query::Pex => {
297                let mut buf = String::new();
298                if let Some(peerdb) = &peerdb {
299                    for addr in peerdb.sample(Some(PEX_MAX_SUCCESS_AGE)).await? {
300                        buf += &format!("{addr}\n");
301                    }
302                }
303                tx.write_all(format!(":{}\n", buf.len()).as_bytes()).await?;
304                tx.write_all(buf.as_bytes()).await?;
305            }
306            Query::Unknown(data) => {
307                debug!("Received unknown command from network: {data:?}");
308                tx.write_all(b":0\n").await?;
309            }
310        }
311    }
312
313    Ok(())
314}
315
/// Work queue of shard prefixes that still need to be requested.
///
/// Items are grouped by the length of their prefix (`None` counts as length 0). The
/// longest prefixes are handed out first, in insertion order within the same length.
#[derive(Debug, Default)]
pub struct SyncQueue {
    queues: BTreeMap<usize, VecDeque<Option<String>>>,
}

impl SyncQueue {
    /// Append `key` to the queue matching its prefix length.
    pub fn push(&mut self, key: Option<String>) {
        let depth = match &key {
            Some(prefix) => prefix.len(),
            None => 0,
        };
        self.queues.entry(depth).or_default().push_back(key);
    }

    /// Take the oldest item from the queue with the longest prefixes.
    ///
    /// Queues that have been drained are removed along the way. Returns `None` once
    /// nothing is left.
    pub fn pop_next(&mut self) -> Option<Option<String>> {
        while let Some(mut deepest) = self.queues.last_entry() {
            let next = deepest.get_mut().pop_front();
            match next {
                Some(item) => return Some(item),
                None => {
                    deepest.remove_entry();
                }
            }
        }
        None
    }
}
340
341pub async fn sync_pull_key<
342    D: DatabaseClient + Sync + Send,
343    R: AsyncRead + Unpin,
344    W: AsyncWrite + Unpin,
345>(
346    db: &mut D,
347    keyring: &Keyring,
348    fp: &Fingerprint,
349    dry_run: bool,
350    mut tx: W,
351    rx: &mut io::BufReader<R>,
352) -> Result<()> {
353    let mut query = TreeQuery {
354        fp: fp.clone(),
355        hash_algo: "sha256".to_string(),
356        prefix: None,
357    };
358
359    let mut queue = SyncQueue::default();
360    queue.push(None);
361
362    while let Some(item) = queue.pop_next() {
363        query.prefix = item;
364        info!("Requesting index for: {:?}", query.to_string());
365        query.write_to(&mut tx).await?;
366
367        let (our_index, _our_count) = db.batch_index_from_scan(&mut query).await?;
368        trace!("Our index: {our_index:?}");
369
370        let mut line = Vec::new();
371        let mut their_index = BatchIndex::new();
372
373        loop {
374            line.clear();
375
376            let mut rrx = rx.take(MAX_LINE_LENGTH);
377            let read = rrx.read_until(b'\n', &mut line);
378            let n = time::timeout(SYNC_INDEX_TIMEOUT, read)
379                .await
380                .context("Request for index timed out")?
381                .context("Failed to receive response from peer")?;
382
383            if n == 0 {
384                bail!("Reached unexpected eof while enumerating service");
385            }
386
387            if !line.ends_with(b"\n") {
388                bail!(
389                    "Server sent invalid line, exceeding size limit: {:?}",
390                    BStr::new(&line)
391                );
392            }
393
394            if line == b"\n" {
395                let keys = their_index
396                    .keys()
397                    .chain(our_index.keys())
398                    .collect::<IndexSet<_>>();
399
400                for key in keys {
401                    match (their_index.get(key), our_index.get(key)) {
402                        (Some(theirs), Some(ours)) => {
403                            trace!("Comparing index shards for key={key:?}, theirs={theirs:?}, ours={ours:?}");
404
405                            if theirs.1 == 0 {
406                                trace!(
407                                    "No children in this shard (key={key:?}), moving to next one"
408                                );
409                            } else if theirs == ours {
410                                trace!("These shards are already in sync (key={key:?}), moving to next one");
411                            } else {
412                                trace!("Data to be found here (key={key:?}), trying to enumerate");
413                                queue.push(Some(key.to_owned()));
414                            }
415                        }
416                        _ => bail!("Some index shards are omitted, this is currently unsupported"),
417                    }
418                }
419
420                their_index.clear();
421                break;
422            } else if let Some(line) = line.strip_prefix(b":") {
423                let line = line.strip_suffix(b"\n").unwrap_or(line);
424                let line = str::from_utf8(line).context("Length tag has invalid utf8")?;
425                trace!("Received len tag: {:?}", line);
426                let len = usize::from_str_radix(line, 16)
427                    .with_context(|| anyhow!("Length tag is invalid number: {line:?}"))?;
428
429                if len == 0 {
430                    trace!("Received all releases from shard, moving to next one");
431                    while query.increment() {
432                        trace!("Reached last entry in shard, returning to parent");
433                    }
434                    break;
435                }
436
437                // TODO: check this tag doesn't OOM us
438                info!("Reading data packet from remote: {len:?} bytes");
439
440                let mut remaining = len;
441                let mut buf = vec![0u8; len];
442                while remaining > 0 {
443                    let read = rx.read(&mut buf[len - remaining..]);
444
445                    let n = time::timeout(SYNC_READ_TIMEOUT, read)
446                        .await
447                        .context("Read from remote timed out")?
448                        .context("Failed to receive data from peer")?;
449
450                    if n == 0 {
451                        bail!("Unexpected end of file");
452                    }
453
454                    remaining -= n;
455                    trace!("Read {}/{} bytes from remote", len - remaining, len);
456                }
457                trace!("Finished reading data packet: {:?}", buf.len());
458
459                let mut bytes = &buf[..];
460                while !bytes.is_empty() {
461                    let (signed, remaining) =
462                        Signed::from_bytes(bytes).context("Failed to parse release file")?;
463
464                    for (fp, variant) in signed.canonicalize(Some(keyring))? {
465                        let fp = fp.context(
466                            "Signature can't be imported because the signature is unverified",
467                        )?;
468                        if dry_run {
469                            debug!("Skipping insert due to dry-run");
470                        } else {
471                            db.add_release(&fp, &variant).await?;
472                        }
473                    }
474
475                    bytes = remaining;
476                }
477            } else {
478                their_index
479                    .parse_line(&line)
480                    .with_context(|| anyhow!("Failed to parse line of batch index: {line:?}"))?;
481            }
482        }
483    }
484
485    Ok(())
486}
487
488pub async fn sync_pull<
489    D: DatabaseClient + Sync + Send,
490    R: AsyncRead + Unpin,
491    W: AsyncWrite + Unpin,
492>(
493    db: &mut D,
494    keyring: &Keyring,
495    selected_keys: &[Fingerprint],
496    dry_run: bool,
497    mut tx: W,
498    rx: R,
499) -> Result<()> {
500    let selected_keys = if !selected_keys.is_empty() {
501        Cow::Borrowed(selected_keys)
502    } else {
503        Cow::Owned(keyring.all_fingerprints())
504    };
505
506    let mut rx = io::BufReader::new(rx);
507    for fp in selected_keys.iter() {
508        sync_pull_key(db, keyring, fp, dry_run, &mut tx, &mut rx).await?;
509    }
510
511    Ok(())
512}
513
514#[cfg(test)]
515mod tests {
516    use super::*;
517    use crate::db::AccessMode;
518    use futures::TryStreamExt;
519
520    fn init() {
521        let _ = env_logger::builder().is_test(true).try_init();
522    }
523
524    async fn open_temp_dbs() -> Result<(tempfile::TempDir, Database, Database)> {
525        let dir = tempfile::tempdir()?;
526        let db_a = Database::open_at(dir.path().join("a"), AccessMode::Exclusive).await?;
527        let db_b = Database::open_at(dir.path().join("b"), AccessMode::Exclusive).await?;
528        Ok((dir, db_a, db_b))
529    }
530
531    async fn run_sync(keyring: &Keyring, db_a: &mut Database, db_b: &mut Database) -> Result<()> {
532        let (client, server) = tokio::io::duplex(64);
533        let (client_rx, client_tx) = tokio::io::split(client);
534        let (server_rx, server_tx) = tokio::io::split(server);
535        let task_yield = sync_yield(db_a, None, server_rx, server_tx, None);
536        let task_pull = sync_pull(db_b, keyring, &[], false, client_tx, client_rx);
537
538        tokio::select! {
539            ret = task_pull => ret?,
540            ret = task_yield => bail!("Yield task was not expected to return: {ret:?}"),
541        }
542
543        Ok(())
544    }
545
546    #[tokio::test]
547    async fn test_sync_both_empty() -> Result<()> {
548        init();
549
550        let keyring =
551            Keyring::new(include_bytes!("../contrib/signal-desktop-keyring.gpg")).unwrap();
552        let (_, mut db_a, mut db_b) = open_temp_dbs().await.unwrap();
553        run_sync(&keyring, &mut db_a, &mut db_b).await.unwrap();
554
555        Ok(())
556    }
557
558    #[tokio::test]
559    async fn test_sync_full() -> Result<()> {
560        init();
561
562        let keyring =
563            Keyring::new(include_bytes!("../contrib/signal-desktop-keyring.gpg")).unwrap();
564        let (_, mut db_a, mut db_b) = open_temp_dbs().await.unwrap();
565
566        let data = [
567        b"-----BEGIN PGP SIGNED MESSAGE-----
568
569Origin: . xenial
570Label: . xenial
571Suite: xenial
572Codename: xenial
573Date: Thu, 23 Feb 2023 01:55:04 UTC
574Architectures: amd64
575Components: main
576Description: Generated by aptly
577MD5Sum:
578 cdb20787f1556bb38ae3b6017ef51327   132984 main/binary-amd64/Packages
579 001fc41d6c21eb85a43a13133584cbae    21567 main/binary-amd64/Packages.gz
580 3fb4f1a0169c3b2fff2c43c2a2277b51    17923 main/binary-amd64/Packages.bz2
581 c911b1bc4adf556f6fbd17c0c9cd8315     4794 main/Contents-amd64.gz
582 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
583SHA1:
584 455673b692a697ad3ada91a875096365f8da1524   132984 main/binary-amd64/Packages
585 e6a940039dcfc7f93f4a5501f15e75d1427b9464    21567 main/binary-amd64/Packages.gz
586 b8447294816bb063c10d0ba35f48dd5d1980f795    17923 main/binary-amd64/Packages.bz2
587 b6fd643edc8846c0914b44f3182dfc086877d944     4794 main/Contents-amd64.gz
588 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
589SHA256:
590 989c22244106e44d789400d4da33d2ed64228ce94f48d1c2c37493118c992384   132984 main/binary-amd64/Packages
591 c46198172d00d4e01388832b61186a888da47e2c119c1e9dd7378fea206b1237    21567 main/binary-amd64/Packages.gz
592 b368e24d5c137448095f8940e3b371bff83e3e56159df6c58d4be83732a85554    17923 main/binary-amd64/Packages.bz2
593 bb347cbc00e02d73fef513965f1cd9f9e73100cd34097c43fdd1414668ec8ed8     4794 main/Contents-amd64.gz
594 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
595-----BEGIN PGP SIGNATURE-----
596
597wsFcBAEBCAAGBQJj9sd6AAoJENmAoXRX9vsGOQ0P/3S63ctKl7QyxmRQ4UVJl70S
598hTxA90FbWp236nrEWw4EO/eVWiR/VbgFPacp/dyBpSmtTFl5cpOeyf2SYj5qfg5L
599cemYgUbaxRl+PBFGm7A14y82Ym0MUzF9cNWVK8bDXH9BKSljKKerXr4giOwjTkgh
600z2LoLxnrbhGkIWnSNiT0YvQrkxkSC5BjOInRiy/4Dr7LFAX/7KBzyPVwiDPxWQca
601dwtmI6EoZQP+zHDTR6RwnYOB7oME8aYIruwF9Vhu/unfdC4LpbNJDGL7VwQKUp8h
602ICupSwnRmHPV2raNBq58K6OunGvFO0oFaYUIQqbvGzu/5859YWhrdd7gBd9Fj4zI
603Ff7fHC+ZigCNCk7op4LykJ/3uJF8NvFlNxiagO+1tRko3V4tNbeSrXEKDhr5RQJz
604p/VdL1TXI/pVIobxbF5D/Lo8dCs5LjJsJ5rFlPgzjlREFn0hwKcDwB7M+rbPhuHV
6051R3lgdhW01ZghwOdTMiX1cShQwE7bvGtskn2WIHyIhEawpotGNpBFG2K5TdxfXA1
606m+wu4PLxfxOSb+VoQlH1enyDcR7m7XNtt692l++6nw3rq6Wv2zNc9DHRE+HNavJg
607zwlfH3L9OOoGfPMfRxrKqFzcob2gnKjptlHt3XpUx5ZwS4hcKB2lETT9ORVxe1NI
608rK5KKL67o5aLviVqo98l
609=MI63
610-----END PGP SIGNATURE-----
611",
612b"-----BEGIN PGP SIGNED MESSAGE-----
613
614Origin: . xenial
615Label: . xenial
616Suite: xenial
617Codename: xenial
618Date: Wed, 15 Feb 2023 23:18:08 UTC
619Architectures: amd64
620Components: main
621Description: Generated by aptly
622MD5Sum:
623 bea5a0f7c99209504f22d8faf10125fd   132287 main/binary-amd64/Packages
624 1aa4c130945a3a076a9f16546ca17a83    21467 main/binary-amd64/Packages.gz
625 f18c7f0779161104fed2aec72d9a44e2    17922 main/binary-amd64/Packages.bz2
626 fdb168fb0b8f575585d917ca0ffd98bb     4783 main/Contents-amd64.gz
627 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
628SHA1:
629 d01c164d99cf7c867b5f115770f58e4916d7a15f   132287 main/binary-amd64/Packages
630 41e78c4c558567f4936d9952eb928a32911cc56c    21467 main/binary-amd64/Packages.gz
631 8837bf2f3e2bad7c73712d003c1510c6171c53ee    17922 main/binary-amd64/Packages.bz2
632 e800a4c83e8d9ef564f5869ad838962550799c5e     4783 main/Contents-amd64.gz
633 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
634SHA256:
635 9f9178b66c4d1d31d7b2b741f0835c2140552cac68861beaf2ecc55f0364c620   132287 main/binary-amd64/Packages
636 57386742060a0913236bafd5e8eb3b3334284e0d2ab8362a7c22c78175e9d89b    21467 main/binary-amd64/Packages.gz
637 ec097b64b5e3a39760a9b5ea6b02e91d0401994464dd0ce3de2a0a26a62230e2    17922 main/binary-amd64/Packages.bz2
638 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa     4783 main/Contents-amd64.gz
639 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
640-----BEGIN PGP SIGNATURE-----
641
642wsFcBAABCAAGBQJj7WgwAAoJENmAoXRX9vsG69AQAJmvducnhHqCXQIsqjXrDMjU
643QUAw56MRunn7rHTFpJY0ZPLgQ5gVBibouNZ9x78wuJ784Sl+MIHC7RWdQYBEbWQ5
644haKjiI00BzDeXx4sUet1E+Ce5dhjK/UvoZIOy+ed5nv/HM7QFrvoxdADSDnYGy2o
645djFUVWR5kzkb5Tv7bcjJQWWf6JvY1Z12CgsG85ECYv2PE+tGgQjSwbxRDvFFzY1O
646Xy1EkjT+YDG6hy5CiKSZL7qPsjsLHeuRvat3oSlBWiFRnSuLOlsDozqzYMFqNx93
647GPQiFNiYEmkxDxiKLOcds7+Plz2FjQdQwv2msllJ4jA9PxYRiEbfH14/ELk+/snE
64866XID9dv91JbrwaI3NOoJZZmN+QYZ7WaAj3Uxl3cYnCGuIIt6z4KB2CYeyRa3f3K
649HbPq0mBchPPmavaQEfaNDQ+dzMuazR0VMoKfHGEp44r+XU+JH/lNzlxgQEMgVv43
6500B++zb4MYgheGUhu7Xdgd6XSQdZGxt4GieXLwIAXA0nmAFlZB7EAJcyHqz0hVo6m
651Q/m8Ja8hBw6lmyM5uCduF61BhnQDfuDQetLgGzrvOp3m2qfTag3QGtEijwhH8L2O
6523xuMqMjtJutTa557go0p+PLjAhMVQ0S7z+3aLn/368qnqlxSflDCPe4GMcaXmOCz
653RdMJMk9txqB8GM5F2sO3
654=gtrA
655-----END PGP SIGNATURE-----
656",
657b"-----BEGIN PGP SIGNED MESSAGE-----
658
659Origin: . xenial
660Label: . xenial
661Suite: xenial
662Codename: xenial
663Date: Fri, 10 Feb 2023 21:24:49 UTC
664Architectures: amd64
665Components: main
666Description: Generated by aptly
667MD5Sum:
668 1044a9316b629fb7ea4b964ecaf1ccf3    21255 main/binary-amd64/Packages.gz
669 6ee4dcbdb0c0e98e416b94f542f6cc1b    17584 main/binary-amd64/Packages.bz2
670 fdb168fb0b8f575585d917ca0ffd98bb     4783 main/Contents-amd64.gz
671 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
672 b2f1f73fabd4acfaca43d05bee1debca   130864 main/binary-amd64/Packages
673SHA1:
674 1bcb7cd08c94a3519b2dce77f3f5f5e16c312067    21255 main/binary-amd64/Packages.gz
675 61c0f6b35c7bd3a5f59094511ccd593dcb2b8c96    17584 main/binary-amd64/Packages.bz2
676 e800a4c83e8d9ef564f5869ad838962550799c5e     4783 main/Contents-amd64.gz
677 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
678 4a7c86cb1c0caa36c92e1af844ebf0b7e2bf4cea   130864 main/binary-amd64/Packages
679SHA256:
680 481c1bf74f609fbf71eed01da98a05cbe884acc2efd6d0e2c1c65f9e72ddc2e6    21255 main/binary-amd64/Packages.gz
681 d9f8cc2cc5b2aa854c509caf96d9e1457e6cb0fd55597ac49408a96afb8a727b    17584 main/binary-amd64/Packages.bz2
682 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa     4783 main/Contents-amd64.gz
683 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
684 d0bec4d8f926383f3d61dc79b8d5d352f71adcf8befb59e8e02aeabe8c19eeba   130864 main/binary-amd64/Packages
685-----BEGIN PGP SIGNATURE-----
686
687wsFcBAEBCAAGBQJj5rYjAAoJENmAoXRX9vsG3wUP/2X7ufCo5nJkyHhzOtTEI4Pq
688rz6P94r2S/OA7v99mVkKNyOYZ8hKMNccYumvkWaXBF+WkLemCPeJxaBbRUrulu3c
689GXNPHht8dusQYIxS2VQVYbHgXfwQ+Y3+P1wVLPNT+9Ka0POkUT4YiM1G8Zx3fwTq
690zUeCpV1TKgkrVQ4CF5DX8i9tcVmYUq8B+BouwQAFJxElM1cuYqGybG19H/od77nH
691tkv3n43P0TCZ9KR48ZXWXF+6v26SRse2YkergbNOtJwRfdMHzvc8d/nb7T/Iv3jM
692WmqUs6Ob4EioUTWYwi2H3y+LnzAPeSVEklfCS61LzlyFGelpxHGuTjaaMtCI2Bkb
693f3XyNjeVwUYmnGWrBMCI38CUnY0J0oXLrVUxYZoT0O9SSO2bpql64T2Flqn10Djk
694W8j7V9a5gNO69PkNEHWUylwolFvF/H8Zmc6QZbnnbFSpC4pMEeRhoI1v1CqPSMn8
695APOGWa1xHN9hj9g4AZfXvO56BDveo9lbNOmFs2EAmBEEj2hCiroRtuCxxDmwerq3
696MLtCJIkir3JdbefexXcbIoP5+tjl573nvKU+Kb4KhCJTBDEY6+6qZKTSBDESKTvq
697T2L60YwfXZsj6WCS9roTz9llmze3YjURbHNZpf4BO3zONwNNeqFZw3qYWNCyzRS+
698R4AjBHbzlyIGpU5BGNn3
699=KMXz
700-----END PGP SIGNATURE-----
701"
702];
703        for data in data {
704            let (signed, _) = Signed::from_bytes(data).unwrap();
705            db_a.add_release(
706                &"DBA36B5181D0C816F630E889D980A17457F6FB06".parse()?,
707                &signed,
708            )
709            .await
710            .unwrap();
711        }
712
713        let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
714        let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
715        assert_eq!(keys_a.len(), 3);
716        assert_eq!(keys_b.len(), 0);
717
718        run_sync(&keyring, &mut db_a, &mut db_b).await.unwrap();
719
720        let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
721        let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
722        assert_eq!(keys_a.len(), 3);
723        assert_eq!(keys_b.len(), 3);
724
725        Ok(())
726    }
727
728    #[tokio::test]
729    async fn test_sync_from_partial() -> Result<()> {
730        init();
731
732        let keyring =
733            Keyring::new(include_bytes!("../contrib/signal-desktop-keyring.gpg")).unwrap();
734        let (_, mut db_a, mut db_b) = open_temp_dbs().await.unwrap();
735
736        let data = [
737        b"-----BEGIN PGP SIGNED MESSAGE-----
738
739Origin: . xenial
740Label: . xenial
741Suite: xenial
742Codename: xenial
743Date: Thu, 23 Feb 2023 01:55:04 UTC
744Architectures: amd64
745Components: main
746Description: Generated by aptly
747MD5Sum:
748 cdb20787f1556bb38ae3b6017ef51327   132984 main/binary-amd64/Packages
749 001fc41d6c21eb85a43a13133584cbae    21567 main/binary-amd64/Packages.gz
750 3fb4f1a0169c3b2fff2c43c2a2277b51    17923 main/binary-amd64/Packages.bz2
751 c911b1bc4adf556f6fbd17c0c9cd8315     4794 main/Contents-amd64.gz
752 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
753SHA1:
754 455673b692a697ad3ada91a875096365f8da1524   132984 main/binary-amd64/Packages
755 e6a940039dcfc7f93f4a5501f15e75d1427b9464    21567 main/binary-amd64/Packages.gz
756 b8447294816bb063c10d0ba35f48dd5d1980f795    17923 main/binary-amd64/Packages.bz2
757 b6fd643edc8846c0914b44f3182dfc086877d944     4794 main/Contents-amd64.gz
758 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
759SHA256:
760 989c22244106e44d789400d4da33d2ed64228ce94f48d1c2c37493118c992384   132984 main/binary-amd64/Packages
761 c46198172d00d4e01388832b61186a888da47e2c119c1e9dd7378fea206b1237    21567 main/binary-amd64/Packages.gz
762 b368e24d5c137448095f8940e3b371bff83e3e56159df6c58d4be83732a85554    17923 main/binary-amd64/Packages.bz2
763 bb347cbc00e02d73fef513965f1cd9f9e73100cd34097c43fdd1414668ec8ed8     4794 main/Contents-amd64.gz
764 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
765-----BEGIN PGP SIGNATURE-----
766
767wsFcBAEBCAAGBQJj9sd6AAoJENmAoXRX9vsGOQ0P/3S63ctKl7QyxmRQ4UVJl70S
768hTxA90FbWp236nrEWw4EO/eVWiR/VbgFPacp/dyBpSmtTFl5cpOeyf2SYj5qfg5L
769cemYgUbaxRl+PBFGm7A14y82Ym0MUzF9cNWVK8bDXH9BKSljKKerXr4giOwjTkgh
770z2LoLxnrbhGkIWnSNiT0YvQrkxkSC5BjOInRiy/4Dr7LFAX/7KBzyPVwiDPxWQca
771dwtmI6EoZQP+zHDTR6RwnYOB7oME8aYIruwF9Vhu/unfdC4LpbNJDGL7VwQKUp8h
772ICupSwnRmHPV2raNBq58K6OunGvFO0oFaYUIQqbvGzu/5859YWhrdd7gBd9Fj4zI
773Ff7fHC+ZigCNCk7op4LykJ/3uJF8NvFlNxiagO+1tRko3V4tNbeSrXEKDhr5RQJz
774p/VdL1TXI/pVIobxbF5D/Lo8dCs5LjJsJ5rFlPgzjlREFn0hwKcDwB7M+rbPhuHV
7751R3lgdhW01ZghwOdTMiX1cShQwE7bvGtskn2WIHyIhEawpotGNpBFG2K5TdxfXA1
776m+wu4PLxfxOSb+VoQlH1enyDcR7m7XNtt692l++6nw3rq6Wv2zNc9DHRE+HNavJg
777zwlfH3L9OOoGfPMfRxrKqFzcob2gnKjptlHt3XpUx5ZwS4hcKB2lETT9ORVxe1NI
778rK5KKL67o5aLviVqo98l
779=MI63
780-----END PGP SIGNATURE-----
781",
782b"-----BEGIN PGP SIGNED MESSAGE-----
783
784Origin: . xenial
785Label: . xenial
786Suite: xenial
787Codename: xenial
788Date: Wed, 15 Feb 2023 23:18:08 UTC
789Architectures: amd64
790Components: main
791Description: Generated by aptly
792MD5Sum:
793 bea5a0f7c99209504f22d8faf10125fd   132287 main/binary-amd64/Packages
794 1aa4c130945a3a076a9f16546ca17a83    21467 main/binary-amd64/Packages.gz
795 f18c7f0779161104fed2aec72d9a44e2    17922 main/binary-amd64/Packages.bz2
796 fdb168fb0b8f575585d917ca0ffd98bb     4783 main/Contents-amd64.gz
797 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
798SHA1:
799 d01c164d99cf7c867b5f115770f58e4916d7a15f   132287 main/binary-amd64/Packages
800 41e78c4c558567f4936d9952eb928a32911cc56c    21467 main/binary-amd64/Packages.gz
801 8837bf2f3e2bad7c73712d003c1510c6171c53ee    17922 main/binary-amd64/Packages.bz2
802 e800a4c83e8d9ef564f5869ad838962550799c5e     4783 main/Contents-amd64.gz
803 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
804SHA256:
805 9f9178b66c4d1d31d7b2b741f0835c2140552cac68861beaf2ecc55f0364c620   132287 main/binary-amd64/Packages
806 57386742060a0913236bafd5e8eb3b3334284e0d2ab8362a7c22c78175e9d89b    21467 main/binary-amd64/Packages.gz
807 ec097b64b5e3a39760a9b5ea6b02e91d0401994464dd0ce3de2a0a26a62230e2    17922 main/binary-amd64/Packages.bz2
808 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa     4783 main/Contents-amd64.gz
809 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
810-----BEGIN PGP SIGNATURE-----
811
812wsFcBAABCAAGBQJj7WgwAAoJENmAoXRX9vsG69AQAJmvducnhHqCXQIsqjXrDMjU
813QUAw56MRunn7rHTFpJY0ZPLgQ5gVBibouNZ9x78wuJ784Sl+MIHC7RWdQYBEbWQ5
814haKjiI00BzDeXx4sUet1E+Ce5dhjK/UvoZIOy+ed5nv/HM7QFrvoxdADSDnYGy2o
815djFUVWR5kzkb5Tv7bcjJQWWf6JvY1Z12CgsG85ECYv2PE+tGgQjSwbxRDvFFzY1O
816Xy1EkjT+YDG6hy5CiKSZL7qPsjsLHeuRvat3oSlBWiFRnSuLOlsDozqzYMFqNx93
817GPQiFNiYEmkxDxiKLOcds7+Plz2FjQdQwv2msllJ4jA9PxYRiEbfH14/ELk+/snE
81866XID9dv91JbrwaI3NOoJZZmN+QYZ7WaAj3Uxl3cYnCGuIIt6z4KB2CYeyRa3f3K
819HbPq0mBchPPmavaQEfaNDQ+dzMuazR0VMoKfHGEp44r+XU+JH/lNzlxgQEMgVv43
8200B++zb4MYgheGUhu7Xdgd6XSQdZGxt4GieXLwIAXA0nmAFlZB7EAJcyHqz0hVo6m
821Q/m8Ja8hBw6lmyM5uCduF61BhnQDfuDQetLgGzrvOp3m2qfTag3QGtEijwhH8L2O
8223xuMqMjtJutTa557go0p+PLjAhMVQ0S7z+3aLn/368qnqlxSflDCPe4GMcaXmOCz
823RdMJMk9txqB8GM5F2sO3
824=gtrA
825-----END PGP SIGNATURE-----
826",
827b"-----BEGIN PGP SIGNED MESSAGE-----
828
829Origin: . xenial
830Label: . xenial
831Suite: xenial
832Codename: xenial
833Date: Fri, 10 Feb 2023 21:24:49 UTC
834Architectures: amd64
835Components: main
836Description: Generated by aptly
837MD5Sum:
838 1044a9316b629fb7ea4b964ecaf1ccf3    21255 main/binary-amd64/Packages.gz
839 6ee4dcbdb0c0e98e416b94f542f6cc1b    17584 main/binary-amd64/Packages.bz2
840 fdb168fb0b8f575585d917ca0ffd98bb     4783 main/Contents-amd64.gz
841 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
842 b2f1f73fabd4acfaca43d05bee1debca   130864 main/binary-amd64/Packages
843SHA1:
844 1bcb7cd08c94a3519b2dce77f3f5f5e16c312067    21255 main/binary-amd64/Packages.gz
845 61c0f6b35c7bd3a5f59094511ccd593dcb2b8c96    17584 main/binary-amd64/Packages.bz2
846 e800a4c83e8d9ef564f5869ad838962550799c5e     4783 main/Contents-amd64.gz
847 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
848 4a7c86cb1c0caa36c92e1af844ebf0b7e2bf4cea   130864 main/binary-amd64/Packages
849SHA256:
850 481c1bf74f609fbf71eed01da98a05cbe884acc2efd6d0e2c1c65f9e72ddc2e6    21255 main/binary-amd64/Packages.gz
851 d9f8cc2cc5b2aa854c509caf96d9e1457e6cb0fd55597ac49408a96afb8a727b    17584 main/binary-amd64/Packages.bz2
852 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa     4783 main/Contents-amd64.gz
853 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
854 d0bec4d8f926383f3d61dc79b8d5d352f71adcf8befb59e8e02aeabe8c19eeba   130864 main/binary-amd64/Packages
855-----BEGIN PGP SIGNATURE-----
856
857wsFcBAEBCAAGBQJj5rYjAAoJENmAoXRX9vsG3wUP/2X7ufCo5nJkyHhzOtTEI4Pq
858rz6P94r2S/OA7v99mVkKNyOYZ8hKMNccYumvkWaXBF+WkLemCPeJxaBbRUrulu3c
859GXNPHht8dusQYIxS2VQVYbHgXfwQ+Y3+P1wVLPNT+9Ka0POkUT4YiM1G8Zx3fwTq
860zUeCpV1TKgkrVQ4CF5DX8i9tcVmYUq8B+BouwQAFJxElM1cuYqGybG19H/od77nH
861tkv3n43P0TCZ9KR48ZXWXF+6v26SRse2YkergbNOtJwRfdMHzvc8d/nb7T/Iv3jM
862WmqUs6Ob4EioUTWYwi2H3y+LnzAPeSVEklfCS61LzlyFGelpxHGuTjaaMtCI2Bkb
863f3XyNjeVwUYmnGWrBMCI38CUnY0J0oXLrVUxYZoT0O9SSO2bpql64T2Flqn10Djk
864W8j7V9a5gNO69PkNEHWUylwolFvF/H8Zmc6QZbnnbFSpC4pMEeRhoI1v1CqPSMn8
865APOGWa1xHN9hj9g4AZfXvO56BDveo9lbNOmFs2EAmBEEj2hCiroRtuCxxDmwerq3
866MLtCJIkir3JdbefexXcbIoP5+tjl573nvKU+Kb4KhCJTBDEY6+6qZKTSBDESKTvq
867T2L60YwfXZsj6WCS9roTz9llmze3YjURbHNZpf4BO3zONwNNeqFZw3qYWNCyzRS+
868R4AjBHbzlyIGpU5BGNn3
869=KMXz
870-----END PGP SIGNATURE-----
871"
872];
873        for data in data {
874            let (signed, _) = Signed::from_bytes(data).unwrap();
875            db_a.add_release(
876                &"DBA36B5181D0C816F630E889D980A17457F6FB06".parse()?,
877                &signed,
878            )
879            .await
880            .unwrap();
881        }
882
883        let (signed, _) = Signed::from_bytes(b"-----BEGIN PGP SIGNED MESSAGE-----
884
885Origin: . xenial
886Label: . xenial
887Suite: xenial
888Codename: xenial
889Date: Wed, 15 Feb 2023 23:18:08 UTC
890Architectures: amd64
891Components: main
892Description: Generated by aptly
893MD5Sum:
894 bea5a0f7c99209504f22d8faf10125fd   132287 main/binary-amd64/Packages
895 1aa4c130945a3a076a9f16546ca17a83    21467 main/binary-amd64/Packages.gz
896 f18c7f0779161104fed2aec72d9a44e2    17922 main/binary-amd64/Packages.bz2
897 fdb168fb0b8f575585d917ca0ffd98bb     4783 main/Contents-amd64.gz
898 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
899SHA1:
900 d01c164d99cf7c867b5f115770f58e4916d7a15f   132287 main/binary-amd64/Packages
901 41e78c4c558567f4936d9952eb928a32911cc56c    21467 main/binary-amd64/Packages.gz
902 8837bf2f3e2bad7c73712d003c1510c6171c53ee    17922 main/binary-amd64/Packages.bz2
903 e800a4c83e8d9ef564f5869ad838962550799c5e     4783 main/Contents-amd64.gz
904 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
905SHA256:
906 9f9178b66c4d1d31d7b2b741f0835c2140552cac68861beaf2ecc55f0364c620   132287 main/binary-amd64/Packages
907 57386742060a0913236bafd5e8eb3b3334284e0d2ab8362a7c22c78175e9d89b    21467 main/binary-amd64/Packages.gz
908 ec097b64b5e3a39760a9b5ea6b02e91d0401994464dd0ce3de2a0a26a62230e2    17922 main/binary-amd64/Packages.bz2
909 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa     4783 main/Contents-amd64.gz
910 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
911-----BEGIN PGP SIGNATURE-----
912
913wsFcBAABCAAGBQJj7WgwAAoJENmAoXRX9vsG69AQAJmvducnhHqCXQIsqjXrDMjU
914QUAw56MRunn7rHTFpJY0ZPLgQ5gVBibouNZ9x78wuJ784Sl+MIHC7RWdQYBEbWQ5
915haKjiI00BzDeXx4sUet1E+Ce5dhjK/UvoZIOy+ed5nv/HM7QFrvoxdADSDnYGy2o
916djFUVWR5kzkb5Tv7bcjJQWWf6JvY1Z12CgsG85ECYv2PE+tGgQjSwbxRDvFFzY1O
917Xy1EkjT+YDG6hy5CiKSZL7qPsjsLHeuRvat3oSlBWiFRnSuLOlsDozqzYMFqNx93
918GPQiFNiYEmkxDxiKLOcds7+Plz2FjQdQwv2msllJ4jA9PxYRiEbfH14/ELk+/snE
91966XID9dv91JbrwaI3NOoJZZmN+QYZ7WaAj3Uxl3cYnCGuIIt6z4KB2CYeyRa3f3K
920HbPq0mBchPPmavaQEfaNDQ+dzMuazR0VMoKfHGEp44r+XU+JH/lNzlxgQEMgVv43
9210B++zb4MYgheGUhu7Xdgd6XSQdZGxt4GieXLwIAXA0nmAFlZB7EAJcyHqz0hVo6m
922Q/m8Ja8hBw6lmyM5uCduF61BhnQDfuDQetLgGzrvOp3m2qfTag3QGtEijwhH8L2O
9233xuMqMjtJutTa557go0p+PLjAhMVQ0S7z+3aLn/368qnqlxSflDCPe4GMcaXmOCz
924RdMJMk9txqB8GM5F2sO3
925=gtrA
926-----END PGP SIGNATURE-----
927").unwrap();
928
929        db_b.add_release(
930            &"DBA36B5181D0C816F630E889D980A17457F6FB06".parse()?,
931            &signed,
932        )
933        .await
934        .unwrap();
935
936        let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
937        let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
938        assert_eq!(keys_a.len(), 3);
939        assert_eq!(keys_b.len(), 1);
940
941        run_sync(&keyring, &mut db_a, &mut db_b).await.unwrap();
942
943        let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
944        let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
945        assert_eq!(keys_a.len(), 3);
946        assert_eq!(keys_b.len(), 3);
947
948        Ok(())
949    }
950}