radicle_protocol/service.rs

1#![allow(clippy::too_many_arguments)]
2#![deny(clippy::unwrap_used)]
3pub mod command;
4pub use command::{Command, QueryState};
5
6pub mod filter;
7pub mod gossip;
8pub mod io;
9pub mod limiter;
10pub mod message;
11pub mod session;
12
13use std::collections::hash_map::Entry;
14use std::collections::{BTreeSet, HashMap, HashSet};
15use std::net::IpAddr;
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18use std::{fmt, net, time};
19
20use crossbeam_channel as chan;
21use fastrand::Rng;
22use localtime::{LocalDuration, LocalTime};
23use log::*;
24use nonempty::NonEmpty;
25
26use radicle::identity::Doc;
27use radicle::node;
28use radicle::node::address;
29use radicle::node::address::Store as _;
30use radicle::node::address::{AddressBook, AddressType, KnownAddress};
31use radicle::node::config::{PeerConfig, RateLimit};
32use radicle::node::device::Device;
33use radicle::node::refs::Store as _;
34use radicle::node::routing::Store as _;
35use radicle::node::seed;
36use radicle::node::seed::Store as _;
37use radicle::node::{Penalty, Severity};
38use radicle::storage::refs::{FeatureLevel, SIGREFS_BRANCH};
39use radicle::storage::{RepositoryError, RepositoryInfo, SignedRefsInfo};
40use radicle_fetch::policy::SeedingPolicy;
41
42use crate::fetcher;
43use crate::fetcher::FetcherState;
44use crate::fetcher::RefsToFetch;
45use crate::fetcher::service::FetcherService;
46use crate::service::gossip::Store as _;
47use crate::service::message::{
48    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
49};
50use crate::service::policy::{Scope, store::Write};
51use radicle::identity::RepoId;
52use radicle::node::events::Emitter;
53use radicle::node::routing;
54use radicle::node::routing::InsertResult;
55use radicle::node::{Address, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt};
56use radicle::prelude::*;
57use radicle::storage;
58use radicle::storage::{Namespaces, ReadStorage, refs::RefsAt};
59// use radicle::worker::fetch;
60// use crate::worker::FetchError;
61use radicle::crypto;
62use radicle::node::Link;
63use radicle::node::PROTOCOL_VERSION;
64
65use crate::bounded::BoundedVec;
66use crate::service::filter::Filter;
67pub use crate::service::message::{Message, ZeroBytes};
68pub use crate::service::session::{QueuedFetch, Session};
69use crate::worker::FetchError;
70use radicle::node::events::{Event, Events};
71use radicle::node::{Config, NodeId};
72
73use radicle::node::policy::config as policy;
74
75use self::io::Outbox;
76use self::limiter::RateLimiter;
77use self::message::InventoryAnnouncement;
78use self::policy::NamespacesError;
79
80/// How often to run the "idle" task.
81pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
82/// How often to run the "gossip" task.
83pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
84/// How often to run the "announce" task.
85pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
86/// How often to run the "sync" task.
87pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
88/// How often to run the "prune" task.
89pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
90/// Duration to wait on an unresponsive peer before dropping its connection.
91pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
93/// How much time must pass since a peer was last active before a *ping* is sent.
93pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
94/// Maximum number of latency values to keep for a session.
95pub const MAX_LATENCIES: usize = 16;
97/// Maximum time difference between the local time and an announcement timestamp.
97pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
98/// Maximum attempts to connect to a peer before we give up.
99pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
100/// How far back from the present time we request gossip messages when connecting to a peer,
101/// the first time we come online.
102pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
103/// The margin of error we give ourselves when subscribing. A greater delta means we ask for
104/// messages further back than strictly necessary, to account for missed messages.
105pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
106/// Minimum amount of time to wait before reconnecting to a peer.
107pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
108/// Maximum amount of time to wait before reconnecting to a peer.
109pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
110/// Connection retry delta used for ephemeral peers that failed to connect previously.
111pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
112/// How long a fetch may stall before it is aborted; the default is 30 seconds.
113pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(30);
114/// Target number of peers to maintain connections to.
115pub const TARGET_OUTBOUND_PEERS: usize = 8;
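// Taken together (a sketch of the intended interplay, not a hard spec): the "idle" task runs
// every `IDLE_INTERVAL` (30s); a peer that has been quiet for longer than `KEEP_ALIVE_DELTA`
// (1min) is sent a ping, and one that stays unresponsive for `STALE_CONNECTION_TIMEOUT` (2min)
// has its connection dropped.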
116
117/// Maximum external address limit imposed by message size limits.
118pub use message::ADDRESS_LIMIT;
119/// Maximum inventory limit imposed by message size limits.
120pub use message::INVENTORY_LIMIT;
121/// Maximum number of project git references imposed by message size limits.
122pub use message::REF_REMOTE_LIMIT;
123
124/// Metrics we track.
125#[derive(Clone, Debug, Default, serde::Serialize)]
126#[serde(rename_all = "camelCase")]
127pub struct Metrics {
128    /// Metrics for each peer.
129    pub peers: HashMap<NodeId, PeerMetrics>,
130    /// Tasks queued in worker queue.
131    pub worker_queue_size: usize,
132    /// Current open channel count.
133    pub open_channels: usize,
134}
135
136impl Metrics {
137    /// Get metrics for the given peer.
138    pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
139        self.peers.entry(nid).or_default()
140    }
141}
142
143/// Per-peer metrics we track.
144#[derive(Clone, Debug, Default, serde::Serialize)]
145#[serde(rename_all = "camelCase")]
146pub struct PeerMetrics {
147    pub received_git_bytes: usize,
148    pub received_fetch_requests: usize,
149    pub received_bytes: usize,
150    pub received_gossip_messages: usize,
151    pub sent_bytes: usize,
152    pub sent_fetch_requests: usize,
153    pub sent_git_bytes: usize,
154    pub sent_gossip_messages: usize,
155    pub streams_opened: usize,
156    pub inbound_connection_attempts: usize,
157    pub outbound_connection_attempts: usize,
158    pub disconnects: usize,
159}
160
161/// Result of syncing our routing table with a node's inventory.
162#[derive(Default)]
163struct SyncedRouting {
164    /// Repo entries added.
165    added: Vec<RepoId>,
166    /// Repo entries removed.
167    removed: Vec<RepoId>,
168    /// Repo entries whose timestamp was updated.
169    updated: Vec<RepoId>,
170}
171
172impl SyncedRouting {
173    fn is_empty(&self) -> bool {
174        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
175    }
176}
177
178/// A peer we can connect to.
179#[derive(Debug, Clone)]
180struct Peer {
181    nid: NodeId,
182    addresses: Vec<KnownAddress>,
183    penalty: Penalty,
184}
185
186/// General service error.
187#[derive(thiserror::Error, Debug)]
188pub enum Error {
189    #[error(transparent)]
190    Git(#[from] radicle::git::raw::Error),
191    #[error(transparent)]
192    Storage(#[from] storage::Error),
193    #[error(transparent)]
194    Gossip(#[from] gossip::Error),
195    #[error(transparent)]
196    Refs(#[from] storage::refs::Error),
197    #[error(transparent)]
198    Routing(#[from] routing::Error),
199    #[error(transparent)]
200    Address(#[from] address::Error),
201    #[error(transparent)]
202    Database(#[from] node::db::Error),
203    #[error(transparent)]
204    Seeds(#[from] seed::Error),
205    #[error(transparent)]
206    Policy(#[from] policy::Error),
207    #[error(transparent)]
208    Repository(#[from] radicle::storage::RepositoryError),
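    // Boxed to keep the `Error` enum itself small; `NamespacesError` is presumably a large type.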
209    #[error("namespaces error: {0}")]
210    Namespaces(Box<NamespacesError>),
211}
212
213impl From<NamespacesError> for Error {
214    fn from(e: NamespacesError) -> Self {
215        Self::Namespaces(Box::new(e))
216    }
217}
218
219#[derive(thiserror::Error, Debug)]
220pub enum ConnectError {
221    #[error("attempted connection to peer {nid} which already has a session")]
222    SessionExists { nid: NodeId },
223    #[error("attempted connection to self")]
224    SelfConnection,
225    #[error("outbound connection limit reached when attempting {nid} ({addr})")]
226    LimitReached { nid: NodeId, addr: Address },
227    #[error(
228        "attempted connection to {nid}, via {addr} but addresses of this kind are not supported"
229    )]
230    UnsupportedAddress { nid: NodeId, addr: Address },
231    #[error("attempted connection with blocked peer {nid}")]
232    Blocked { nid: NodeId },
233}
234
235/// A store for all node data.
236pub trait Store:
237    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
238{
239}
240
241impl Store for radicle::node::Database {}
242
243/// Fetch state for an ongoing fetch.
244#[derive(Debug)]
245pub struct FetchState {
246    /// Node we're fetching from.
247    pub from: NodeId,
248    /// What refs we're fetching.
249    pub refs_at: Vec<RefsAt>,
250    /// Channels waiting for fetch results.
251    pub subscribers: Vec<chan::Sender<FetchResult>>,
252}
253
254/// Holds all node stores.
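///
/// A minimal usage sketch (assuming a `db: Stores<Database>` in hand): every accessor below
/// returns the same underlying database, viewed through one of its store traits, e.g.
/// `db.routing()` for lookups and `db.routing_mut()` for updates.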
255#[derive(Debug)]
256pub struct Stores<D>(D);
257
258impl<D> Stores<D>
259where
260    D: Store,
261{
262    /// Get the database as a routing store.
263    pub fn routing(&self) -> &impl routing::Store {
264        &self.0
265    }
266
267    /// Get the database as a routing store, mutably.
268    pub fn routing_mut(&mut self) -> &mut impl routing::Store {
269        &mut self.0
270    }
271
272    /// Get the database as an address store.
273    pub fn addresses(&self) -> &impl address::Store {
274        &self.0
275    }
276
277    /// Get the database as an address store, mutably.
278    pub fn addresses_mut(&mut self) -> &mut impl address::Store {
279        &mut self.0
280    }
281
282    /// Get the database as a gossip store.
283    pub fn gossip(&self) -> &impl gossip::Store {
284        &self.0
285    }
286
287    /// Get the database as a gossip store, mutably.
288    pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
289        &mut self.0
290    }
291
292    /// Get the database as a seed store.
293    pub fn seeds(&self) -> &impl seed::Store {
294        &self.0
295    }
296
297    /// Get the database as a seed store, mutably.
298    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
299        &mut self.0
300    }
301
302    /// Get the database as a refs db.
303    pub fn refs(&self) -> &impl node::refs::Store {
304        &self.0
305    }
306
307    /// Get the database as a refs db, mutably.
308    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
309        &mut self.0
310    }
311}
312
313impl<D> AsMut<D> for Stores<D> {
314    fn as_mut(&mut self) -> &mut D {
315        &mut self.0
316    }
317}
318
319impl<D> From<D> for Stores<D> {
320    fn from(db: D) -> Self {
321        Self(db)
322    }
323}
324
325/// The node service.
326#[derive(Debug)]
327pub struct Service<D, S, G> {
328    /// Service configuration.
329    config: Config,
330    /// Our cryptographic signer and key.
331    signer: Device<G>,
332    /// Project storage.
333    storage: S,
334    /// Node database.
335    db: Stores<D>,
336    /// Policy configuration.
337    policies: policy::Config<Write>,
338    /// Peer sessions, currently or recently connected.
339    sessions: Sessions,
340    /// Clock. Tells the time.
341    clock: LocalTime,
342    /// Who relayed what announcement to us. We keep track of this to ensure that
343    /// we don't relay messages to nodes that already know about these messages.
344    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
345    /// I/O outbox.
346    outbox: Outbox,
347    /// Cached local node announcement.
348    node: NodeAnnouncement,
349    /// Cached local inventory announcement.
350    inventory: InventoryAnnouncement,
351    /// Source of entropy.
352    rng: Rng,
    /// Fetch coordinator: tracks active and queued fetches from our peers.
353    fetcher: FetcherService<command::Responder<FetchResult>>,
354    /// Request/connection rate limiter.
355    limiter: RateLimiter,
356    /// Current seeded repositories bloom filter.
357    filter: Filter,
358    /// Last time the service was idle.
359    last_idle: LocalTime,
360    /// Last time the gossip messages were relayed.
361    last_gossip: LocalTime,
362    /// Last time the service synced.
363    last_sync: LocalTime,
364    /// Last time the service routing table was pruned.
365    last_prune: LocalTime,
366    /// Last time the announcement task was run.
367    last_announce: LocalTime,
368    /// Timestamp of last local inventory announced.
369    last_inventory: LocalTime,
370    /// Last timestamp used for announcements.
371    last_timestamp: Timestamp,
372    /// Time when the service was initialized, or `None` if it wasn't initialized.
373    started_at: Option<LocalTime>,
374    /// Time when the service was last online, or `None` if this is the first time.
375    last_online_at: Option<LocalTime>,
376    /// Publishes events to subscribers.
377    emitter: Emitter<Event>,
378    /// Local listening addresses.
379    listening: Vec<net::SocketAddr>,
380    /// Latest metrics for all nodes connected to since the last start.
381    metrics: Metrics,
382}
383
384impl<D, S, G> Service<D, S, G> {
385    /// Get the local node id.
386    pub fn node_id(&self) -> NodeId {
387        *self.signer.public_key()
388    }
389
390    /// Get the local service time.
391    pub fn local_time(&self) -> LocalTime {
392        self.clock
393    }
394
395    pub fn emitter(&self) -> Emitter<Event> {
396        self.emitter.clone()
397    }
398}
399
400impl<D, S, G> Service<D, S, G>
401where
402    D: Store,
403    S: WriteStorage + 'static,
404    G: crypto::signature::Signer<crypto::Signature>,
405{
406    /// Initialize service with current time. Call this once.
407    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
408        debug!(target: "service", "Init @{}", time.as_millis());
409        assert_ne!(time, LocalTime::default());
410
411        let nid = self.node_id();
412
413        self.clock = time;
414        self.started_at = Some(time);
415        self.last_online_at = match self.db.gossip().last() {
416            Ok(Some(last)) => Some(last.to_local_time()),
417            Ok(None) => None,
418            Err(e) => {
419                warn!(target: "service", "Failed to get the latest gossip message from db: {e}");
420                None
421            }
422        };
423
424        // Populate refs database. This is only useful as part of the upgrade process for nodes
425        // that have been online since before the refs database was created.
426        match self.db.refs().count() {
427            Ok(0) => {
428                info!(target: "service", "Empty refs database, populating from storage..");
429                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
430                    warn!(target: "service", "Failed to populate refs database: {e}");
431                }
432            }
433            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
434            Err(e) => {
435                warn!(target: "service", "Failed to retrieve count of refs from database: {e}")
436            }
437        }
438
439        let announced = self
440            .db
441            .seeds()
442            .seeded_by(&nid)?
443            .collect::<Result<HashMap<_, _>, _>>()?;
444        let mut inventory = BTreeSet::new();
445        let mut private = BTreeSet::new();
446
447        for repo in self.storage.repositories()? {
448            let repo = self.upgrade_sigrefs(repo)?;
449            let rid = repo.rid;
450
451            // If we're not seeding this repo, just skip it.
452            if !self.policies.is_seeding(&rid)? {
453                debug!(target: "service", "Local repository {rid} is not seeded");
454                continue;
455            }
456            // Add public repositories to inventory.
457            if repo.doc.is_public() {
458                inventory.insert(rid);
459            } else {
460                private.insert(rid);
461            }
462            // If we have no owned refs for this repo, then there's nothing to announce.
463            let Some(updated_at) = repo.synced_at else {
464                continue;
465            };
466            // Skip this repo if the sync status matches what we have in storage.
467            if let Some(announced) = announced.get(&rid) {
468                if updated_at.oid == announced.oid {
469                    continue;
470                }
471            }
472            // Make sure our local node's sync status is up to date with storage.
473            if self.db.seeds_mut().synced(
474                &rid,
475                &nid,
476                updated_at.oid,
477                updated_at.timestamp.into(),
478            )? {
479                debug!(target: "service", "Saved local sync status for {rid}..");
480            }
481            // If we got here, it likely means a repo was updated while the node was stopped.
482            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
483            // the historical gossip messages when a node connects and subscribes to this repo.
484            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
485                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
486                self.db.gossip_mut().announced(&nid, &ann)?;
487            }
488        }
489
490        // Ensure that our inventory is recorded in our routing table, and we are seeding
491        // all of it. It can happen that inventory is not properly seeded if, for example,
492        // the user creates a new repository while the node is stopped.
493        self.db
494            .routing_mut()
495            .add_inventory(inventory.iter(), nid, time.into())?;
496        self.inventory = gossip::inventory(self.timestamp(), inventory);
497
498        // Ensure that private repositories are not in our inventory. It's possible that
499        // a repository was public and then it was made private.
500        self.db
501            .routing_mut()
502            .remove_inventories(private.iter(), &nid)?;
503
504        // Set up subscription filter for seeded repos.
505        self.filter = Filter::allowed_by(self.policies.seed_policies()?);
506        // Connect to configured peers.
507        let addrs = self.config.connect.clone();
508        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
509            if let Err(e) = self.connect(id, addr) {
510                debug!(target: "service", "Service::initialization connection error: {e}");
511            }
512        }
513        // Try to establish some connections.
514        self.maintain_connections();
515        // Start periodic tasks.
516        self.outbox.wakeup(IDLE_INTERVAL);
517        self.outbox.wakeup(GOSSIP_INTERVAL);
518
519        Ok(())
520    }
521
    /// Migrate a repository's `rad/sigrefs` to the latest feature level by re-signing it,
    /// if needed. Returns the (possibly updated) repository info.
522    fn upgrade_sigrefs(&mut self, mut info: RepositoryInfo) -> Result<RepositoryInfo, Error> {
523        if !matches!(info.refs, SignedRefsInfo::NeedsMigration) {
524            return Ok(info);
525        }
526
527        let rid = info.rid;
528
529        log::info!(
530            "Migrating `rad/sigrefs` of {rid} to force feature level {}.",
531            FeatureLevel::LATEST
532        );
533
534        let repo = self.storage.repository_mut(rid)?;
535        // NOTE: We assume that re-signing the refs brings them to `FeatureLevel::LATEST`.
536        let refs = repo.force_sign_refs(&self.signer)?;
537
538        let repo = self.storage.repository(rid)?;
539        let synced_at = SyncedAt::new(refs.at, &repo)?;
540
541        info.synced_at = Some(synced_at);
542        info.refs = SignedRefsInfo::Some(refs);
543
544        Ok(info)
545    }
546}
547
548impl<D, S, G> Service<D, S, G>
549where
550    D: Store,
551    S: ReadStorage + 'static,
552    G: crypto::signature::Signer<crypto::Signature>,
553{
554    pub fn new(
555        config: Config,
556        db: Stores<D>,
557        storage: S,
558        policies: policy::Config<Write>,
559        signer: Device<G>,
560        rng: Rng,
561        node: NodeAnnouncement,
562        emitter: Emitter<Event>,
563    ) -> Self {
564        let sessions = Sessions::new(rng.clone());
565        let limiter = RateLimiter::new(config.peers());
566        let last_timestamp = node.timestamp;
567        let clock = LocalTime::default(); // Updated on initialize.
568        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.
569        let fetcher = {
570            let config = fetcher::Config::new()
571                .with_max_concurrency(
572                    std::num::NonZeroUsize::new(config.limits.fetch_concurrency.into())
573                        .expect("fetch concurrency was zero, must be at least 1"),
574                )
575                .with_max_capacity(fetcher::MaxQueueSize::default());
576            FetcherService::new(config)
577        };
578        Self {
579            config,
580            storage,
581            policies,
582            signer,
583            rng,
584            inventory,
585            node,
586            clock,
587            db,
588            outbox: Outbox::default(),
589            limiter,
590            sessions,
591            fetcher,
592            filter: Filter::empty(),
593            relayed_by: HashMap::default(),
594            last_idle: LocalTime::default(),
595            last_gossip: LocalTime::default(),
596            last_sync: LocalTime::default(),
597            last_prune: LocalTime::default(),
598            last_timestamp,
599            last_announce: LocalTime::default(),
600            last_inventory: LocalTime::default(),
601            started_at: None,     // Updated on initialize.
602            last_online_at: None, // Updated on initialize.
603            emitter,
604            listening: vec![],
605            metrics: Metrics::default(),
606        }
607    }
608
609    /// Whether the service was started (initialized) and if so, at what time.
610    pub fn started(&self) -> Option<LocalTime> {
611        self.started_at
612    }
613
614    /// Return the next i/o action to execute.
615    #[allow(clippy::should_implement_trait)]
616    pub fn next(&mut self) -> Option<io::Io> {
617        self.outbox.next()
618    }
619
620    /// Seed a repository.
621    /// Returns whether or not the repo policy was updated.
622    pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
623        let updated = self.policies.seed(id, scope)?;
624        self.filter.insert(id);
625
626        Ok(updated)
627    }
628
629    /// Unseed a repository.
630    /// Returns whether or not the repo policy was updated.
631    /// Note that when unseeding, we don't announce anything to the network. This is because by
632    /// simply not announcing it anymore, the repository will eventually be pruned by other nodes.
633    pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
634        let updated = self.policies.unseed(id)?;
635
636        if updated {
637            // Nb. This is potentially slow if we have lots of repos. We should probably
638            // only re-compute the filter when we've unseeded a certain number of repos
639            // and the filter is really out of date.
640            self.filter = Filter::allowed_by(self.policies.seed_policies()?);
641            // Update and announce new inventory.
642            if let Err(e) = self.remove_inventory(id) {
643                warn!(target: "service", "Failed to update inventory after unseed: {e}");
644            }
645        }
646        Ok(updated)
647    }
648
649    /// Find the closest `n` peers by proximity in seeding graphs.
650    /// Returns a sorted list from the closest peer to the furthest.
651    /// Peers that seed more repositories in common score higher.
652    #[allow(unused)]
653    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
654        todo!()
655    }
656
657    /// Get the database.
658    pub fn database(&self) -> &Stores<D> {
659        &self.db
660    }
661
662    /// Get the mutable database.
663    pub fn database_mut(&mut self) -> &mut Stores<D> {
664        &mut self.db
665    }
666
667    /// Get the storage instance.
668    pub fn storage(&self) -> &S {
669        &self.storage
670    }
671
672    /// Get the mutable storage instance.
673    pub fn storage_mut(&mut self) -> &mut S {
674        &mut self.storage
675    }
676
677    /// Get the node policies.
678    pub fn policies(&self) -> &policy::Config<Write> {
679        &self.policies
680    }
681
682    /// Get the local signer.
683    pub fn signer(&self) -> &Device<G> {
684        &self.signer
685    }
686
687    /// Subscribe to events from the inner `Emitter`.
688    pub fn events(&mut self) -> Events {
689        Events::from(self.emitter.subscribe())
690    }
691
692    pub fn fetcher(&self) -> &FetcherState {
693        self.fetcher.state()
694    }
695
696    /// Get I/O outbox.
697    pub fn outbox(&mut self) -> &mut Outbox {
698        &mut self.outbox
699    }
700
701    /// Get configuration.
702    pub fn config(&self) -> &Config {
703        &self.config
704    }
705
706    /// Look up a repository, both locally and in the routing table.
707    pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
708        let this = self.nid();
709        let local = self.storage.get(rid)?;
710        let remote = self
711            .db
712            .routing()
713            .get(&rid)?
714            .iter()
715            .filter(|nid| nid != &this)
716            .cloned()
717            .collect();
718
719        Ok(Lookup { local, remote })
720    }
721
    /// Advance the service clock to `now` and record the latest runtime metrics.
722    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
723        trace!(
724            target: "service",
725            "Tick +{}",
726            now - self.started_at.expect("Service::tick: service must be initialized")
727        );
728        if now >= self.clock {
729            self.clock = now;
730        } else {
731            // Nb. In tests, we often move the clock forwards in time to test different behaviors,
732            // so this warning isn't applicable there.
733            #[cfg(not(test))]
734            warn!(
735                target: "service",
736                "System clock is not monotonic: {now} is not greater or equal to {}", self.clock
737            );
738        }
739        self.metrics = metrics.clone();
740    }
741
    /// Run any periodic tasks ("idle", "gossip", "sync", "announce", "prune") that are due.
742    pub fn wake(&mut self) {
743        let now = self.clock;
744
745        trace!(
746            target: "service",
747            "Wake +{}",
748            now - self.started_at.expect("Service::wake: service must be initialized")
749        );
750
751        if now - self.last_idle >= IDLE_INTERVAL {
752            trace!(target: "service", "Running 'idle' task...");
753
754            self.keep_alive(&now);
755            self.disconnect_unresponsive_peers(&now);
756            self.idle_connections();
757            self.maintain_connections();
758            self.dequeue_fetches();
759            self.outbox.wakeup(IDLE_INTERVAL);
760            self.last_idle = now;
761        }
762        if now - self.last_gossip >= GOSSIP_INTERVAL {
763            trace!(target: "service", "Running 'gossip' task...");
764
765            if let Err(e) = self.relay_announcements() {
766                warn!(target: "service", "Failed to relay stored announcements: {e}");
767            }
768            self.outbox.wakeup(GOSSIP_INTERVAL);
769            self.last_gossip = now;
770        }
771        if now - self.last_sync >= SYNC_INTERVAL {
772            trace!(target: "service", "Running 'sync' task...");
773
774            if let Err(e) = self.fetch_missing_repositories() {
775                warn!(target: "service", "Failed to fetch missing inventory: {e}");
776            }
777            self.outbox.wakeup(SYNC_INTERVAL);
778            self.last_sync = now;
779        }
780        if now - self.last_announce >= ANNOUNCE_INTERVAL {
781            trace!(target: "service", "Running 'announce' task...");
782
783            self.announce_inventory();
784            self.outbox.wakeup(ANNOUNCE_INTERVAL);
785            self.last_announce = now;
786        }
787        if now - self.last_prune >= PRUNE_INTERVAL {
788            trace!(target: "service", "Running 'prune' task...");
789
790            if let Err(err) = self.prune_routing_entries(&now) {
791                warn!(target: "service", "Failed to prune routing entries: {err}");
792            }
793            if let Err(err) = self
794                .db
795                .gossip_mut()
796                .prune((now - LocalDuration::from(self.config.limits.gossip_max_age)).into())
797            {
798                warn!(target: "service", "Failed to prune gossip entries: {err}");
799            }
800
801            self.outbox.wakeup(PRUNE_INTERVAL);
802            self.last_prune = now;
803        }
804
805        // Always check whether there are persistent peers that need reconnecting.
806        self.maintain_persistent();
807    }
808
    /// Process an incoming [`Command`].
809    pub fn command(&mut self, cmd: Command) {
810        info!(target: "service", "Received command {cmd:?}");
811
812        match cmd {
813            Command::Connect(nid, addr, opts) => {
814                if opts.persistent {
815                    self.config.connect.insert((nid, addr.clone()).into());
816                }
817                if let Err(e) = self.connect(nid, addr) {
818                    match e {
819                        ConnectError::SessionExists { nid } => {
820                            self.emitter.emit(Event::PeerConnected { nid });
821                        }
822                        e => {
823                            // N.b. using the fact that the call to connect waits for an event
824                            self.emitter.emit(Event::PeerDisconnected {
825                                nid,
826                                reason: e.to_string(),
827                            });
828                        }
829                    }
830                }
831            }
832            Command::Disconnect(nid) => {
833                self.outbox.disconnect(nid, DisconnectReason::Command);
834            }
835            Command::Config(resp) => {
836                resp.ok(self.config.clone()).ok();
837            }
838            Command::ListenAddrs(resp) => {
839                resp.ok(self.listening.clone()).ok();
840            }
841            Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
842                Ok(seeds) => {
843                    let (connected, disconnected) = seeds.partition();
844                    debug!(
845                        target: "service",
846                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
847                        connected.len(), disconnected.len(), rid
848                    );
849                    resp.ok(seeds).ok();
850                }
851                Err(e) => {
852                    warn!(target: "service", "Failed to get seeds for {rid}: {e}");
853                    resp.err(e).ok();
854                }
855            },
856            Command::Fetch(rid, seed, timeout, signed_references_minimum_feature_level, resp) => {
857                let feature_level = signed_references_minimum_feature_level
858                    .unwrap_or(self.config.fetch.feature_level_min());
859                let config = self
860                    .fetch_config()
861                    .with_timeout(timeout)
862                    .with_minimum_feature_level(feature_level);
863                self.fetch(rid, seed, vec![], config, Some(resp));
864            }
865            Command::Seed(rid, scope, resp) => {
866                // Update our seeding policy.
867                let seeded = self
868                    .seed(&rid, scope)
869                    .expect("Service::command: error seeding repository");
870                resp.ok(seeded).ok();
871
872                // Let all our peers know that we're interested in this repo from now on.
873                self.outbox.broadcast(
874                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
875                    self.sessions.connected().map(|(_, s)| s),
876                );
877            }
878            Command::Unseed(id, resp) => {
879                let updated = self
880                    .unseed(&id)
881                    .expect("Service::command: error unseeding repository");
882                resp.ok(updated).ok();
883            }
884            Command::Follow(id, alias, resp) => {
885                let seeded = self
886                    .policies
887                    .follow(&id, alias.as_ref())
888                    .expect("Service::command: error following node");
889                resp.ok(seeded).ok();
890            }
891            Command::Unfollow(id, resp) => {
892                let updated = self
893                    .policies
894                    .unfollow(&id)
895                    .expect("Service::command: error unfollowing node");
896                resp.ok(updated).ok();
897            }
898            Command::Block(id, resp) => {
899                let updated = self
900                    .policies
901                    .set_follow_policy(&id, policy::Policy::Block)
902                    .expect("Service::command: error blocking node");
903                if updated {
904                    self.outbox.disconnect(id, DisconnectReason::Policy);
905                }
906                resp.send(updated).ok();
907            }
908            Command::AnnounceRefs(id, namespaces, resp) => {
909                let doc = match self.storage.get(id) {
910                    Ok(Some(doc)) => doc,
911                    Ok(None) => {
912                        warn!(target: "service", "Failed to announce refs: repository {id} not found");
913                        resp.err(command::Error::custom(format!("repository {id} not found")))
914                            .ok();
915                        return;
916                    }
917                    Err(e) => {
918                        warn!(target: "service", "Failed to announce refs: doc error: {e}");
919                        resp.err(e).ok();
920                        return;
921                    }
922                };
923
924                match self.announce_own_refs(id, doc, namespaces) {
925                    Ok((refs, _timestamp)) => {
926                        // TODO(finto): currently the command caller only
927                        // expects one `RefsAt`; this should be fixed in the
928                        // trait eventually.
929                        if let Some(refs) = refs.first() {
930                            resp.ok(*refs).ok();
931                        } else {
932                            resp.err(command::Error::custom(format!(
933                                "no refs were announced for {id}"
934                            )))
935                            .ok();
936                        }
937                    }
938                    Err(err) => {
939                        warn!(target: "service", "Failed to announce refs: {err}");
940                        resp.err(err).ok();
941                    }
942                }
943            }
944            Command::AnnounceInventory => {
945                self.announce_inventory();
946            }
947            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
948                Ok(updated) => {
949                    resp.ok(updated).ok();
950                }
951                Err(e) => {
952                    warn!(target: "service", "Failed to add {rid} to inventory: {e}");
953                    resp.err(e).ok();
954                }
955            },
956            Command::QueryState(query, sender) => {
957                sender.send(query(self)).ok();
958            }
959        }
960    }
961
962    /// Initiate an outgoing fetch for some repository, based on another node's announcement.
963    /// Returns `true` if the fetch was initiated and `false` if it was skipped.
964    fn fetch_refs_at(
965        &mut self,
966        rid: RepoId,
967        from: NodeId,
968        refs: NonEmpty<RefsAt>,
969        scope: Scope,
970        config: fetcher::FetchConfig,
971    ) -> bool {
972        match self.refs_status_of(rid, refs, &scope) {
973            Ok(status) => {
974                if status.want.is_empty() {
975                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
976                } else {
977                    self.fetch(rid, from, status.want, config, None);
978                    return true;
979                }
980            }
981            Err(e) => {
982                warn!(target: "service", "Failed to get the refs status of {rid}: {e}");
983            }
984        }
985        // We didn't try to fetch anything.
986        false
987    }
988
989    fn fetch(
990        &mut self,
991        rid: RepoId,
992        from: NodeId,
993        refs_at: Vec<RefsAt>,
994        config: fetcher::FetchConfig,
995        channel: Option<command::Responder<FetchResult>>,
996    ) {
997        let session = {
998            let reason = format!("peer {from} is not connected; cannot initiate fetch");
999            let Some(session) = self.sessions.get_mut(&from) else {
1000                if let Some(c) = channel {
1001                    c.ok(FetchResult::Failed { reason }).ok();
1002                }
1003                return;
1004            };
1005            if !session.is_connected() {
1006                if let Some(c) = channel {
1007                    c.ok(FetchResult::Failed { reason }).ok();
1008                }
1009                return;
1010            }
1011            session
1012        };
1013
1014        let cmd = fetcher::state::command::Fetch {
1015            from,
1016            rid,
1017            refs: refs_at.into(),
1018            config,
1019        };
1020        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);
1021
1022        if let Some(c) = rejected {
1023            c.ok(FetchResult::Failed {
1024                reason: "fetch queue at capacity".to_string(),
1025            })
1026            .ok();
1027        }
1028
1029        match event {
1030            fetcher::state::event::Fetch::Started {
1031                rid,
1032                from,
1033                refs: refs_at,
1034                config,
1035            } => {
1036                debug!(target: "service", "Starting fetch for {rid} from {from}");
1037                self.outbox.fetch(
1038                    session,
1039                    rid,
1040                    refs_at.into(),
1041                    self.config.limits.fetch_pack_receive,
1042                    config,
1043                );
1044            }
1045            fetcher::state::event::Fetch::Queued { rid, from } => {
1046                debug!(target: "service", "Queued fetch for {rid} from {from}");
1047            }
1048            fetcher::state::event::Fetch::AlreadyFetching { rid, from } => {
1049                debug!(target: "service", "Already fetching {rid} from {from}");
1050            }
1051            fetcher::state::event::Fetch::QueueAtCapacity { rid, from, .. } => {
1052                debug!(target: "service", "Queue at capacity for {from}, rejected {rid}");
1053            }
1054        }
1055    }
1056
    /// Handle the result of a fetch that has completed, successfully or not.
1057    pub fn fetched(
1058        &mut self,
1059        rid: RepoId,
1060        from: NodeId,
1061        result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
1062    ) {
1063        let cmd = fetcher::state::command::Fetched { from, rid };
1064        let fetcher::service::FetchCompleted { event, subscribers } = self.fetcher.fetched(cmd);
1065
1066        // Dequeue next fetches
1067        self.dequeue_fetches();
1068
1069        match event {
1070            fetcher::state::event::Fetched::NotFound { from, rid } => {
1071                debug!(target: "service", "Unexpected fetch result for {rid} from {from}");
1072            }
1073            fetcher::state::event::Fetched::Completed { from, rid, refs: _ } => {
1074                // Notify responders
1075                let fetch_result = match &result {
1076                    Ok(success) => FetchResult::Success {
1077                        updated: success.updated.clone(),
1078                        namespaces: success.namespaces.clone(),
1079                        clone: success.clone,
1080                    },
1081                    Err(e) => FetchResult::Failed {
1082                        reason: e.to_string(),
1083                    },
1084                };
1085                for responder in subscribers {
1086                    responder.ok(fetch_result.clone()).ok();
1087                }
1088                match result {
1089                    Ok(crate::worker::fetch::FetchResult {
1090                        updated,
1091                        canonical,
1092                        namespaces,
1093                        clone,
1094                        doc,
1095                    }) => {
1096                        info!(target: "service", "Fetched {rid} from {from} successfully");
1097                        // Update our routing table in case this fetch was user-initiated and doesn't
1098                        // come from an announcement.
1099                        self.seed_discovered(rid, from, self.clock.into());
1100
1101                        for update in &updated {
1102                            if update.is_skipped() {
1103                                trace!(target: "service", "Ref skipped: {update} for {rid}");
1104                            } else {
1105                                debug!(target: "service", "Ref updated: {update} for {rid}");
1106                            }
1107                        }
1108                        self.emitter.emit(Event::RefsFetched {
1109                            remote: from,
1110                            rid,
1111                            updated: updated.clone(),
1112                        });
1113                        self.emitter
1114                            .emit_all(canonical.into_iter().map(|(refname, target)| {
1115                                Event::CanonicalRefUpdated {
1116                                    rid,
1117                                    refname,
1118                                    target,
1119                                }
1120                            }));
1121
1122                        // Announce our new inventory if this fetch was a full clone.
1123                        // Only update and announce inventory for public repositories.
1124                        if clone && doc.is_public() {
1125                            debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");
1126
1127                            if let Err(e) = self.add_inventory(rid) {
1128                                warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
1129                            }
1130                        }
1131
1132                        // It's possible for a fetch to succeed without anything being updated.
1133                        if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
1134                            debug!(target: "service", "Nothing to announce, no refs were updated..");
1135                        } else {
1136                            // Finally, announce the refs. This is useful for nodes to know what we've synced,
1137                            // beyond just knowing that we have added an item to our inventory.
1138                            if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
1139                                warn!(target: "service", "Failed to announce new refs: {e}");
1140                            }
1141                        }
1142                    }
1143                    Err(err) => {
1144                        warn!(target: "service", "Fetch failed for {rid} from {from}: {err}");
1145
1146                        // For now, we only disconnect the peer in case of a timeout. In the future,
1147                        // there may be other reasons to disconnect.
1148                        if err.is_timeout() {
1149                            self.outbox.disconnect(from, DisconnectReason::Fetch(err));
1150                        }
1151                    }
1152                }
1153            }
1154        }
1155    }
1156
1157    /// Attempt to dequeue fetches from all peers.
1158    /// At most one fetch is dequeued per peer. If the fetch cannot be processed,
1159    /// it is put back on the queue for that peer.
1160    ///
1161    /// Fetches are queued for two reasons:
1162    /// 1. The RID was already being fetched.
1163    /// 2. The session was already at fetch capacity.
1164    pub fn dequeue_fetches(&mut self) {
1165        let sessions = self
1166            .sessions
1167            .shuffled()
1168            .map(|(k, _)| *k)
1169            .collect::<Vec<_>>();
1170
1171        for nid in sessions {
1172            #[allow(clippy::unwrap_used)]
1173            let sess = self.sessions.get_mut(&nid).unwrap();
1174            if !sess.is_connected() {
1175                continue;
1176            }
1177
1178            let Some(fetcher::QueuedFetch {
1179                rid,
1180                refs: refs_at,
1181                config,
1182            }) = self.fetcher.dequeue(&nid)
1183            else {
1184                continue;
1185            };
1186
1187            // Check seeding policy
1188            let repo_entry = self
1189                .policies
1190                .seed_policy(&rid)
1191                .expect("error accessing repo seeding configuration");
1192
1193            let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1194                debug!(target: "service", "Repository {} no longer seeded, skipping", rid);
1195                continue;
1196            };
1197
1198            debug!(target: "service", "Dequeued fetch for {} from {}", rid, nid);
1199
1200            match refs_at {
1201                RefsToFetch::Refs(refs) => {
1202                    self.fetch_refs_at(rid, nid, refs, scope, config);
1203                }
1204                RefsToFetch::All => {
1205                    // The channel is `None` since any subscribers are already
1206                    // registered with the fetcher service.
1207                    self.fetch(rid, nid, vec![], config, None);
1208                }
1209            }
1210        }
1211    }
1212
1213    /// Inbound connection attempt.
1214    pub fn accepted(&mut self, ip: IpAddr) -> bool {
1215        // Always accept connections from loopback or unspecified addresses, even if we
1216        // already reached our inbound connection limit.
1217        if ip.is_loopback() || ip.is_unspecified() {
1218            return true;
1219        }
1220        // Check for inbound connection limit.
1221        if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
1222            return false;
1223        }
1224        match self.db.addresses().is_ip_banned(ip) {
1225            Ok(banned) => {
1226                if banned {
1227                    debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
1228                    return false;
1229                }
1230            }
1231            Err(e) => warn!(target: "service", "Failed to query ban status for {ip}: {e}"),
1232        }
1233        let host: HostName = ip.into();
1234        let tokens = self.config.limits.rate.inbound;
1235
1236        if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
1237            trace!(target: "service", "Rate limiting inbound connection from {host}..");
1238            return false;
1239        }
1240        true
1241    }
1242
1243    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
1244        debug!(target: "service", "Attempted connection to {nid} ({addr})");
1245
1246        if let Some(sess) = self.sessions.get_mut(&nid) {
1247            sess.to_attempted();
1248        } else {
1249            #[cfg(debug_assertions)]
1250            panic!("Service::attempted: unknown session {nid}@{addr}");
1251        }
1252    }
1253
1254    pub fn listening(&mut self, local_addr: net::SocketAddr) {
1255        info!(target: "node", "Listening on {local_addr}..");
1256
1257        self.listening.push(local_addr);
1258    }
1259
1260    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
1261        if let Ok(true) = self.policies.is_blocked(&remote) {
1262            self.emitter.emit(Event::PeerDisconnected {
1263                nid: remote,
1264                reason: format!("{remote} is blocked"),
1265            });
1266            info!(target: "service", "Disconnecting blocked inbound peer {remote}");
1267            self.outbox.disconnect(remote, DisconnectReason::Policy);
1268            return;
1269        }
1270
1271        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
1272        self.emitter.emit(Event::PeerConnected { nid: remote });
1273
1274        let msgs = self.initial(link);
1275
1276        if link.is_outbound() {
1277            if let Some(peer) = self.sessions.get_mut(&remote) {
1278                peer.to_connected(self.clock);
1279                self.outbox.write_all(peer, msgs);
1280            }
1281        } else {
1282            match self.sessions.entry(remote) {
1283                Entry::Occupied(mut e) => {
1284                    // In this scenario, it's possible that our peer is persistent, and
1285                    // disconnected. We get an inbound connection before we attempt a re-connection,
1286                    // and therefore we treat it as a regular inbound connection.
1287                    //
1288                    // It's also possible that a disconnection hasn't gone through yet and our
1289                    // peer is still in connected state here, while a new inbound connection from
1290                    // that same peer is made. This results in a new connection from a peer that is
1291                    // already connected from the perspective of the service. This appears to be
1292                    // a bug in the underlying networking library.
1293                    let peer = e.get_mut();
1294                    debug!(
1295                        target: "service",
1296                        "Connecting peer {remote} already has a session open ({peer})"
1297                    );
1298                    peer.link = link;
1299                    peer.to_connected(self.clock);
1300                    self.outbox.write_all(peer, msgs);
1301                }
1302                Entry::Vacant(e) => {
1303                    if let HostName::Ip(ip) = addr.host {
1304                        if !address::is_local(&ip) {
1305                            if let Err(e) =
1306                                self.db
1307                                    .addresses_mut()
1308                                    .record_ip(&remote, ip, self.clock.into())
1309                            {
1310                                log::debug!(target: "service", "Failed to record IP address for {remote}: {e}");
1311                            }
1312                        }
1313                    }
1314                    let peer = e.insert(Session::inbound(
1315                        remote,
1316                        addr,
1317                        self.config.is_persistent(&remote),
1318                        self.rng.clone(),
1319                        self.clock,
1320                    ));
1321                    self.outbox.write_all(peer, msgs);
1322                }
1323            }
1324        }
1325    }
1326
1327    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
1328        let since = self.local_time();
1329        let Some(session) = self.sessions.get_mut(&remote) else {
1330            // Since we sometimes disconnect peers eagerly, it's not unusual to get a second
1331            // disconnection event once the transport is dropped.
1332            trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
1333            return;
1334        };
1335        // When two connections to the same peer conflict, one of the two may be disconnected.
1336        // In that case we don't want the service to remove the session.
1337        if session.link != link {
1338            return;
1339        }
1340
1341        info!(target: "service", "Disconnected from {remote} ({reason})");
1342        self.emitter.emit(Event::PeerDisconnected {
1343            nid: remote,
1344            reason: reason.to_string(),
1345        });
1346
1347        let link = session.link;
1348        let addr = session.addr.clone();
1349
1350        let cmd = fetcher::state::command::Cancel { from: remote };
1351        let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);
1352
1353        match event {
1354            fetcher::state::event::Cancel::Unexpected { from } => {
1355                debug!(target: "service", "No fetches to cancel for {from}");
1356            }
1357            fetcher::state::event::Cancel::Canceled {
1358                from,
1359                active,
1360                queued,
1361            } => {
1362                debug!(target: "service", "Cancelled {} ongoing, {} queued for {from}", active.len(), queued.len());
1363            }
1364        }
1365
1366        // Notify orphaned responders
1367        for (rid, responder) in orphaned {
1368            responder
1369                .ok(FetchResult::Failed {
1370                    reason: format!("failed fetch to {rid}, peer disconnected: {reason}"),
1371                })
1372                .ok();
1373        }
1374
1375        // Attempt to re-connect to persistent peers.
1376        if self.config.is_persistent(&remote) {
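            // A worked example of the backoff below, given the constants above (and assuming
            // `attempts()` counts failed connection attempts): after 1 attempt the delay is
            // 2^1 = 2s, clamped up to MIN_RECONNECTION_DELTA (3s); after 5 attempts it is 32s;
            // from 12 attempts on, 2^12 = 4096s (~68min) is clamped down to
            // MAX_RECONNECTION_DELTA (60min).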
1377            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
1378                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
1379
1380            // Nb. We always try to reconnect to persistent peers, even when the error appears
1381            // to not be transient.
1382            session.to_disconnected(since, since + delay);
1383
1384            debug!(target: "service", "Reconnecting to {remote} in {delay}..");
1385
1386            self.outbox.wakeup(delay);
1387        } else {
1388            debug!(target: "service", "Dropping peer {remote}..");
1389            self.sessions.remove(&remote);
1390
1391            let severity = match reason {
1392                DisconnectReason::Dial(_)
1393                | DisconnectReason::Fetch(_)
1394                | DisconnectReason::Connection(_) => {
1395                    if self.is_online() {
1396                        // If we're "online", there's something wrong with this
1397                        // peer connection specifically.
1398                        Severity::Medium
1399                    } else {
1400                        Severity::Low
1401                    }
1402                }
1403                DisconnectReason::Session(e) => e.severity(),
1404                DisconnectReason::Command
1405                | DisconnectReason::Conflict
1406                | DisconnectReason::Policy
1407                | DisconnectReason::SelfConnection => Severity::Low,
1408            };
1409
1410            if let Err(e) = self
1411                .db
1412                .addresses_mut()
1413                .disconnected(&remote, &addr, severity)
1414            {
1415                debug!(target: "service", "Failed to update address store: {e}");
1416            }
1417            // Only re-attempt outbound connections, since we don't care if an inbound connection
1418            // is dropped.
1419            if link.is_outbound() {
1420                self.maintain_connections();
1421            }
1422        }
1423        self.dequeue_fetches();
1424    }
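    // A minimal illustrative sketch of the exponential reconnection backoff used in the
    // persistent-peer branch above. This helper is not part of the service; the bound
    // parameters stand in for MIN_RECONNECTION_DELTA and MAX_RECONNECTION_DELTA, which are
    // defined elsewhere in this module. The delay doubles with each attempt and is clamped
    // to the configured range.
    #[allow(dead_code)]
    fn example_reconnect_delay(attempts: u32, min: LocalDuration, max: LocalDuration) -> LocalDuration {
        LocalDuration::from_secs(2u64.saturating_pow(attempts)).clamp(min, max)
    }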
1425
1426    pub fn received_message(&mut self, remote: NodeId, message: Message) {
1427        if let Err(err) = self.handle_message(&remote, message) {
1428            // If there's an error, stop processing messages from this peer.
1429            // However, messages processed up to this point may still be relayed.
1430            self.outbox
1431                .disconnect(remote, DisconnectReason::Session(err));
1432
1433            // FIXME: The peer should be set in a state such that we don't
1434            // process further messages.
1435        }
1436    }
1437
1438    /// Handle an announcement message.
1439    ///
1440    /// Returns `true` if this announcement should be stored and relayed to connected peers,
1441    /// and `false` if it should not.
1442    pub fn handle_announcement(
1443        &mut self,
1444        relayer: &NodeId,
1445        relayer_addr: &Address,
1446        announcement: &Announcement,
1447    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
1448        if !announcement.verify() {
1449            return Err(session::Error::Misbehavior);
1450        }
1451        let Announcement {
1452            node: announcer,
1453            message,
1454            ..
1455        } = announcement;
1456
1457        // Ignore our own announcements, in case the relayer sent one by mistake.
1458        if announcer == self.nid() {
1459            return Ok(None);
1460        }
1461        let now = self.clock;
1462        let timestamp = message.timestamp();
1463
1464        // Don't allow messages from too far in the future.
1465        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
1466            return Err(session::Error::InvalidTimestamp(timestamp));
1467        }
1468
1469        // We don't process announcements from nodes we don't know, since the node announcement is
1470        // what provides DoS protection.
1471        //
1472        // Note that it's possible to *not* receive the node announcement, but receive the
1473        // subsequent announcements of a node in the case of historical gossip messages requested
1474        // from the `subscribe` message. This can happen if the cut-off time is after the node
1475        // announcement timestamp, but before the other announcements. In that case, we simply
1476        // ignore all announcements of that node until we get a node announcement.
1477        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
1478            match self.db.addresses().get(announcer) {
1479                Ok(node) => {
1480                    if node.is_none() {
1481                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
1482                        return Ok(None);
1483                    }
1484                }
1485                Err(e) => {
1486                    debug!(target: "service", "Failed to look up node in address book: {e}");
1487                    return Ok(None);
1488                }
1489            }
1490        }
1491
1492        // Discard announcement messages we've already seen; otherwise, update our last seen time.
1493        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
1494            Ok(Some(id)) => {
1495                log::debug!(
1496                    target: "service",
1497                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
1498                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
1499                );
1500                // Keep track of who relayed the message for later.
1501                self.relayed_by.entry(id).or_default().push(*relayer);
1502
1503                // Decide whether to relay this message, based on its freshness.
1504                // To avoid spamming peers on startup with historical gossip messages,
1505                // don't relay messages that are too old. We make an exception for node announcements,
1506                // since they are cached, and will hence often carry old timestamps.
1507                let relay = message.is_node_announcement()
1508                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
1509                relay.then_some(id)
1510            }
1511            Ok(None) => {
1512                // FIXME: Still mark as relayed by this peer.
1513                // FIXME: Refs announcements should not be delayed, since they are only sent
1514                // to subscribers.
1515                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
1516                return Ok(None);
1517            }
1518            Err(e) => {
1519                debug!(target: "service", "Failed to update gossip entry from {announcer}: {e}");
1520                return Ok(None);
1521            }
1522        };
1523
1524        match message {
1525            // Process a peer inventory update announcement by (maybe) fetching.
1526            AnnouncementMessage::Inventory(message) => {
1527                self.emitter.emit(Event::InventoryAnnounced {
1528                    nid: *announcer,
1529                    inventory: message.inventory.to_vec(),
1530                    timestamp: message.timestamp,
1531                });
1532                match self.sync_routing(
1533                    message.inventory.iter().cloned(),
1534                    *announcer,
1535                    message.timestamp,
1536                ) {
1537                    Ok(synced) => {
1538                        if synced.is_empty() {
1539                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
1540                            return Ok(None);
1541                        }
1542                    }
1543                    Err(e) => {
1544                        debug!(target: "service", "Failed to process inventory from {announcer}: {e}");
1545                        return Ok(None);
1546                    }
1547                }
1548                let mut missing = Vec::new();
1549                let nid = *self.nid();
1550
1551                // Here we handle the special case where the inventory we received is that of
1552                // a connected peer, as opposed to being relayed to us.
1553                if let Some(sess) = self.sessions.get_mut(announcer) {
1554                    for id in message.inventory.as_slice() {
1555                        // If we are connected to the announcer of this inventory, update the peer's
1556                        // subscription filter to include all inventory items. This way, we'll
1557                        // relay messages relating to the peer's inventory.
1558                        if let Some(sub) = &mut sess.subscribe {
1559                            sub.filter.insert(id);
1560                        }
1561
1562                        // If we're seeding and connected to the announcer, and we don't have
1563                        // the inventory, fetch it from the announcer.
1564                        if self.policies.is_seeding(id).expect(
1565                            "Service::handle_announcement: error accessing seeding configuration",
1566                        ) {
1567                            // Only if we do not have the repository locally do we fetch here.
1568                            // If we do have it, only fetch after receiving a ref announcement.
1569                            match self.db.routing().entry(id, &nid) {
1570                                Ok(entry) => {
1571                                    if entry.is_none() {
1572                                        missing.push(*id);
1573                                    }
1574                                }
1575                                Err(e) => debug!(
1576                                    target: "service",
1577                                    "Error checking local inventory for {id}: {e}"
1578                                ),
1579                            }
1580                        }
1581                    }
1582                }
1583                // Since we have limited fetch capacity, it may be that we can't fetch an entire
1584                // inventory from a peer. Therefore we randomize the order of the RIDs to fetch
1585                // different RIDs from different peers in case multiple peers announce the same
1586                // RIDs.
1587                self.rng.shuffle(&mut missing);
1588
1589                for rid in missing {
1590                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
1591                    self.fetch(rid, *announcer, vec![], self.fetch_config(), None);
1592                }
1593                return Ok(relay);
1594            }
1595            AnnouncementMessage::Refs(message) => {
1596                self.emitter.emit(Event::RefsAnnounced {
1597                    nid: *announcer,
1598                    rid: message.rid,
1599                    refs: message.refs.to_vec(),
1600                    timestamp: message.timestamp,
1601                });
1602                // Empty announcements can be safely ignored.
1603                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
1604                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
1605                    return Ok(None);
1606                };
1607                // We update inventories when receiving ref announcements, as these could come
1608                // from a new repository being initialized.
1609                self.seed_discovered(message.rid, *announcer, message.timestamp);
1610
1611                // Update sync status of announcer for this repo.
1612                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
1613                    debug!(
1614                        target: "service",
1615                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
1616                        message.rid, refs.at, message.timestamp
1617                    );
1618                    match self.db.seeds_mut().synced(
1619                        &message.rid,
1620                        announcer,
1621                        refs.at,
1622                        message.timestamp,
1623                    ) {
1624                        Ok(updated) => {
1625                            if updated {
1626                                debug!(
1627                                    target: "service",
1628                                    "Updating sync status of {announcer} for {} to {}",
1629                                    message.rid, refs.at
1630                                );
1631                                self.emitter.emit(Event::RefsSynced {
1632                                    rid: message.rid,
1633                                    remote: *announcer,
1634                                    at: refs.at,
1635                                });
1636                            } else {
1637                                debug!(
1638                                    target: "service",
1639                                    "Sync status of {announcer} was not updated for {}",
1640                                    message.rid,
1641                                );
1642                            }
1643                        }
1644                        Err(e) => {
1645                            debug!(target: "service", "Failed to update sync status for {}: {e}", message.rid);
1646                        }
1647                    }
1648                }
1649                let repo_entry = self.policies.seed_policy(&message.rid).expect(
1650                    "Service::handle_announcement: error accessing repo seeding configuration",
1651                );
1652                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1653                    debug!(
1654                        target: "service",
1655                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
1656                        message.rid
1657                    );
1658                    return Ok(None);
1659                };
1660                // Ref announcements may be relayed by peers who don't have the
1661                // actual refs in storage, therefore we only check whether we
1662                // are connected to the *announcer*, which is required by the
1663                // protocol to only announce refs it has.
1664                //
1665                // TODO(Ade): Perhaps it makes sense to establish connections to
1666                // followed but unconnected peers. Consider:
1667                //   Connections: Alice ←→ Bob ←→ Eve
1668                //   Follows:     Alice ←→ Eve
1669                // Eve announces refs, and Bob relays these announcements to Alice.
1670                // Then, Alice might determine that Bob does not have Eve's refs,
1671                // and therefore connect directly to Eve in order to fetch.
1672                let Some(remote) = self.sessions.get(announcer).cloned() else {
1673                    trace!(
1674                        target: "service",
1675                        "Skipping fetch of {}, no sessions connected to {announcer}",
1676                        message.rid
1677                    );
1678                    return Ok(relay);
1679                };
1680                // Finally, start the fetch.
1681                self.fetch_refs_at(message.rid, remote.id, refs, scope, self.fetch_config());
1682
1683                return Ok(relay);
1684            }
1685            AnnouncementMessage::Node(
1686                ann @ NodeAnnouncement {
1687                    features,
1688                    addresses,
1689                    ..
1690                },
1691            ) => {
1692                self.emitter.emit(Event::NodeAnnounced {
1693                    nid: *announcer,
1694                    alias: ann.alias.clone(),
1695                    timestamp: ann.timestamp,
1696                    features: *features,
1697                    addresses: addresses.to_vec(),
1698                });
1699                // If this node isn't a seed, we're not interested in adding it
1700                // to our address book, but other nodes may be, so we relay the message anyway.
1701                if !features.has(Features::SEED) {
1702                    return Ok(relay);
1703                }
1704
1705                match self.db.addresses_mut().insert(
1706                    announcer,
1707                    ann.version,
1708                    ann.features,
1709                    &ann.alias,
1710                    ann.work(),
1711                    &ann.agent,
1712                    timestamp,
1713                    addresses
1714                        .iter()
1715                        // Ignore non-routable addresses unless received from a local network
1716                        // peer. This allows the node to function in a local network.
1717                        .filter(|a| a.is_routable() || relayer_addr.is_local())
1718                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
1719                ) {
1720                    Ok(updated) => {
1721                        // Only relay if we received new information.
1722                        if updated {
1723                            debug!(
1724                                target: "service",
1725                                "Address store entry for node {announcer} updated at {timestamp}"
1726                            );
1727                            return Ok(relay);
1728                        }
1729                    }
1730                    Err(err) => {
1731                        // An error here is due to a fault in our address store.
1732                        warn!(target: "service", "Failed to process node announcement from {announcer}: {err}");
1733                    }
1734                }
1735            }
1736        }
1737        Ok(None)
1738    }
1739
1740    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
1741        match info {
1742            // Nb. We don't currently send this message.
1743            Info::RefsAlreadySynced { rid, at } => {
1744                debug!(target: "service", "Refs already synced for {rid} by {remote}");
1745                self.emitter.emit(Event::RefsSynced {
1746                    rid: *rid,
1747                    remote,
1748                    at: *at,
1749                });
1750            }
1751        }
1752
1753        Ok(())
1754    }
1755
1756    pub fn handle_message(
1757        &mut self,
1758        remote: &NodeId,
1759        message: Message,
1760    ) -> Result<(), session::Error> {
1761        let local = self.node_id();
1762        let relay = self.config.is_relay();
1763        let Some(peer) = self.sessions.get_mut(remote) else {
1764            debug!(target: "service", "Session not found for {remote}");
1765            return Ok(());
1766        };
1767        peer.last_active = self.clock;
1768
1769        let limit: RateLimit = match peer.link {
1770            Link::Outbound => self.config.limits.rate.outbound.into(),
1771            Link::Inbound => self.config.limits.rate.inbound.into(),
1772        };
1773        if self
1774            .limiter
1775            .limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
1776        {
1777            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
1778            return Ok(());
1779        }
1780        message.log(log::Level::Debug, remote, Link::Inbound);
1781
1782        let connected = match &mut peer.state {
1783            session::State::Disconnected { .. } => {
1784                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
1785                return Ok(());
1786            }
1787            // In case of a discrepancy between the service state and the state of the underlying
1788            // wire protocol, we may receive a message from a peer that we consider not fully connected
1789            // at the service level. To remedy this, we simply transition the peer to a connected state.
1790            //
1791            // This is not ideal, but until the wire protocol and service are unified, it's the simplest
1792            // solution to converge towards the same state.
1793            session::State::Attempted | session::State::Initial => {
1794                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
1795                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);
1796
1797                peer.to_connected(self.clock);
1798
1799                None
1800            }
1801            session::State::Connected {
1802                ping, latencies, ..
1803            } => Some((ping, latencies)),
1804        };
1805
1806        trace!(target: "service", "Received message {message:?} from {remote}");
1807
1808        match message {
1809            // Process a peer announcement.
1810            Message::Announcement(ann) => {
1811                let relayer = remote;
1812                let relayer_addr = peer.addr.clone();
1813
1814                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
1815                    if self.config.is_relay() {
1816                        if let AnnouncementMessage::Inventory(_) = ann.message {
1817                            if let Err(e) = self
1818                                .database_mut()
1819                                .gossip_mut()
1820                                .set_relay(id, gossip::RelayStatus::Relay)
1821                            {
1822                                warn!(target: "service", "Failed to set relay flag for message: {e}");
1823                                return Ok(());
1824                            }
1825                        } else {
1826                            self.relay(id, ann);
1827                        }
1828                    }
1829                }
1830            }
1831            Message::Subscribe(subscribe) => {
1832                // Filter announcements by interest.
1833                match self
1834                    .db
1835                    .gossip()
1836                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
1837                {
1838                    Ok(anns) => {
1839                        for ann in anns {
1840                            let ann = match ann {
1841                                Ok(a) => a,
1842                                Err(e) => {
1843                                    debug!(target: "service", "Failed to read gossip message from store: {e}");
1844                                    continue;
1845                                }
1846                            };
1847                            // Don't send announcements authored by the remote back to the remote.
1848                            if ann.node == *remote {
1849                                continue;
1850                            }
1851                            // Only send messages if we're a relay, or if they're our own messages.
1852                            if relay || ann.node == local {
1853                                self.outbox.write(peer, ann.into());
1854                            }
1855                        }
1856                    }
1857                    Err(e) => {
1858                        warn!(target: "service", "Failed to query gossip messages from store: {e}");
1859                    }
1860                }
1861                peer.subscribe = Some(subscribe);
1862            }
1863            Message::Info(info) => {
1864                self.handle_info(*remote, &info)?;
1865            }
1866            Message::Ping(Ping { ponglen, .. }) => {
1867                // Ignore pings which ask for too much data.
1868                if ponglen > Ping::MAX_PONG_ZEROES {
1869                    return Ok(());
1870                }
1871                self.outbox.write(
1872                    peer,
1873                    Message::Pong {
1874                        zeroes: ZeroBytes::new(ponglen),
1875                    },
1876                );
1877            }
1878            Message::Pong { zeroes } => {
1879                if let Some((ping, latencies)) = connected {
1880                    if let session::PingState::AwaitingResponse {
1881                        len: ponglen,
1882                        since,
1883                    } = *ping
1884                    {
1885                        if (ponglen as usize) == zeroes.len() {
1886                            *ping = session::PingState::Ok;
1887                            // Keep track of peer latency.
1888                            latencies.push_back(self.clock - since);
1889                            if latencies.len() > MAX_LATENCIES {
1890                                latencies.pop_front();
1891                            }
1892                        }
1893                    }
1894                }
1895            }
1896        }
1897        Ok(())
1898    }
1899
1900    /// A convenience method to check whether we should fetch from a `RefsAnnouncement` with the given `scope`.
1901    fn refs_status_of(
1902        &self,
1903        rid: RepoId,
1904        refs: NonEmpty<RefsAt>,
1905        scope: &policy::Scope,
1906    ) -> Result<RefsStatus, Error> {
1907        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
1908        // Check that there's something we want.
1909        if refs.want.is_empty() {
1910            return Ok(refs);
1911        }
1912        // Check scope.
1913        let mut refs = match scope {
1914            policy::Scope::All => refs,
1915            policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
1916                Ok(Namespaces::All) => refs,
1917                Ok(Namespaces::Followed(followed)) => {
1918                    refs.want.retain(|r| followed.contains(&r.remote));
1919                    refs
1920                }
1921                Err(e) => return Err(e.into()),
1922            },
1923        };
1924        // Remove our own remote; we don't want to fetch that.
1925        refs.want.retain(|r| r.remote != self.node_id());
1926
1927        Ok(refs)
1928    }
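    // A hypothetical standalone sketch of the scope filtering performed above: under
    // `Scope::Followed`, only wanted remotes that are followed are kept, and our own
    // remote is never fetched. The helper and its parameters are illustrative only and
    // not called by the service.
    #[allow(dead_code)]
    fn example_filter_wanted(
        want: Vec<RefsAt>,
        followed: &HashSet<NodeId>,
        local: &NodeId,
    ) -> Vec<RefsAt> {
        want.into_iter()
            .filter(|r| followed.contains(&r.remote))
            .filter(|r| &r.remote != local)
            .collect()
    }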
1929
1930    /// Add a seed to our routing table.
1931    fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
1932        if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
1933            if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
1934                self.emitter.emit(Event::SeedDiscovered { rid, nid });
1935                debug!(target: "service", "Routing table updated for {rid} with seed {nid}");
1936            }
1937        }
1938    }
1939
1940    /// Set of initial messages to send to a peer.
1941    fn initial(&mut self, _link: Link) -> Vec<Message> {
1942        let now = self.clock();
1943        let filter = self.filter();
1944
1945        // TODO: Only subscribe to outbound connections; otherwise, we will consume too
1946        // much bandwidth.
1947
1948        // If we've been previously connected to the network, we'll have received gossip messages.
1949        // Instead of simply taking the last timestamp, we try to ensure we don't miss any
1950        // messages due to un-synchronized clocks.
1951        //
1952        // If this is our first connection to the network, we just ask for a fixed backlog
1953        // of messages to get us started.
1954        let since = if let Some(last) = self.last_online_at {
1955            Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
1956        } else {
1957            (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
1958        };
1959        debug!(target: "service", "Subscribing to messages since timestamp {since}..");
1960
1961        vec![
1962            Message::node(self.node.clone(), &self.signer),
1963            Message::inventory(self.inventory.clone(), &self.signer),
1964            Message::subscribe(filter, since, Timestamp::MAX),
1965        ]
1966    }
1967
1968    /// Try to guess whether we're online or not.
1969    fn is_online(&self) -> bool {
1970        self.sessions
1971            .connected()
1972            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
1973            .count()
1974            > 0
1975    }
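    // Sketch of the per-session predicate behind `is_online` (hypothetical helper, not
    // called by the service): a session counts as evidence of being online if its address
    // is routable and it was active within the last IDLE_INTERVAL.
    #[allow(dead_code)]
    fn example_counts_as_online(session: &Session, now: LocalTime) -> bool {
        session.addr.is_routable() && session.last_active >= now - IDLE_INTERVAL
    }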
1976
1977    /// Remove a local repository from our inventory.
1978    fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
1979        let node = self.node_id();
1980        let now = self.timestamp();
1981
1982        let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
1983        if removed {
1984            self.refresh_and_announce_inventory(now)?;
1985        }
1986        Ok(removed)
1987    }
1988
1989    /// Add a local repository to our inventory.
1990    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
1991        let node = self.node_id();
1992        let now = self.timestamp();
1993
1994        if !self.storage.contains(&rid)? {
1995            debug!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
1996            return Ok(false);
1997        }
1998        // Add to our local inventory.
1999        let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
2000        let updated = !updates.is_empty();
2001
2002        if updated {
2003            self.refresh_and_announce_inventory(now)?;
2004        }
2005        Ok(updated)
2006    }
2007
2008    /// Update cached inventory message, and announce new inventory to peers.
2009    fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
2010        let inventory = self.inventory()?;
2011
2012        self.inventory = gossip::inventory(time, inventory);
2013        self.announce_inventory();
2014
2015        Ok(())
2016    }
2017
2018    /// Get our local inventory.
2019    ///
2020    /// A node's inventory is the advertised list of repositories offered by a node.
2021    ///
2022    /// A node's inventory consists of *public* repositories that are seeded and available locally
2023    /// in the node's storage. We use the routing table as the canonical state of all inventories,
2024    /// including the local node's.
2025    ///
2026    /// When a repository is unseeded, it is also removed from the inventory. Private repositories
2027    /// are *not* part of a node's inventory.
2028    fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
2029        self.db
2030            .routing()
2031            .get_inventory(self.nid())
2032            .map_err(Error::from)
2033    }
2034
2035    /// Process a peer inventory announcement by updating our routing table.
2036    /// This function expects the peer's full inventory, and prunes entries that are not in the
2037    /// given inventory.
2038    fn sync_routing(
2039        &mut self,
2040        inventory: impl IntoIterator<Item = RepoId>,
2041        from: NodeId,
2042        timestamp: Timestamp,
2043    ) -> Result<SyncedRouting, Error> {
2044        let mut synced = SyncedRouting::default();
2045        let included = inventory.into_iter().collect::<BTreeSet<_>>();
2046        let mut events = Vec::new();
2047
2048        for (rid, result) in
2049            self.db
2050                .routing_mut()
2051                .add_inventory(included.iter(), from, timestamp)?
2052        {
2053            match result {
2054                InsertResult::SeedAdded => {
2055                    debug!(target: "service", "Routing table updated for {rid} with seed {from}");
2056                    events.push(Event::SeedDiscovered { rid, nid: from });
2057
2058                    if self
2059                        .policies
2060                        .is_seeding(&rid)
2061                        .expect("Service::process_inventory: error accessing seeding configuration")
2062                    {
2063                        // TODO: We should fetch here if we're already connected, in case this seed
2064                        // has refs we don't have.
2065                    }
2066                    synced.added.push(rid);
2067                }
2068                InsertResult::TimeUpdated => {
2069                    synced.updated.push(rid);
2070                }
2071                InsertResult::NotUpdated => {}
2072            }
2073        }
2074
2075        synced.removed.extend(
2076            self.db
2077                .routing()
2078                .get_inventory(&from)?
2079                .into_iter()
2080                .filter(|rid| !included.contains(rid)),
2081        );
2082        self.db
2083            .routing_mut()
2084            .remove_inventories(&synced.removed, &from)?;
2085        events.extend(
2086            synced
2087                .removed
2088                .iter()
2089                .map(|&rid| Event::SeedDropped { rid, nid: from }),
2090        );
2091
2092        self.emitter.emit_all(events);
2093
2094        Ok(synced)
2095    }
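    // Sketch (hypothetical helper) of the pruning rule in `sync_routing` above: any
    // repository we previously recorded for `from` that is absent from the announced
    // inventory is considered removed.
    #[allow(dead_code)]
    fn example_removed_inventory(
        previous: &BTreeSet<RepoId>,
        announced: &BTreeSet<RepoId>,
    ) -> Vec<RepoId> {
        previous.difference(announced).copied().collect()
    }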
2096
2097    /// Return a refs announcement including the given remotes.
2098    fn refs_announcement_for(
2099        &mut self,
2100        rid: RepoId,
2101        remotes: impl IntoIterator<Item = NodeId>,
2102    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
2103        let repo = self.storage.repository(rid)?;
2104        let timestamp = self.timestamp();
2105        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
2106
2107        for remote_id in remotes.into_iter() {
2108            let refs_at = RefsAt::new(&repo, remote_id).map_err(|err| {
2109                radicle::storage::Error::Refs(radicle::storage::refs::Error::Read(err))
2110            })?;
2111
2112            if refs.push(refs_at).is_err() {
2113                warn!(
2114                    target: "service",
2115                    "refs announcement limit ({REF_REMOTE_LIMIT}) exceeded, peers will see only some of your repository references",
2116                );
2117                break;
2118            }
2119        }
2120
2121        let msg = AnnouncementMessage::from(RefsAnnouncement {
2122            rid,
2123            refs: refs.clone(),
2124            timestamp,
2125        });
2126        Ok((msg.signed(&self.signer), refs.into()))
2127    }
2128
2129    /// Announce our own refs for the given repo.
2130    fn announce_own_refs(
2131        &mut self,
2132        rid: RepoId,
2133        doc: Doc,
2134        namespaces: impl IntoIterator<Item = NodeId>,
2135    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
2136        let (refs, timestamp) = self.announce_refs(rid, doc, namespaces, true)?;
2137
2138        // Update refs database with our signed refs branches.
2139        // This isn't strictly necessary for now, as we only use the database for fetches, and
2140        // we don't fetch our own refs that are announced, but we do it for good measure.
2141        for r in refs.iter() {
2142            self.emitter.emit(Event::LocalRefsAnnounced {
2143                rid,
2144                refs: *r,
2145                timestamp,
2146            });
2147            if let Err(e) = self.database_mut().refs_mut().set(
2148                &rid,
2149                &r.remote,
2150                &SIGREFS_BRANCH,
2151                r.at,
2152                timestamp.to_local_time(),
2153            ) {
2154                warn!(
2155                    target: "service",
2156                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
2157                    r.remote
2158                );
2159            }
2160        }
2161        Ok((refs, timestamp))
2162    }
2163
2164    /// Announce local refs for given repo.
2165    fn announce_refs(
2166        &mut self,
2167        rid: RepoId,
2168        doc: Doc,
2169        remotes: impl IntoIterator<Item = NodeId>,
2170        own: bool,
2171    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
2172        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
2173        let timestamp = ann.timestamp();
2174        let peers = self.sessions.connected().map(|(_, p)| p);
2175
2176        // Update our sync status for our own refs. This is useful for determining if refs were
2177        // updated while the node was stopped.
2178        for r in refs.iter().filter(|r| own || r.remote == ann.node) {
2179            info!(
2180                target: "service",
2181                "Announcing refs {rid}/{r} to peers (t={timestamp})..",
2182            );
2183            // Update our local node's sync status to mark the refs as announced.
2184            if let Err(e) = self.db.seeds_mut().synced(&rid, &ann.node, r.at, timestamp) {
2185                warn!(target: "service", "Failed to update sync status for local node: {e}");
2186            } else {
2187                debug!(target: "service", "Saved local sync status for {rid}..");
2188            }
2189        }
2190
2191        self.outbox.announce(
2192            ann,
2193            peers.filter(|p| {
2194                // Only announce to peers who are allowed to view this repo.
2195                doc.is_visible_to(&p.id.into())
2196            }),
2197            self.db.gossip_mut(),
2198        );
2199        Ok((refs, timestamp))
2200    }
2201
2202    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
2203        if let Some(sess) = self.sessions.get_mut(&nid) {
2204            sess.to_initial();
2205            self.outbox.connect(nid, addr);
2206
2207            return true;
2208        }
2209        false
2210    }
2211
2212    fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
2213        debug!(target: "service", "Connecting to {nid} ({addr})..");
2214
2215        if nid == self.node_id() {
2216            return Err(ConnectError::SelfConnection);
2217        }
2218        if let Ok(true) = self.policies.is_blocked(&nid) {
2219            return Err(ConnectError::Blocked { nid });
2220        }
2221        if !self.is_supported_address(&addr) {
2222            return Err(ConnectError::UnsupportedAddress { nid, addr });
2223        }
2224        if self.sessions.contains_key(&nid) {
2225            return Err(ConnectError::SessionExists { nid });
2226        }
2227        if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
2228            return Err(ConnectError::LimitReached { nid, addr });
2229        }
2230        let persistent = self.config.is_persistent(&nid);
2231        let timestamp: Timestamp = self.clock.into();
2232
2233        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
2234            warn!(target: "service", "Failed to update address book with connection attempt: {e}");
2235        }
2236        self.sessions.insert(
2237            nid,
2238            Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
2239        );
2240        self.outbox.connect(nid, addr);
2241
2242        Ok(())
2243    }
2244
2245    fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
2246        let mut seeds = Seeds::new(self.rng.clone());
2247
2248        // First, build a list of peers that have synced refs for `namespaces`, if any.
2249        // This step is skipped:
2250        //  1. For the repository (and thus all `namespaces`), if it does not exist in storage.
2251        //  2. For each `namespace` in `namespaces` that does not exist in storage.
2252        if let Ok(repo) = self.storage.repository(*rid) {
2253            for namespace in namespaces.iter() {
2254                let Ok(local) = RefsAt::new(&repo, *namespace) else {
2255                    continue;
2256                };
2257
2258                for seed in self.db.seeds().seeds_for(rid)? {
2259                    let seed = seed?;
2260                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
2261                    let synced = if local.at == seed.synced_at.oid {
2262                        SyncStatus::Synced { at: seed.synced_at }
2263                    } else {
2264                        let local = SyncedAt::new(local.at, &repo)?;
2265
2266                        SyncStatus::OutOfSync {
2267                            local,
2268                            remote: seed.synced_at,
2269                        }
2270                    };
2271                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
2272                }
2273            }
2274        }
2275
2276        // Then, add peers we know about but for which we have no sync status information.
2277        // These peers have announced that they seed the repository via an inventory
2278        // announcement, but we haven't received any ref announcements from them.
2279        for nid in self.db.routing().get(rid)? {
2280            if namespaces.contains(&nid) {
2281                continue;
2282            }
2283            if seeds.contains(&nid) {
2284                // We already have a richer entry for this node.
2285                continue;
2286            }
2287            let addrs = self.db.addresses().addresses_of(&nid)?;
2288            let state = self.sessions.get(&nid).map(|s| s.state.clone());
2289
2290            seeds.insert(Seed::new(nid, addrs, state, None));
2291        }
2292        Ok(seeds)
2293    }
2294
2295    /// Return a new filter object, based on our seeding policy.
2296    fn filter(&self) -> Filter {
2297        if self.config.seeding_policy.is_allow() {
2298            // TODO: Remove bits for blocked repos.
2299            Filter::default()
2300        } else {
2301            self.filter.clone()
2302        }
2303    }
2304
2305    /// Get a timestamp for use in announcements.
2306    /// Never returns the same timestamp twice.
2307    fn timestamp(&mut self) -> Timestamp {
2308        let now = Timestamp::from(self.clock);
2309        if *now > *self.last_timestamp {
2310            self.last_timestamp = now;
2311        } else {
2312            self.last_timestamp = self.last_timestamp + 1;
2313        }
2314        self.last_timestamp
2315    }
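    // A standalone sketch of the monotonicity rule above (illustrative only): given the
    // previously issued timestamp and the current clock, the next timestamp is always
    // strictly greater than the previous one, even if the clock hasn't advanced.
    #[allow(dead_code)]
    fn example_next_timestamp(last: Timestamp, now: Timestamp) -> Timestamp {
        if *now > *last {
            now
        } else {
            last + 1
        }
    }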
2316
2317    fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
2318        let announcer = ann.node;
2319        let relayed_by = self.relayed_by.get(&id);
2320        let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
2321            Some(rid)
2322        } else {
2323            None
2324        };
2325        // Choose peers we should relay this message to.
2326        // 1. Don't relay to a peer who sent us this message.
2327        // 2. Don't relay to the peer who signed this announcement.
2328        let relay_to = self
2329            .sessions
2330            .connected()
2331            .filter(|(id, _)| {
2332                relayed_by
2333                    .map(|relayers| !relayers.contains(id))
2334                    .unwrap_or(true) // If there are no relayers we let it through.
2335            })
2336            .filter(|(id, _)| **id != announcer)
2337            .filter(|(id, _)| {
2338                if let Some(rid) = rid {
2339                    // Only relay this message if the peer is allowed to know about the
2340                    // repository. If we don't have the repository, return `false` because
2341                    // we can't determine if it's private or public.
2342                    self.storage
2343                        .get(rid)
2344                        .ok()
2345                        .flatten()
2346                        .map(|doc| doc.is_visible_to(&(*id).into()))
2347                        .unwrap_or(false)
2348                } else {
2349                    // Announcement doesn't concern a specific repository, let it through.
2350                    true
2351                }
2352            })
2353            .map(|(_, p)| p);
2354
2355        self.outbox.relay(ann, relay_to);
2356    }
2357
2358    ////////////////////////////////////////////////////////////////////////////
2359    // Periodic tasks
2360    ////////////////////////////////////////////////////////////////////////////
2361
2362    fn relay_announcements(&mut self) -> Result<(), Error> {
2363        let now = self.clock.into();
2364        let rows = self.database_mut().gossip_mut().relays(now)?;
2365        let local = self.node_id();
2366
2367        for (id, msg) in rows {
2368            let announcer = msg.node;
2369            if announcer == local {
2370                // Don't relay our own stored gossip messages.
2371                continue;
2372            }
2373            self.relay(id, msg);
2374        }
2375        Ok(())
2376    }
2377
2378    /// Announce our inventory to all connected peers, unless it was already announced.
2379    fn announce_inventory(&mut self) {
2380        let timestamp = self.inventory.timestamp.to_local_time();
2381
2382        if self.last_inventory == timestamp {
2383            debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
2384            return;
2385        }
2386        let msg = AnnouncementMessage::from(self.inventory.clone());
2387
2388        self.outbox.announce(
2389            msg.signed(&self.signer),
2390            self.sessions.connected().map(|(_, p)| p),
2391            self.db.gossip_mut(),
2392        );
2393        self.last_inventory = timestamp;
2394    }
2395
2396    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
2397        let count = self.db.routing().len()?;
2398        if count <= self.config.limits.routing_max_size.into() {
2399            return Ok(());
2400        }
2401
2402        let delta = count - usize::from(self.config.limits.routing_max_size);
2403        let nid = self.node_id();
2404        self.db.routing_mut().prune(
2405            (*now - LocalDuration::from(self.config.limits.routing_max_age)).into(),
2406            Some(delta),
2407            &nid,
2408        )?;
2409        Ok(())
2410    }
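    // Sketch of the prune-count calculation above (hypothetical helper): pruning only
    // happens when the routing table exceeds the configured maximum, and then only by
    // the excess amount.
    #[allow(dead_code)]
    fn example_prune_excess(count: usize, max: usize) -> Option<usize> {
        (count > max).then(|| count - max)
    }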
2411
2412    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
2413        let stale = self
2414            .sessions
2415            .connected()
2416            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
2417
2418        for (_, session) in stale {
2419            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);
2420
2421            // TODO: Should we switch the session state to "disconnected" even before receiving
2422            // an official "disconnect"? Otherwise we keep pinging until we get the disconnection.
2423
2424            self.outbox.disconnect(
2425                session.id,
2426                DisconnectReason::Session(session::Error::Timeout),
2427            );
2428        }
2429    }
2430
2431    /// Ensure connection health by pinging connected peers.
2432    fn keep_alive(&mut self, now: &LocalTime) {
2433        let inactive_sessions = self
2434            .sessions
2435            .connected_mut()
2436            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
2437            .map(|(_, session)| session);
2438        for session in inactive_sessions {
2439            session.ping(self.clock, &mut self.outbox).ok();
2440        }
2441    }
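    // Illustrative sketch combining the idle rules above and in
    // `disconnect_unresponsive_peers` (hypothetical helper, not used by the service):
    // peers idle for at least STALE_CONNECTION_TIMEOUT are disconnected, peers idle for
    // at least KEEP_ALIVE_DELTA are pinged, and anything more recent is left alone.
    #[allow(dead_code)]
    fn example_idle_action(idle: LocalDuration) -> &'static str {
        if idle >= STALE_CONNECTION_TIMEOUT {
            "disconnect"
        } else if idle >= KEEP_ALIVE_DELTA {
            "ping"
        } else {
            "keep"
        }
    }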
2442
2443    /// Get a list of peers available to connect to, sorted by lowest penalty.
2444    fn available_peers(&mut self) -> Vec<Peer> {
2445        match self.db.addresses().entries() {
2446            Ok(entries) => {
2447                // Nb. we don't want to connect to any peers that already have a session with us,
2448                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
2449                let mut peers = entries
2450                    .filter(|entry| entry.version == PROTOCOL_VERSION)
2451                    .filter(|entry| !entry.address.banned)
2452                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
2453                    .filter(|entry| !self.sessions.contains_key(&entry.node))
2454                    .filter(|entry| !self.policies.is_blocked(&entry.node).unwrap_or(false))
2455                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
2456                    .filter(|entry| &entry.node != self.nid())
2457                    .filter(|entry| self.is_supported_address(&entry.address.addr))
2458                    .fold(HashMap::new(), |mut acc, entry| {
2459                        acc.entry(entry.node)
2460                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
2461                            .or_insert_with(|| Peer {
2462                                nid: entry.node,
2463                                addresses: vec![entry.address],
2464                                penalty: entry.penalty,
2465                            });
2466                        acc
2467                    })
2468                    .into_values()
2469                    .collect::<Vec<_>>();
2470                peers.sort_by_key(|p| p.penalty);
2471                peers
2472            }
2473            Err(e) => {
2474                warn!(target: "service", "Unable to lookup available peers in address book: {e}");
2475                Vec::new()
2476            }
2477        }
2478    }
2479
2480    /// Fetch all repositories that are seeded but missing from storage.
2481    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
2482        let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
2483        for policy in policies {
2484            let policy = match policy {
2485                Ok(policy) => policy,
2486                Err(err) => {
2487                    debug!(target: "protocol::filter", "Failed to read seed policy: {err}");
2488                    continue;
2489                }
2490            };
2491
2492            let rid = policy.rid;
2493
2494            if !policy.is_allow() {
2495                continue;
2496            }
2497            match self.storage.contains(&rid) {
2498                Ok(exists) => {
2499                    if exists {
2500                        continue;
2501                    }
2502                }
2503                Err(err) => {
2504                    log::debug!(target: "protocol::filter", "Failed to check if {rid} exists: {err}");
2505                    continue;
2506                }
2507            }
2508            match self.seeds(&rid, [self.node_id()].into()) {
2509                Ok(seeds) => {
2510                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
2511                        for seed in connected {
2512                            self.fetch(rid, seed.nid, vec![], self.fetch_config(), None);
2513                        }
2514                    } else {
2515                        // TODO: We should make sure that this fetch is retried later, either
2516                        // when we connect to a seed, or when we discover a new seed.
2517                        // Since new connections and routing table updates are both conditions for
2518                        // fetching, we should trigger fetches when those conditions appear.
2519                        // Another way to handle this would be to update our database, saying
2520                        // that we're trying to fetch a certain repo. We would then just
2521                        // iterate over those entries in the above circumstances. This is
2522                        // merely an optimization though, we can also iterate over all seeded
2523                        // repos and check which ones are not in our inventory.
2524                        debug!(target: "service", "No connected seeds found for {rid}..");
2525                    }
2526                }
2527                Err(e) => {
2528                    debug!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
2529                }
2530            }
2531        }
2532        Ok(())
2533    }
2534
2535    /// Run idle task for all connections.
2536    fn idle_connections(&mut self) {
2537        for (_, sess) in self.sessions.iter_mut() {
2538            sess.idle(self.clock);
2539
2540            if sess.is_stable() {
2541                // Mark as connected once the connection is stable.
2542                if let Err(e) =
2543                    self.db
2544                        .addresses_mut()
2545                        .connected(&sess.id, &sess.addr, self.clock.into())
2546                {
2547                    warn!(target: "service", "Failed to update address book with connection: {e}");
2548                }
2549            }
2550        }
2551    }
2552
2553    /// Try to maintain a target number of connections.
2554    fn maintain_connections(&mut self) {
2555        let PeerConfig::Dynamic = self.config.peers else {
2556            return;
2557        };
2558        trace!(target: "service", "Maintaining connections..");
2559
2560        let target = TARGET_OUTBOUND_PEERS;
2561        let now = self.clock;
2562        let outbound = self
2563            .sessions
2564            .values()
2565            .filter(|s| s.link.is_outbound())
2566            .filter(|s| s.is_connected() || s.is_connecting())
2567            .count();
2568        let wanted = target.saturating_sub(outbound);
2569
2570        // Don't connect to more peers than needed.
2571        if wanted == 0 {
2572            return;
2573        }
2574
2575        // Peers available to connect to.
2576        let available = self
2577            .available_peers()
2578            .into_iter()
2579            .filter_map(|peer| {
2580                peer.addresses
2581                    .into_iter()
2582                    .find(|ka| match (ka.last_success, ka.last_attempt) {
2583                        // If we succeeded the last time we tried, this is a good address.
2584                        // If it's been long enough since we last failed to connect, we also try again.
2585                        (Some(success), Some(attempt)) => {
2586                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
2587                        }
2588                        // If we haven't succeeded yet, and we waited long enough, we can try this address.
2589                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
2590                        // If we have no failed attempts for this address, it's worth a try.
2591                        (_, None) => true,
2592                    })
2593                    .map(|ka| (peer.nid, ka))
2594            })
2595            .filter(|(_, ka)| self.is_supported_address(&ka.addr));
2596
2597        // Peers we are going to attempt connections to.
2598        let connect = available.take(wanted).collect::<Vec<_>>();
2599        if connect.len() < wanted {
2600            log::debug!(
2601                target: "service",
2602                "Not enough available peers to connect to (available={}, wanted={wanted})",
2603                connect.len()
2604            );
2605        }
2606        for (id, ka) in connect {
2607            if let Err(e) = self.connect(id, ka.addr.clone()) {
2608                warn!(target: "service", "Service::maintain_connections connection error: {e}");
2609            }
2610        }
2611    }
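    // A hypothetical standalone form of the per-address retry rule used above: an address
    // is worth dialing if its last attempt succeeded, if enough time (CONNECTION_RETRY_DELTA,
    // defined elsewhere in this module) has passed since the last failed attempt, or if it
    // was never attempted at all.
    #[allow(dead_code)]
    fn example_is_dialable(
        now: LocalTime,
        last_success: Option<LocalTime>,
        last_attempt: Option<LocalTime>,
    ) -> bool {
        match (last_success, last_attempt) {
            (Some(success), Some(attempt)) => {
                success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
            }
            (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
            (_, None) => true,
        }
    }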
2612
2613    /// Maintain persistent peer connections.
2614    fn maintain_persistent(&mut self) {
2615        trace!(target: "service", "Maintaining persistent peers..");
2616
2617        let now = self.local_time();
2618        let mut reconnect = Vec::new();
2619
2620        for (nid, session) in self.sessions.iter_mut() {
2621            if self.config.is_persistent(nid) {
2622                if self.policies.is_blocked(nid).unwrap_or(false) {
2623                    continue;
2624                }
2625                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
2626                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
2627                    // even a successful attempt means that we're unlikely to be able to reconnect.
2628
2629                    if now >= *retry_at {
2630                        reconnect.push((*nid, session.addr.clone(), session.attempts()));
2631                    }
2632                }
2633            }
2634        }
2635
2636        for (nid, addr, attempts) in reconnect {
2637            if self.reconnect(nid, addr) {
2638                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
2639            }
2640        }
2641    }
2642
2643    /// Checks if the given [`Address`] is supported for connecting to.
2644    ///
2645    /// # IPv4/IPv6/DNS
2646    ///
2647    /// Always returns `true`.
2648    ///
2649    /// # Tor
2650    ///
2651    /// If the [`Address`] is an `.onion` address and the service supports onion
2652    /// routing then this will return `true`.
2653    ///
2654    /// # I2P
2655    ///
2656    /// If the [`Address`] is an I2P address and the service supports I2P
2657    /// connections then this will return `true`.
2658    fn is_supported_address(&self, address: &Address) -> bool {
2659        match AddressType::from(address) {
2660            // Only consider onion addresses if configured.
2661            #[cfg(feature = "tor")]
2662            AddressType::Onion => self.config.onion != radicle::node::config::AddressConfig::Drop,
2663            #[cfg(feature = "i2p")]
2664            AddressType::I2p => self.config.i2p != radicle::node::config::AddressConfig::Drop,
2665            AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
2666        }
2667    }
2668
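    /// Build the fetch configuration for this service, carrying over the configured
    /// minimum feature level.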
2669    fn fetch_config(&self) -> fetcher::FetchConfig {
2670        fetcher::FetchConfig::default()
2671            .with_minimum_feature_level(self.config.fetch.feature_level_min())
2672    }
2673}
2674
2675/// Gives read access to the service state, plus mutable access to the clock.
2676pub trait ServiceState {
2677    /// Get the Node ID.
2678    fn nid(&self) -> &NodeId;
2679    /// Get the existing sessions.
2680    fn sessions(&self) -> &Sessions;
2681    /// Get fetch state.
2682    fn fetching(&self) -> &FetcherState;
2683    /// Get outbox.
2684    fn outbox(&self) -> &Outbox;
2685    /// Get rate limiter.
2686    fn limiter(&self) -> &RateLimiter;
2687    /// Get event emitter.
2688    fn emitter(&self) -> &Emitter<Event>;
2689    /// Get a repository from storage.
2690    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
2691    /// Get the clock.
2692    fn clock(&self) -> &LocalTime;
2693    /// Get the clock mutably.
2694    fn clock_mut(&mut self) -> &mut LocalTime;
2695    /// Get service configuration.
2696    fn config(&self) -> &Config;
2697    /// Get service metrics.
2698    fn metrics(&self) -> &Metrics;
2699}
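
// Illustrative sketch: `ServiceState` narrows the service to a small set of
// accessors, so helpers can be written against the trait rather than the concrete
// `Service` type. The helper name `connected_peer_count` is hypothetical and not
// part of the service API.
#[allow(dead_code)]
fn connected_peer_count<T: ServiceState>(state: &T) -> usize {
    state.sessions().connected().count()
}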
2700
2701impl<D, S, G> ServiceState for Service<D, S, G>
2702where
2703    D: routing::Store,
2704    G: crypto::signature::Signer<crypto::Signature>,
2705    S: ReadStorage,
2706{
2707    fn nid(&self) -> &NodeId {
2708        self.signer.public_key()
2709    }
2710
2711    fn sessions(&self) -> &Sessions {
2712        &self.sessions
2713    }
2714
2715    fn fetching(&self) -> &FetcherState {
2716        self.fetcher.state()
2717    }
2718
2719    fn outbox(&self) -> &Outbox {
2720        &self.outbox
2721    }
2722
2723    fn limiter(&self) -> &RateLimiter {
2724        &self.limiter
2725    }
2726
2727    fn emitter(&self) -> &Emitter<Event> {
2728        &self.emitter
2729    }
2730
2731    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
2732        self.storage.get(rid)
2733    }
2734
2735    fn clock(&self) -> &LocalTime {
2736        &self.clock
2737    }
2738
2739    fn clock_mut(&mut self) -> &mut LocalTime {
2740        &mut self.clock
2741    }
2742
2743    fn config(&self) -> &Config {
2744        &self.config
2745    }
2746
2747    fn metrics(&self) -> &Metrics {
2748        &self.metrics
2749    }
2750}
2751
2752/// Disconnect reason.
2753#[derive(Debug)]
2754pub enum DisconnectReason {
2755    /// Error while dialing the remote. This error occurs before a connection is
2756    /// even established. Errors of this kind are usually not transient.
2757    Dial(Arc<dyn std::error::Error + Sync + Send>),
2758    /// Error with an underlying established connection. Sometimes, reconnecting
2759    /// after such an error is possible.
2760    Connection(Arc<dyn std::error::Error + Sync + Send>),
2761    /// Error with a fetch.
2762    Fetch(FetchError),
2763    /// Session error.
2764    Session(session::Error),
2765    /// Session conflicts with existing session.
2766    Conflict,
2767    /// Connection to self.
2768    SelfConnection,
2769    /// Peer is blocked by policy.
2770    Policy,
2771    /// User requested disconnect.
2772    Command,
2773}
2774
2775impl DisconnectReason {
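    /// Whether the disconnect was caused by an error while dialing the remote.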
2776    pub fn is_dial_err(&self) -> bool {
2777        matches!(self, Self::Dial(_))
2778    }
2779
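    /// Whether the disconnect was caused by an error on an established connection.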
2780    pub fn is_connection_err(&self) -> bool {
2781        matches!(self, Self::Connection(_))
2782    }
2783
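    /// Construct a [`DisconnectReason::Connection`] carrying a generic "connection reset" error.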
2784    pub fn connection() -> Self {
2785        DisconnectReason::Connection(Arc::new(std::io::Error::from(
2786            std::io::ErrorKind::ConnectionReset,
2787        )))
2788    }
2789}
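
// Illustrative sketch: the helpers above let callers distinguish pre-connection
// (dial) failures, which are usually not transient, from failures of an established
// connection, which sometimes are. The `worth_retrying` helper and its retry policy
// are assumptions made for the example, not the service's actual behaviour.
#[allow(dead_code)]
fn worth_retrying(reason: &DisconnectReason) -> bool {
    // Only errors on an established connection are treated as possibly transient.
    reason.is_connection_err()
}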
2790
2791impl fmt::Display for DisconnectReason {
2792    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2793        match self {
2794            Self::Dial(err) => write!(f, "{err}"),
2795            Self::Connection(err) => write!(f, "{err}"),
2796            Self::Command => write!(f, "command"),
2797            Self::SelfConnection => write!(f, "self-connection"),
2798            Self::Conflict => write!(f, "conflict"),
2799            Self::Policy => write!(f, "policy"),
2800            Self::Session(err) => write!(f, "{err}"),
2801            Self::Fetch(err) => write!(f, "fetch: {err}"),
2802        }
2803    }
2804}
2805
2806/// Result of a project lookup.
2807#[derive(Debug)]
2808pub struct Lookup {
2809    /// The project's identity document, if the project was found locally.
2810    pub local: Option<Doc>,
2811    /// A list of remote peers on which the project is known to exist.
2812    pub remote: Vec<NodeId>,
2813}
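
// Illustrative sketch: a `Lookup` either carries the locally stored identity
// document or lists remote peers known to have the project. The `summarize_lookup`
// helper and its summary strings are made up for the example.
#[allow(dead_code)]
fn summarize_lookup(lookup: &Lookup) -> String {
    match (&lookup.local, lookup.remote.as_slice()) {
        (Some(_), _) => String::from("found in local storage"),
        (None, []) => String::from("not found"),
        (None, remotes) => format!("known to {} remote peer(s)", remotes.len()),
    }
}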
2814
2815#[derive(thiserror::Error, Debug)]
2816pub enum LookupError {
2817    #[error(transparent)]
2818    Routing(#[from] routing::Error),
2819    #[error(transparent)]
2820    Repository(#[from] RepositoryError),
2821}
2822
2823/// Holds currently (or recently) connected peers.
2824#[derive(Debug, Clone)]
2825pub struct Sessions(AddressBook<NodeId, Session>);
2826
2827impl Sessions {
2828    pub fn new(rng: Rng) -> Self {
2829        Self(AddressBook::new(rng))
2830    }
2831
2832    /// Iterator over fully connected peers.
2833    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2834        self.0
2835            .iter()
2836            .filter_map(move |(id, sess)| match &sess.state {
2837                session::State::Connected { .. } => Some((id, sess)),
2838                _ => None,
2839            })
2840    }
2841
2842    /// Iterator over connected inbound peers.
2843    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2844        self.connected().filter(|(_, s)| s.link.is_inbound())
2845    }
2846
2847    /// Iterator over connected outbound peers.
2848    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2849        self.connected().filter(|(_, s)| s.link.is_outbound())
2850    }
2851
2852    /// Mutable iterator over fully connected peers.
2853    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2854        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
2855    }
2856
2857    /// Mutable iterator over disconnected peers.
2858    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2859        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
2860    }
2861
2862    /// Return whether the given node has a fully established session.
2863    pub fn is_connected(&self, id: &NodeId) -> bool {
2864        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
2865    }
2866
2867    /// Return whether the given node can be connected to.
2868    pub fn is_disconnected(&self, id: &NodeId) -> bool {
2869        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
2870    }
2871}
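
// Illustrative sketch: the direction-aware iterators make it easy to compute, for
// example, how many additional outbound connections are wanted. The `outbound_deficit`
// helper and its `target` parameter are hypothetical and not taken from the node
// configuration.
#[allow(dead_code)]
fn outbound_deficit(sessions: &Sessions, target: usize) -> usize {
    target.saturating_sub(sessions.outbound().count())
}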
2872
2873impl Deref for Sessions {
2874    type Target = AddressBook<NodeId, Session>;
2875
2876    fn deref(&self) -> &Self::Target {
2877        &self.0
2878    }
2879}
2880
2881impl DerefMut for Sessions {
2882    fn deref_mut(&mut self) -> &mut Self::Target {
2883        &mut self.0
2884    }
2885}