apt_swarm/
sync.rs

1use crate::db::{Database, DatabaseClient};
2use crate::errors::*;
3use crate::keyring::Keyring;
4use crate::signed::Signed;
5use bstr::BStr;
6use futures::StreamExt;
7use indexmap::{IndexMap, IndexSet};
8use sequoia_openpgp::Fingerprint;
9use sha2::{Digest, Sha256};
10use std::borrow::Cow;
11use std::collections::{BTreeMap, VecDeque};
12use std::fmt;
13use std::str;
14use std::str::FromStr;
15use std::time::Duration;
16use tokio::io;
17use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
18use tokio::time;
19
20pub const MAX_LINE_LENGTH: u64 = 512;
21
22pub const SYNC_INDEX_TIMEOUT: Duration = Duration::from_secs(120);
23pub const SYNC_READ_TIMEOUT: Duration = Duration::from_secs(30);
24
25// We expect entries from 0-f
26pub const BATCH_INDEX_MAX_SIZE: usize = 16;
27
28/// If the number of entries is greater than zero, but <= this threshold, send a dump instead of an index
29pub const SPILL_THRESHOLD: usize = 1;
30
31#[derive(Debug, Clone)]
32pub enum Query {
33    Tree(TreeQuery),
34    Pex,
35    Unknown(String),
36}
37
38impl Query {
39    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
40        let line = bytes.strip_suffix(b"\n").unwrap_or(bytes);
41        let line = str::from_utf8(line).context("Query contains invalid utf8")?;
42        let query = line
43            .parse()
44            .with_context(|| anyhow!("Failed to parse input as query: {line:?}"))?;
45        Ok(query)
46    }
47}
48
49impl FromStr for Query {
50    type Err = Error;
51
52    fn from_str(query: &str) -> Result<Self> {
53        if let Some(cmd) = query.strip_prefix("//") {
54            if cmd == "pex" {
55                Ok(Query::Pex)
56            } else {
57                Ok(Query::Unknown(cmd.to_string()))
58            }
59        } else {
60            let query = query
61                .parse()
62                .context("Failed to parse input as tree-query")?;
63            Ok(Query::Tree(query))
64        }
65    }
66}
67
68#[derive(Debug, Clone)]
69pub struct TreeQuery {
70    pub fp: Fingerprint,
71    pub hash_algo: String,
72    pub prefix: Option<String>,
73}
74
75impl TreeQuery {
76    pub async fn write_to<W: AsyncWrite + Unpin>(&self, mut tx: W) -> Result<()> {
77        let mut out = format!("{:X} {}", self.fp, self.hash_algo);
78        if let Some(prefix) = &self.prefix {
79            out.push(' ');
80            out.push_str(prefix);
81        }
82        out.push('\n');
83        tx.write_all(out.as_bytes()).await?;
84        Ok(())
85    }
86
87    /// Switch to the next shard
88    pub fn increment(&mut self) -> bool {
89        if let Some(prefix) = &mut self.prefix {
90            if prefix.ends_with('f') {
91                prefix.pop();
92                true
93            } else if let Some(c) = prefix.pop() {
94                let c = match c {
95                    '0'..='8' | 'a'..='e' => (c as u8 + 1) as char,
96                    '9' => 'a',
97                    _ => c,
98                };
99                prefix.push(c);
100                false
101            } else {
102                // prefix is empty, keyspace has been traversed
103                false
104            }
105        } else {
106            debug!("Peers are already in sync, nothing to do here");
107            self.prefix = Some(String::new());
108            false
109        }
110    }
111
112    /// Traverse into the first entry in this shard
113    pub fn enter(&mut self) {
114        if let Some(prefix) = &mut self.prefix {
115            prefix.push('0');
116        } else {
117            self.prefix = Some("0".to_string());
118        }
119    }
120}
121
122impl fmt::Display for TreeQuery {
123    fn fmt(&self, w: &mut fmt::Formatter) -> fmt::Result {
124        let prefix = self.prefix.as_deref().unwrap_or("");
125        write!(w, "{:X}/{}:{}", self.fp, self.hash_algo, prefix)?;
126        Ok(())
127    }
128}
129
130impl FromStr for TreeQuery {
131    type Err = Error;
132
133    fn from_str(s: &str) -> Result<Self> {
134        let mut s = s.split(' ');
135        let fp = Fingerprint::from_str(s.next().context("Missing fingerprint")?)
136            .context("Invalid fingerprint")?;
137
138        let hash_algo = s.next().context("Missing hash algo")?;
139        if hash_algo != "sha256" {
140            bail!("Only sha256 is supported at the moment");
141        }
142
143        let prefix = s.next().map(String::from);
144
145        if let Some(garbage) = s.next() {
146            bail!("Detected trailing data, rejecting as invalid: {garbage:?}");
147        }
148
149        Ok(TreeQuery {
150            fp,
151            hash_algo: hash_algo.to_string(),
152            prefix,
153        })
154    }
155}
156
157#[derive(Debug, Default)]
158pub struct BatchIndex {
159    index: IndexMap<String, (String, usize)>,
160}
161
162impl BatchIndex {
163    pub fn new() -> Self {
164        BatchIndex::default()
165    }
166
167    pub fn add(&mut self, index: String, prefix: String, count: usize) -> Result<()> {
168        if self.index.len() < BATCH_INDEX_MAX_SIZE {
169            self.index.insert(prefix, (index, count));
170            Ok(())
171        } else {
172            bail!(
173                "Batch index is already at max capacity: {:?}",
174                self.index.len()
175            )
176        }
177    }
178
179    pub async fn write_to<W: AsyncWrite + Unpin>(&self, mut sink: W) -> Result<()> {
180        for (prefix, (index, count)) in &self.index {
181            sink.write_all(format!("{index} {prefix} {count}\n").as_bytes())
182                .await?;
183        }
184        Ok(())
185    }
186
187    pub fn parse_line(&mut self, line: &[u8]) -> Result<()> {
188        let line = line.strip_suffix(b"\n").unwrap_or(line);
189        let line = str::from_utf8(line).context("Response contains invalid utf8")?;
190
191        let mut s = line.split(' ');
192        let index = s.next().context("Missing index from response")?;
193        let prefix = s.next().context("Failed to get prefix for index")?;
194        let count = s.next().context("Failed to get number of children")?;
195        let count = count
196            .parse()
197            .context("Number of children is not a number")?;
198
199        self.add(index.to_string(), prefix.to_string(), count)
200    }
201
202    pub fn get(&self, key: &str) -> Option<&(String, usize)> {
203        self.index.get(key)
204    }
205
206    pub fn keys(&self) -> indexmap::map::Keys<String, (String, usize)> {
207        self.index.keys()
208    }
209
210    pub fn clear(&mut self) {
211        self.index.clear();
212    }
213}
214
215pub async fn index_from_scan(db: &Database, query: &TreeQuery) -> Result<(String, usize)> {
216    let prefix = query.to_string();
217
218    let mut counter = 0;
219    let mut hasher = Sha256::new();
220
221    let stream = db.scan_keys(prefix.as_bytes());
222    tokio::pin!(stream);
223    while let Some(item) = stream.next().await {
224        let hash = item.context("Failed to read from database (index_from_scan)")?;
225        hasher.update(&hash);
226        hasher.update(b"\n");
227        counter += 1;
228    }
229
230    let result = hasher.finalize();
231    Ok((format!("sha256:{result:x}"), counter))
232}
233
234pub async fn sync_yield<
235    D: DatabaseClient + Sync + Send,
236    R: AsyncRead + Unpin,
237    W: AsyncWrite + Unpin,
238>(
239    db: &mut D,
240    rx: R,
241    mut tx: W,
242    timeout: Option<Duration>,
243) -> Result<()> {
244    let mut rx = io::BufReader::new(rx);
245    loop {
246        let mut line = Vec::new();
247        let mut rrx = (&mut rx).take(MAX_LINE_LENGTH);
248        let read = rrx.read_until(b'\n', &mut line);
249
250        let n = if let Some(timeout) = timeout {
251            if let Ok(n) = time::timeout(timeout, read).await {
252                n
253            } else {
254                break;
255            }
256        } else {
257            read.await
258        }?;
259
260        if n == 0 {
261            break;
262        }
263
264        if !line.ends_with(b"\n") {
265            bail!(
266                "Client sent invalid request, exceeding size limit: {:?}",
267                BStr::new(&line)
268            );
269        }
270
271        let query = Query::from_bytes(&line)?;
272        trace!("Received query: {:?}", query);
273        match query {
274            Query::Tree(mut query) => {
275                let (index, total) = db.batch_index_from_scan(&mut query).await?;
276
277                if total > 0 && total <= SPILL_THRESHOLD {
278                    let prefix = query.to_string();
279                    for (hash, data) in db.spill(prefix.as_bytes()).await? {
280                        trace!("Sending data packet to client: {:?}", BStr::new(&hash));
281                        tx.write_all(format!(":{:x}\n", data.len()).as_bytes())
282                            .await?;
283                        tx.write_all(&data).await?;
284                    }
285                    tx.write_all(b":0\n").await?;
286                } else {
287                    index.write_to(&mut tx).await?;
288                    tx.write_all(b"\n").await?;
289                }
290            }
291            Query::Pex => {
292                // TODO
293                tx.write_all(b":0\n").await?;
294            }
295            Query::Unknown(data) => {
296                debug!("Received unknown command from network: {data:?}");
297                tx.write_all(b":0\n").await?;
298            }
299        }
300    }
301
302    Ok(())
303}
304
305#[derive(Debug, Default)]
306pub struct SyncQueue {
307    queues: BTreeMap<usize, VecDeque<Option<String>>>,
308}
309
310impl SyncQueue {
311    pub fn push(&mut self, key: Option<String>) {
312        let len = key.as_ref().map(|s| s.len()).unwrap_or(0);
313        let queue = self.queues.entry(len).or_default();
314        queue.push_back(key);
315    }
316
317    pub fn pop_next(&mut self) -> Option<Option<String>> {
318        loop {
319            if let Some(mut entry) = self.queues.last_entry() {
320                let queue = entry.get_mut();
321                if let Some(item) = queue.pop_front() {
322                    return Some(item);
323                } else {
324                    entry.remove_entry();
325                }
326            } else {
327                return None;
328            }
329        }
330    }
331}
332
333pub async fn sync_pull_key<
334    D: DatabaseClient + Sync + Send,
335    R: AsyncRead + Unpin,
336    W: AsyncWrite + Unpin,
337>(
338    db: &mut D,
339    keyring: &Keyring,
340    fp: &Fingerprint,
341    dry_run: bool,
342    mut tx: W,
343    rx: &mut io::BufReader<R>,
344) -> Result<()> {
345    let mut query = TreeQuery {
346        fp: fp.clone(),
347        hash_algo: "sha256".to_string(),
348        prefix: None,
349    };
350
351    let mut queue = SyncQueue::default();
352    queue.push(None);
353
354    while let Some(item) = queue.pop_next() {
355        query.prefix = item;
356        info!("Requesting index for: {:?}", query.to_string());
357        query.write_to(&mut tx).await?;
358
359        let (our_index, _our_count) = db.batch_index_from_scan(&mut query).await?;
360        trace!("Our index: {our_index:?}");
361
362        let mut line = Vec::new();
363        let mut their_index = BatchIndex::new();
364
365        loop {
366            line.clear();
367
368            let mut rrx = rx.take(MAX_LINE_LENGTH);
369            let read = rrx.read_until(b'\n', &mut line);
370            let n = time::timeout(SYNC_INDEX_TIMEOUT, read)
371                .await
372                .context("Request for index timed out")?
373                .context("Failed to receive response from peer")?;
374
375            if n == 0 {
376                bail!("Reached unexpected eof while enumerating service");
377            }
378
379            if !line.ends_with(b"\n") {
380                bail!(
381                    "Server sent invalid line, exceeding size limit: {:?}",
382                    BStr::new(&line)
383                );
384            }
385
386            if line == b"\n" {
387                let keys = their_index
388                    .keys()
389                    .chain(our_index.keys())
390                    .collect::<IndexSet<_>>();
391
392                for key in keys {
393                    match (their_index.get(key), our_index.get(key)) {
394                        (Some(theirs), Some(ours)) => {
395                            trace!("Comparing index shards for key={key:?}, theirs={theirs:?}, ours={ours:?}");
396
397                            if theirs.1 == 0 {
398                                trace!(
399                                    "No children in this shard (key={key:?}), moving to next one"
400                                );
401                            } else if theirs == ours {
402                                trace!("These shards are already in sync (key={key:?}), moving to next one");
403                            } else {
404                                trace!("Data to be found here (key={key:?}), trying to enumerate");
405                                queue.push(Some(key.to_owned()));
406                            }
407                        }
408                        _ => bail!("Some index shards are omitted, this is currently unsupported"),
409                    }
410                }
411
412                their_index.clear();
413                break;
414            } else if let Some(line) = line.strip_prefix(b":") {
415                let line = line.strip_suffix(b"\n").unwrap_or(line);
416                let line = str::from_utf8(line).context("Length tag has invalid utf8")?;
417                trace!("Received len tag: {:?}", line);
418                let len = usize::from_str_radix(line, 16)
419                    .with_context(|| anyhow!("Length tag is invalid number: {line:?}"))?;
420
421                if len == 0 {
422                    trace!("Received all releases from shard, moving to next one");
423                    while query.increment() {
424                        trace!("Reached last entry in shard, returning to parent");
425                    }
426                    break;
427                }
428
429                // TODO: check this tag doesn't OOM us
430                info!("Reading data packet from remote: {len:?} bytes");
431
432                let mut remaining = len;
433                let mut buf = vec![0u8; len];
434                while remaining > 0 {
435                    let read = rx.read(&mut buf[len - remaining..]);
436
437                    let n = time::timeout(SYNC_READ_TIMEOUT, read)
438                        .await
439                        .context("Read from remote timed out")?
440                        .context("Failed to receive data from peer")?;
441
442                    if n == 0 {
443                        bail!("Unexpected end of file");
444                    }
445
446                    remaining -= n;
447                    trace!("Read {}/{} bytes from remote", len - remaining, len);
448                }
449                trace!("Finished reading data packet: {:?}", buf.len());
450
451                let mut bytes = &buf[..];
452                while !bytes.is_empty() {
453                    let (signed, remaining) =
454                        Signed::from_bytes(bytes).context("Failed to parse release file")?;
455
456                    for (fp, variant) in signed.canonicalize(Some(keyring))? {
457                        let fp = fp.context(
458                            "Signature can't be imported because the signature is unverified",
459                        )?;
460                        if dry_run {
461                            debug!("Skipping insert due to dry-run");
462                        } else {
463                            db.add_release(&fp, &variant).await?;
464                        }
465                    }
466
467                    bytes = remaining;
468                }
469            } else {
470                their_index
471                    .parse_line(&line)
472                    .with_context(|| anyhow!("Failed to parse line of batch index: {line:?}"))?;
473            }
474        }
475    }
476
477    Ok(())
478}
479
480pub async fn sync_pull<
481    D: DatabaseClient + Sync + Send,
482    R: AsyncRead + Unpin,
483    W: AsyncWrite + Unpin,
484>(
485    db: &mut D,
486    keyring: &Keyring,
487    selected_keys: &[Fingerprint],
488    dry_run: bool,
489    mut tx: W,
490    rx: R,
491) -> Result<()> {
492    let selected_keys = if !selected_keys.is_empty() {
493        Cow::Borrowed(selected_keys)
494    } else {
495        Cow::Owned(keyring.all_fingerprints())
496    };
497
498    let mut rx = io::BufReader::new(rx);
499    for fp in selected_keys.iter() {
500        sync_pull_key(db, keyring, fp, dry_run, &mut tx, &mut rx).await?;
501    }
502
503    Ok(())
504}
505
506#[cfg(test)]
507mod tests {
508    use super::*;
509    use crate::db::AccessMode;
510    use futures::TryStreamExt;
511
512    fn init() {
513        let _ = env_logger::builder().is_test(true).try_init();
514    }
515
516    async fn open_temp_dbs() -> Result<(tempfile::TempDir, Database, Database)> {
517        let dir = tempfile::tempdir()?;
518        let db_a = Database::open_at(dir.path().join("a"), AccessMode::Exclusive).await?;
519        let db_b = Database::open_at(dir.path().join("b"), AccessMode::Exclusive).await?;
520        Ok((dir, db_a, db_b))
521    }
522
523    async fn run_sync(keyring: &Keyring, db_a: &mut Database, db_b: &mut Database) -> Result<()> {
524        let (client, server) = tokio::io::duplex(64);
525        let (client_rx, client_tx) = tokio::io::split(client);
526        let (server_rx, server_tx) = tokio::io::split(server);
527        let task_yield = sync_yield(db_a, server_rx, server_tx, None);
528        let task_pull = sync_pull(db_b, keyring, &[], false, client_tx, client_rx);
529
530        tokio::select! {
531            ret = task_pull => ret?,
532            ret = task_yield => bail!("Yield task was not expected to return: {ret:?}"),
533        }
534
535        Ok(())
536    }
537
538    #[tokio::test]
539    async fn test_sync_both_empty() -> Result<()> {
540        init();
541
542        let keyring =
543            Keyring::new(include_bytes!("../contrib/signal-desktop-keyring.gpg")).unwrap();
544        let (_, mut db_a, mut db_b) = open_temp_dbs().await.unwrap();
545        run_sync(&keyring, &mut db_a, &mut db_b).await.unwrap();
546
547        Ok(())
548    }
549
550    #[tokio::test]
551    async fn test_sync_full() -> Result<()> {
552        init();
553
554        let keyring =
555            Keyring::new(include_bytes!("../contrib/signal-desktop-keyring.gpg")).unwrap();
556        let (_, mut db_a, mut db_b) = open_temp_dbs().await.unwrap();
557
558        let data = [
559        b"-----BEGIN PGP SIGNED MESSAGE-----
560
561Origin: . xenial
562Label: . xenial
563Suite: xenial
564Codename: xenial
565Date: Thu, 23 Feb 2023 01:55:04 UTC
566Architectures: amd64
567Components: main
568Description: Generated by aptly
569MD5Sum:
570 cdb20787f1556bb38ae3b6017ef51327   132984 main/binary-amd64/Packages
571 001fc41d6c21eb85a43a13133584cbae    21567 main/binary-amd64/Packages.gz
572 3fb4f1a0169c3b2fff2c43c2a2277b51    17923 main/binary-amd64/Packages.bz2
573 c911b1bc4adf556f6fbd17c0c9cd8315     4794 main/Contents-amd64.gz
574 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
575SHA1:
576 455673b692a697ad3ada91a875096365f8da1524   132984 main/binary-amd64/Packages
577 e6a940039dcfc7f93f4a5501f15e75d1427b9464    21567 main/binary-amd64/Packages.gz
578 b8447294816bb063c10d0ba35f48dd5d1980f795    17923 main/binary-amd64/Packages.bz2
579 b6fd643edc8846c0914b44f3182dfc086877d944     4794 main/Contents-amd64.gz
580 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
581SHA256:
582 989c22244106e44d789400d4da33d2ed64228ce94f48d1c2c37493118c992384   132984 main/binary-amd64/Packages
583 c46198172d00d4e01388832b61186a888da47e2c119c1e9dd7378fea206b1237    21567 main/binary-amd64/Packages.gz
584 b368e24d5c137448095f8940e3b371bff83e3e56159df6c58d4be83732a85554    17923 main/binary-amd64/Packages.bz2
585 bb347cbc00e02d73fef513965f1cd9f9e73100cd34097c43fdd1414668ec8ed8     4794 main/Contents-amd64.gz
586 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
587-----BEGIN PGP SIGNATURE-----
588
589wsFcBAEBCAAGBQJj9sd6AAoJENmAoXRX9vsGOQ0P/3S63ctKl7QyxmRQ4UVJl70S
590hTxA90FbWp236nrEWw4EO/eVWiR/VbgFPacp/dyBpSmtTFl5cpOeyf2SYj5qfg5L
591cemYgUbaxRl+PBFGm7A14y82Ym0MUzF9cNWVK8bDXH9BKSljKKerXr4giOwjTkgh
592z2LoLxnrbhGkIWnSNiT0YvQrkxkSC5BjOInRiy/4Dr7LFAX/7KBzyPVwiDPxWQca
593dwtmI6EoZQP+zHDTR6RwnYOB7oME8aYIruwF9Vhu/unfdC4LpbNJDGL7VwQKUp8h
594ICupSwnRmHPV2raNBq58K6OunGvFO0oFaYUIQqbvGzu/5859YWhrdd7gBd9Fj4zI
595Ff7fHC+ZigCNCk7op4LykJ/3uJF8NvFlNxiagO+1tRko3V4tNbeSrXEKDhr5RQJz
596p/VdL1TXI/pVIobxbF5D/Lo8dCs5LjJsJ5rFlPgzjlREFn0hwKcDwB7M+rbPhuHV
5971R3lgdhW01ZghwOdTMiX1cShQwE7bvGtskn2WIHyIhEawpotGNpBFG2K5TdxfXA1
598m+wu4PLxfxOSb+VoQlH1enyDcR7m7XNtt692l++6nw3rq6Wv2zNc9DHRE+HNavJg
599zwlfH3L9OOoGfPMfRxrKqFzcob2gnKjptlHt3XpUx5ZwS4hcKB2lETT9ORVxe1NI
600rK5KKL67o5aLviVqo98l
601=MI63
602-----END PGP SIGNATURE-----
603",
604b"-----BEGIN PGP SIGNED MESSAGE-----
605
606Origin: . xenial
607Label: . xenial
608Suite: xenial
609Codename: xenial
610Date: Wed, 15 Feb 2023 23:18:08 UTC
611Architectures: amd64
612Components: main
613Description: Generated by aptly
614MD5Sum:
615 bea5a0f7c99209504f22d8faf10125fd   132287 main/binary-amd64/Packages
616 1aa4c130945a3a076a9f16546ca17a83    21467 main/binary-amd64/Packages.gz
617 f18c7f0779161104fed2aec72d9a44e2    17922 main/binary-amd64/Packages.bz2
618 fdb168fb0b8f575585d917ca0ffd98bb     4783 main/Contents-amd64.gz
619 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
620SHA1:
621 d01c164d99cf7c867b5f115770f58e4916d7a15f   132287 main/binary-amd64/Packages
622 41e78c4c558567f4936d9952eb928a32911cc56c    21467 main/binary-amd64/Packages.gz
623 8837bf2f3e2bad7c73712d003c1510c6171c53ee    17922 main/binary-amd64/Packages.bz2
624 e800a4c83e8d9ef564f5869ad838962550799c5e     4783 main/Contents-amd64.gz
625 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
626SHA256:
627 9f9178b66c4d1d31d7b2b741f0835c2140552cac68861beaf2ecc55f0364c620   132287 main/binary-amd64/Packages
628 57386742060a0913236bafd5e8eb3b3334284e0d2ab8362a7c22c78175e9d89b    21467 main/binary-amd64/Packages.gz
629 ec097b64b5e3a39760a9b5ea6b02e91d0401994464dd0ce3de2a0a26a62230e2    17922 main/binary-amd64/Packages.bz2
630 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa     4783 main/Contents-amd64.gz
631 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
632-----BEGIN PGP SIGNATURE-----
633
634wsFcBAABCAAGBQJj7WgwAAoJENmAoXRX9vsG69AQAJmvducnhHqCXQIsqjXrDMjU
635QUAw56MRunn7rHTFpJY0ZPLgQ5gVBibouNZ9x78wuJ784Sl+MIHC7RWdQYBEbWQ5
636haKjiI00BzDeXx4sUet1E+Ce5dhjK/UvoZIOy+ed5nv/HM7QFrvoxdADSDnYGy2o
637djFUVWR5kzkb5Tv7bcjJQWWf6JvY1Z12CgsG85ECYv2PE+tGgQjSwbxRDvFFzY1O
638Xy1EkjT+YDG6hy5CiKSZL7qPsjsLHeuRvat3oSlBWiFRnSuLOlsDozqzYMFqNx93
639GPQiFNiYEmkxDxiKLOcds7+Plz2FjQdQwv2msllJ4jA9PxYRiEbfH14/ELk+/snE
64066XID9dv91JbrwaI3NOoJZZmN+QYZ7WaAj3Uxl3cYnCGuIIt6z4KB2CYeyRa3f3K
641HbPq0mBchPPmavaQEfaNDQ+dzMuazR0VMoKfHGEp44r+XU+JH/lNzlxgQEMgVv43
6420B++zb4MYgheGUhu7Xdgd6XSQdZGxt4GieXLwIAXA0nmAFlZB7EAJcyHqz0hVo6m
643Q/m8Ja8hBw6lmyM5uCduF61BhnQDfuDQetLgGzrvOp3m2qfTag3QGtEijwhH8L2O
6443xuMqMjtJutTa557go0p+PLjAhMVQ0S7z+3aLn/368qnqlxSflDCPe4GMcaXmOCz
645RdMJMk9txqB8GM5F2sO3
646=gtrA
647-----END PGP SIGNATURE-----
648",
649b"-----BEGIN PGP SIGNED MESSAGE-----
650
651Origin: . xenial
652Label: . xenial
653Suite: xenial
654Codename: xenial
655Date: Fri, 10 Feb 2023 21:24:49 UTC
656Architectures: amd64
657Components: main
658Description: Generated by aptly
659MD5Sum:
660 1044a9316b629fb7ea4b964ecaf1ccf3    21255 main/binary-amd64/Packages.gz
661 6ee4dcbdb0c0e98e416b94f542f6cc1b    17584 main/binary-amd64/Packages.bz2
662 fdb168fb0b8f575585d917ca0ffd98bb     4783 main/Contents-amd64.gz
663 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
664 b2f1f73fabd4acfaca43d05bee1debca   130864 main/binary-amd64/Packages
665SHA1:
666 1bcb7cd08c94a3519b2dce77f3f5f5e16c312067    21255 main/binary-amd64/Packages.gz
667 61c0f6b35c7bd3a5f59094511ccd593dcb2b8c96    17584 main/binary-amd64/Packages.bz2
668 e800a4c83e8d9ef564f5869ad838962550799c5e     4783 main/Contents-amd64.gz
669 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
670 4a7c86cb1c0caa36c92e1af844ebf0b7e2bf4cea   130864 main/binary-amd64/Packages
671SHA256:
672 481c1bf74f609fbf71eed01da98a05cbe884acc2efd6d0e2c1c65f9e72ddc2e6    21255 main/binary-amd64/Packages.gz
673 d9f8cc2cc5b2aa854c509caf96d9e1457e6cb0fd55597ac49408a96afb8a727b    17584 main/binary-amd64/Packages.bz2
674 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa     4783 main/Contents-amd64.gz
675 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
676 d0bec4d8f926383f3d61dc79b8d5d352f71adcf8befb59e8e02aeabe8c19eeba   130864 main/binary-amd64/Packages
677-----BEGIN PGP SIGNATURE-----
678
679wsFcBAEBCAAGBQJj5rYjAAoJENmAoXRX9vsG3wUP/2X7ufCo5nJkyHhzOtTEI4Pq
680rz6P94r2S/OA7v99mVkKNyOYZ8hKMNccYumvkWaXBF+WkLemCPeJxaBbRUrulu3c
681GXNPHht8dusQYIxS2VQVYbHgXfwQ+Y3+P1wVLPNT+9Ka0POkUT4YiM1G8Zx3fwTq
682zUeCpV1TKgkrVQ4CF5DX8i9tcVmYUq8B+BouwQAFJxElM1cuYqGybG19H/od77nH
683tkv3n43P0TCZ9KR48ZXWXF+6v26SRse2YkergbNOtJwRfdMHzvc8d/nb7T/Iv3jM
684WmqUs6Ob4EioUTWYwi2H3y+LnzAPeSVEklfCS61LzlyFGelpxHGuTjaaMtCI2Bkb
685f3XyNjeVwUYmnGWrBMCI38CUnY0J0oXLrVUxYZoT0O9SSO2bpql64T2Flqn10Djk
686W8j7V9a5gNO69PkNEHWUylwolFvF/H8Zmc6QZbnnbFSpC4pMEeRhoI1v1CqPSMn8
687APOGWa1xHN9hj9g4AZfXvO56BDveo9lbNOmFs2EAmBEEj2hCiroRtuCxxDmwerq3
688MLtCJIkir3JdbefexXcbIoP5+tjl573nvKU+Kb4KhCJTBDEY6+6qZKTSBDESKTvq
689T2L60YwfXZsj6WCS9roTz9llmze3YjURbHNZpf4BO3zONwNNeqFZw3qYWNCyzRS+
690R4AjBHbzlyIGpU5BGNn3
691=KMXz
692-----END PGP SIGNATURE-----
693"
694];
695        for data in data {
696            let (signed, _) = Signed::from_bytes(data).unwrap();
697            db_a.add_release(
698                &"DBA36B5181D0C816F630E889D980A17457F6FB06".parse()?,
699                &signed,
700            )
701            .await
702            .unwrap();
703        }
704
705        let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
706        let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
707        assert_eq!(keys_a.len(), 3);
708        assert_eq!(keys_b.len(), 0);
709
710        run_sync(&keyring, &mut db_a, &mut db_b).await.unwrap();
711
712        let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
713        let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
714        assert_eq!(keys_a.len(), 3);
715        assert_eq!(keys_b.len(), 3);
716
717        Ok(())
718    }
719
720    #[tokio::test]
721    async fn test_sync_from_partial() -> Result<()> {
722        init();
723
724        let keyring =
725            Keyring::new(include_bytes!("../contrib/signal-desktop-keyring.gpg")).unwrap();
726        let (_, mut db_a, mut db_b) = open_temp_dbs().await.unwrap();
727
728        let data = [
729        b"-----BEGIN PGP SIGNED MESSAGE-----
730
731Origin: . xenial
732Label: . xenial
733Suite: xenial
734Codename: xenial
735Date: Thu, 23 Feb 2023 01:55:04 UTC
736Architectures: amd64
737Components: main
738Description: Generated by aptly
739MD5Sum:
740 cdb20787f1556bb38ae3b6017ef51327   132984 main/binary-amd64/Packages
741 001fc41d6c21eb85a43a13133584cbae    21567 main/binary-amd64/Packages.gz
742 3fb4f1a0169c3b2fff2c43c2a2277b51    17923 main/binary-amd64/Packages.bz2
743 c911b1bc4adf556f6fbd17c0c9cd8315     4794 main/Contents-amd64.gz
744 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
745SHA1:
746 455673b692a697ad3ada91a875096365f8da1524   132984 main/binary-amd64/Packages
747 e6a940039dcfc7f93f4a5501f15e75d1427b9464    21567 main/binary-amd64/Packages.gz
748 b8447294816bb063c10d0ba35f48dd5d1980f795    17923 main/binary-amd64/Packages.bz2
749 b6fd643edc8846c0914b44f3182dfc086877d944     4794 main/Contents-amd64.gz
750 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
751SHA256:
752 989c22244106e44d789400d4da33d2ed64228ce94f48d1c2c37493118c992384   132984 main/binary-amd64/Packages
753 c46198172d00d4e01388832b61186a888da47e2c119c1e9dd7378fea206b1237    21567 main/binary-amd64/Packages.gz
754 b368e24d5c137448095f8940e3b371bff83e3e56159df6c58d4be83732a85554    17923 main/binary-amd64/Packages.bz2
755 bb347cbc00e02d73fef513965f1cd9f9e73100cd34097c43fdd1414668ec8ed8     4794 main/Contents-amd64.gz
756 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
757-----BEGIN PGP SIGNATURE-----
758
759wsFcBAEBCAAGBQJj9sd6AAoJENmAoXRX9vsGOQ0P/3S63ctKl7QyxmRQ4UVJl70S
760hTxA90FbWp236nrEWw4EO/eVWiR/VbgFPacp/dyBpSmtTFl5cpOeyf2SYj5qfg5L
761cemYgUbaxRl+PBFGm7A14y82Ym0MUzF9cNWVK8bDXH9BKSljKKerXr4giOwjTkgh
762z2LoLxnrbhGkIWnSNiT0YvQrkxkSC5BjOInRiy/4Dr7LFAX/7KBzyPVwiDPxWQca
763dwtmI6EoZQP+zHDTR6RwnYOB7oME8aYIruwF9Vhu/unfdC4LpbNJDGL7VwQKUp8h
764ICupSwnRmHPV2raNBq58K6OunGvFO0oFaYUIQqbvGzu/5859YWhrdd7gBd9Fj4zI
765Ff7fHC+ZigCNCk7op4LykJ/3uJF8NvFlNxiagO+1tRko3V4tNbeSrXEKDhr5RQJz
766p/VdL1TXI/pVIobxbF5D/Lo8dCs5LjJsJ5rFlPgzjlREFn0hwKcDwB7M+rbPhuHV
7671R3lgdhW01ZghwOdTMiX1cShQwE7bvGtskn2WIHyIhEawpotGNpBFG2K5TdxfXA1
768m+wu4PLxfxOSb+VoQlH1enyDcR7m7XNtt692l++6nw3rq6Wv2zNc9DHRE+HNavJg
769zwlfH3L9OOoGfPMfRxrKqFzcob2gnKjptlHt3XpUx5ZwS4hcKB2lETT9ORVxe1NI
770rK5KKL67o5aLviVqo98l
771=MI63
772-----END PGP SIGNATURE-----
773",
774b"-----BEGIN PGP SIGNED MESSAGE-----
775
776Origin: . xenial
777Label: . xenial
778Suite: xenial
779Codename: xenial
780Date: Wed, 15 Feb 2023 23:18:08 UTC
781Architectures: amd64
782Components: main
783Description: Generated by aptly
784MD5Sum:
785 bea5a0f7c99209504f22d8faf10125fd   132287 main/binary-amd64/Packages
786 1aa4c130945a3a076a9f16546ca17a83    21467 main/binary-amd64/Packages.gz
787 f18c7f0779161104fed2aec72d9a44e2    17922 main/binary-amd64/Packages.bz2
788 fdb168fb0b8f575585d917ca0ffd98bb     4783 main/Contents-amd64.gz
789 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
790SHA1:
791 d01c164d99cf7c867b5f115770f58e4916d7a15f   132287 main/binary-amd64/Packages
792 41e78c4c558567f4936d9952eb928a32911cc56c    21467 main/binary-amd64/Packages.gz
793 8837bf2f3e2bad7c73712d003c1510c6171c53ee    17922 main/binary-amd64/Packages.bz2
794 e800a4c83e8d9ef564f5869ad838962550799c5e     4783 main/Contents-amd64.gz
795 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
796SHA256:
797 9f9178b66c4d1d31d7b2b741f0835c2140552cac68861beaf2ecc55f0364c620   132287 main/binary-amd64/Packages
798 57386742060a0913236bafd5e8eb3b3334284e0d2ab8362a7c22c78175e9d89b    21467 main/binary-amd64/Packages.gz
799 ec097b64b5e3a39760a9b5ea6b02e91d0401994464dd0ce3de2a0a26a62230e2    17922 main/binary-amd64/Packages.bz2
800 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa     4783 main/Contents-amd64.gz
801 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
802-----BEGIN PGP SIGNATURE-----
803
804wsFcBAABCAAGBQJj7WgwAAoJENmAoXRX9vsG69AQAJmvducnhHqCXQIsqjXrDMjU
805QUAw56MRunn7rHTFpJY0ZPLgQ5gVBibouNZ9x78wuJ784Sl+MIHC7RWdQYBEbWQ5
806haKjiI00BzDeXx4sUet1E+Ce5dhjK/UvoZIOy+ed5nv/HM7QFrvoxdADSDnYGy2o
807djFUVWR5kzkb5Tv7bcjJQWWf6JvY1Z12CgsG85ECYv2PE+tGgQjSwbxRDvFFzY1O
808Xy1EkjT+YDG6hy5CiKSZL7qPsjsLHeuRvat3oSlBWiFRnSuLOlsDozqzYMFqNx93
809GPQiFNiYEmkxDxiKLOcds7+Plz2FjQdQwv2msllJ4jA9PxYRiEbfH14/ELk+/snE
81066XID9dv91JbrwaI3NOoJZZmN+QYZ7WaAj3Uxl3cYnCGuIIt6z4KB2CYeyRa3f3K
811HbPq0mBchPPmavaQEfaNDQ+dzMuazR0VMoKfHGEp44r+XU+JH/lNzlxgQEMgVv43
8120B++zb4MYgheGUhu7Xdgd6XSQdZGxt4GieXLwIAXA0nmAFlZB7EAJcyHqz0hVo6m
813Q/m8Ja8hBw6lmyM5uCduF61BhnQDfuDQetLgGzrvOp3m2qfTag3QGtEijwhH8L2O
8143xuMqMjtJutTa557go0p+PLjAhMVQ0S7z+3aLn/368qnqlxSflDCPe4GMcaXmOCz
815RdMJMk9txqB8GM5F2sO3
816=gtrA
817-----END PGP SIGNATURE-----
818",
819b"-----BEGIN PGP SIGNED MESSAGE-----
820
821Origin: . xenial
822Label: . xenial
823Suite: xenial
824Codename: xenial
825Date: Fri, 10 Feb 2023 21:24:49 UTC
826Architectures: amd64
827Components: main
828Description: Generated by aptly
829MD5Sum:
830 1044a9316b629fb7ea4b964ecaf1ccf3    21255 main/binary-amd64/Packages.gz
831 6ee4dcbdb0c0e98e416b94f542f6cc1b    17584 main/binary-amd64/Packages.bz2
832 fdb168fb0b8f575585d917ca0ffd98bb     4783 main/Contents-amd64.gz
833 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
834 b2f1f73fabd4acfaca43d05bee1debca   130864 main/binary-amd64/Packages
835SHA1:
836 1bcb7cd08c94a3519b2dce77f3f5f5e16c312067    21255 main/binary-amd64/Packages.gz
837 61c0f6b35c7bd3a5f59094511ccd593dcb2b8c96    17584 main/binary-amd64/Packages.bz2
838 e800a4c83e8d9ef564f5869ad838962550799c5e     4783 main/Contents-amd64.gz
839 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
840 4a7c86cb1c0caa36c92e1af844ebf0b7e2bf4cea   130864 main/binary-amd64/Packages
841SHA256:
842 481c1bf74f609fbf71eed01da98a05cbe884acc2efd6d0e2c1c65f9e72ddc2e6    21255 main/binary-amd64/Packages.gz
843 d9f8cc2cc5b2aa854c509caf96d9e1457e6cb0fd55597ac49408a96afb8a727b    17584 main/binary-amd64/Packages.bz2
844 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa     4783 main/Contents-amd64.gz
845 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
846 d0bec4d8f926383f3d61dc79b8d5d352f71adcf8befb59e8e02aeabe8c19eeba   130864 main/binary-amd64/Packages
847-----BEGIN PGP SIGNATURE-----
848
849wsFcBAEBCAAGBQJj5rYjAAoJENmAoXRX9vsG3wUP/2X7ufCo5nJkyHhzOtTEI4Pq
850rz6P94r2S/OA7v99mVkKNyOYZ8hKMNccYumvkWaXBF+WkLemCPeJxaBbRUrulu3c
851GXNPHht8dusQYIxS2VQVYbHgXfwQ+Y3+P1wVLPNT+9Ka0POkUT4YiM1G8Zx3fwTq
852zUeCpV1TKgkrVQ4CF5DX8i9tcVmYUq8B+BouwQAFJxElM1cuYqGybG19H/od77nH
853tkv3n43P0TCZ9KR48ZXWXF+6v26SRse2YkergbNOtJwRfdMHzvc8d/nb7T/Iv3jM
854WmqUs6Ob4EioUTWYwi2H3y+LnzAPeSVEklfCS61LzlyFGelpxHGuTjaaMtCI2Bkb
855f3XyNjeVwUYmnGWrBMCI38CUnY0J0oXLrVUxYZoT0O9SSO2bpql64T2Flqn10Djk
856W8j7V9a5gNO69PkNEHWUylwolFvF/H8Zmc6QZbnnbFSpC4pMEeRhoI1v1CqPSMn8
857APOGWa1xHN9hj9g4AZfXvO56BDveo9lbNOmFs2EAmBEEj2hCiroRtuCxxDmwerq3
858MLtCJIkir3JdbefexXcbIoP5+tjl573nvKU+Kb4KhCJTBDEY6+6qZKTSBDESKTvq
859T2L60YwfXZsj6WCS9roTz9llmze3YjURbHNZpf4BO3zONwNNeqFZw3qYWNCyzRS+
860R4AjBHbzlyIGpU5BGNn3
861=KMXz
862-----END PGP SIGNATURE-----
863"
864];
865        for data in data {
866            let (signed, _) = Signed::from_bytes(data).unwrap();
867            db_a.add_release(
868                &"DBA36B5181D0C816F630E889D980A17457F6FB06".parse()?,
869                &signed,
870            )
871            .await
872            .unwrap();
873        }
874
875        let (signed, _) = Signed::from_bytes(b"-----BEGIN PGP SIGNED MESSAGE-----
876
877Origin: . xenial
878Label: . xenial
879Suite: xenial
880Codename: xenial
881Date: Wed, 15 Feb 2023 23:18:08 UTC
882Architectures: amd64
883Components: main
884Description: Generated by aptly
885MD5Sum:
886 bea5a0f7c99209504f22d8faf10125fd   132287 main/binary-amd64/Packages
887 1aa4c130945a3a076a9f16546ca17a83    21467 main/binary-amd64/Packages.gz
888 f18c7f0779161104fed2aec72d9a44e2    17922 main/binary-amd64/Packages.bz2
889 fdb168fb0b8f575585d917ca0ffd98bb     4783 main/Contents-amd64.gz
890 d8c35b55bc8e48e267b9ccdaf383976d       85 main/binary-amd64/Release
891SHA1:
892 d01c164d99cf7c867b5f115770f58e4916d7a15f   132287 main/binary-amd64/Packages
893 41e78c4c558567f4936d9952eb928a32911cc56c    21467 main/binary-amd64/Packages.gz
894 8837bf2f3e2bad7c73712d003c1510c6171c53ee    17922 main/binary-amd64/Packages.bz2
895 e800a4c83e8d9ef564f5869ad838962550799c5e     4783 main/Contents-amd64.gz
896 992cb9cd8a0af2d9ad81d2b45342656d41157202       85 main/binary-amd64/Release
897SHA256:
898 9f9178b66c4d1d31d7b2b741f0835c2140552cac68861beaf2ecc55f0364c620   132287 main/binary-amd64/Packages
899 57386742060a0913236bafd5e8eb3b3334284e0d2ab8362a7c22c78175e9d89b    21467 main/binary-amd64/Packages.gz
900 ec097b64b5e3a39760a9b5ea6b02e91d0401994464dd0ce3de2a0a26a62230e2    17922 main/binary-amd64/Packages.bz2
901 e04d5fd71915c3003d55f3927e5af71a4831e30ffbb0efec6dceb36cd1b054fa     4783 main/Contents-amd64.gz
902 e593f5bb98e0b6dbf5d0636ebff298b905b98a00402e2b20173fdb5da85c46d9       85 main/binary-amd64/Release
903-----BEGIN PGP SIGNATURE-----
904
905wsFcBAABCAAGBQJj7WgwAAoJENmAoXRX9vsG69AQAJmvducnhHqCXQIsqjXrDMjU
906QUAw56MRunn7rHTFpJY0ZPLgQ5gVBibouNZ9x78wuJ784Sl+MIHC7RWdQYBEbWQ5
907haKjiI00BzDeXx4sUet1E+Ce5dhjK/UvoZIOy+ed5nv/HM7QFrvoxdADSDnYGy2o
908djFUVWR5kzkb5Tv7bcjJQWWf6JvY1Z12CgsG85ECYv2PE+tGgQjSwbxRDvFFzY1O
909Xy1EkjT+YDG6hy5CiKSZL7qPsjsLHeuRvat3oSlBWiFRnSuLOlsDozqzYMFqNx93
910GPQiFNiYEmkxDxiKLOcds7+Plz2FjQdQwv2msllJ4jA9PxYRiEbfH14/ELk+/snE
91166XID9dv91JbrwaI3NOoJZZmN+QYZ7WaAj3Uxl3cYnCGuIIt6z4KB2CYeyRa3f3K
912HbPq0mBchPPmavaQEfaNDQ+dzMuazR0VMoKfHGEp44r+XU+JH/lNzlxgQEMgVv43
9130B++zb4MYgheGUhu7Xdgd6XSQdZGxt4GieXLwIAXA0nmAFlZB7EAJcyHqz0hVo6m
914Q/m8Ja8hBw6lmyM5uCduF61BhnQDfuDQetLgGzrvOp3m2qfTag3QGtEijwhH8L2O
9153xuMqMjtJutTa557go0p+PLjAhMVQ0S7z+3aLn/368qnqlxSflDCPe4GMcaXmOCz
916RdMJMk9txqB8GM5F2sO3
917=gtrA
918-----END PGP SIGNATURE-----
919").unwrap();
920
921        db_b.add_release(
922            &"DBA36B5181D0C816F630E889D980A17457F6FB06".parse()?,
923            &signed,
924        )
925        .await
926        .unwrap();
927
928        let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
929        let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
930        assert_eq!(keys_a.len(), 3);
931        assert_eq!(keys_b.len(), 1);
932
933        run_sync(&keyring, &mut db_a, &mut db_b).await.unwrap();
934
935        let keys_a = db_a.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
936        let keys_b = db_b.scan_keys(b"").try_collect::<Vec<_>>().await.unwrap();
937        assert_eq!(keys_a.len(), 3);
938        assert_eq!(keys_b.len(), 3);
939
940        Ok(())
941    }
942}