1use crate::config::Repository;
2use crate::db::DatabaseClient;
3use crate::errors::*;
4use crate::fetch;
5use crate::keyring::Keyring;
6use crate::p2p;
7use crate::sync;
8use std::collections::HashMap;
9use std::convert::Infallible;
10use std::net::SocketAddr;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::mpsc;
14use tokio::time;
15
16pub struct GossipStats {
17 last_announced_index: String,
18 last_announced_at: time::Instant,
19 next_idle_announce_after: Duration,
20}
21
22impl GossipStats {
23 pub fn new(idx: String) -> Self {
24 GossipStats {
25 last_announced_index: idx,
26 last_announced_at: time::Instant::now(),
27 next_idle_announce_after: p2p::GOSSIP_IDLE_ANNOUNCE_INTERVAL,
28 }
29 }
30
31 pub fn needs_announcement(&self, idx: &str) -> bool {
32 if self.last_announced_index != idx {
33 true
34 } else {
35 let elapsed = time::Instant::now().duration_since(self.last_announced_at);
36 elapsed >= self.next_idle_announce_after
37 }
38 }
39
40 pub fn update_announced_index(&mut self, idx: String) {
41 self.last_announced_index = idx;
42 self.last_announced_at = time::Instant::now();
43 }
44}
45
46pub async fn spawn_fetch_timer<D: DatabaseClient>(
47 db: &mut D,
48 keyring: Keyring,
49 repositories: Vec<Repository>,
50 proxy: Option<SocketAddr>,
51 announce_addrs: Vec<SocketAddr>,
52 p2p_tx: Option<mpsc::Sender<String>>,
53) -> Result<Infallible> {
54 let mut stats = HashMap::new();
55 for key in keyring.all_fingerprints() {
56 stats.insert(key, GossipStats::new("TODO".to_string()));
57 }
58
59 let keyring = Arc::new(Some(keyring));
60 let mut interval = time::interval(p2p::FETCH_INTERVAL - p2p::FETCH_INTERVAL_JITTER);
61
62 loop {
63 interval.tick().await;
64 p2p::random_jitter(p2p::FETCH_INTERVAL_JITTER).await;
65 info!("Fetch timer has started");
66 if let Err(err) =
67 fetch::fetch_updates(db, keyring.clone(), None, repositories.clone(), proxy).await
68 {
69 error!("Fetch timer has crashed: {err:#}");
70 } else {
71 debug!("Fetch timer has completed");
72 }
73
74 for (fp, gossip) in &mut stats {
75 let query = sync::TreeQuery {
76 fp: fp.clone(),
77 hash_algo: "sha256".to_string(),
78 prefix: None,
79 };
80
81 match db.index_from_scan(&query).await {
82 Ok((idx, count)) => {
83 debug!("Recalculated index for gossip checks: fp={fp:X} idx={idx:?} count={count:?}");
84 if count > 0 && gossip.needs_announcement(&idx) {
85 let mut msg = format!("[sync] fp={fp:X} idx={idx} count={count}");
86
87 for addr in &announce_addrs {
88 msg += &format!(" addr={addr}");
89 }
90
91 if let Some(p2p_tx) = &p2p_tx {
92 trace!("Sending to p2p channel: {:?}", msg);
93 if let Err(err) = p2p_tx.try_send(msg) {
95 warn!("Failed to send to p2p channel: {err:#}");
96 }
97 }
98 gossip.update_announced_index(idx);
99 }
100 }
101 Err(err) => {
102 error!("Failed to access database: {err:#}");
103 }
104 }
105 }
106 }
107}