// radicle_protocol/service.rs
1#![allow(clippy::too_many_arguments)]
2#![allow(clippy::collapsible_match)]
3#![allow(clippy::collapsible_if)]
4#![warn(clippy::unwrap_used)]
5pub mod command;
6pub use command::{Command, QueryState};
7
8pub mod filter;
9pub mod gossip;
10pub mod io;
11pub mod limiter;
12pub mod message;
13pub mod session;
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::net::IpAddr;
18use std::ops::{Deref, DerefMut};
19use std::sync::Arc;
20use std::{fmt, net, time};
21
22use crossbeam_channel as chan;
23use fastrand::Rng;
24use localtime::{LocalDuration, LocalTime};
25use log::*;
26use nonempty::NonEmpty;
27
28use radicle::identity::Doc;
29use radicle::node;
30use radicle::node::address;
31use radicle::node::address::Store as _;
32use radicle::node::address::{AddressBook, AddressType, KnownAddress};
33use radicle::node::config::{PeerConfig, RateLimit};
34use radicle::node::device::Device;
35use radicle::node::refs::Store as _;
36use radicle::node::routing::Store as _;
37use radicle::node::seed;
38use radicle::node::seed::Store as _;
39use radicle::node::{Penalty, Severity};
40use radicle::storage::refs::{FeatureLevel, SIGREFS_BRANCH};
41use radicle::storage::{RepositoryError, RepositoryInfo, SignedRefsInfo};
42use radicle_fetch::policy::SeedingPolicy;
43
44use crate::fetcher;
45use crate::fetcher::service::FetcherService;
46use crate::fetcher::FetcherState;
47use crate::fetcher::RefsToFetch;
48use crate::service::gossip::Store as _;
49use crate::service::message::{
50    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
51};
52use crate::service::policy::{store::Write, Scope};
53use radicle::identity::RepoId;
54use radicle::node::events::Emitter;
55use radicle::node::routing;
56use radicle::node::routing::InsertResult;
57use radicle::node::{Address, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt};
58use radicle::prelude::*;
59use radicle::storage;
60use radicle::storage::{refs::RefsAt, Namespaces, ReadStorage};
61// use radicle::worker::fetch;
62// use crate::worker::FetchError;
63use radicle::crypto;
64use radicle::node::Link;
65use radicle::node::PROTOCOL_VERSION;
66
67use crate::bounded::BoundedVec;
68use crate::service::filter::Filter;
69pub use crate::service::message::{Message, ZeroBytes};
70pub use crate::service::session::{QueuedFetch, Session};
71use crate::worker::FetchError;
72use radicle::node::events::{Event, Events};
73use radicle::node::{Config, NodeId};
74
75use radicle::node::policy::config as policy;
76
77use self::io::Outbox;
78use self::limiter::RateLimiter;
79use self::message::InventoryAnnouncement;
80use self::policy::NamespacesError;
81
/// How often to run the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often to run the "gossip" task.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often to run the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often to run the "sync" task.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often to run the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// Duration to wait on an unresponsive peer before dropping its connection.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// How much time should pass after a peer was last active for a *ping* to be sent.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency values to keep for a session.
pub const MAX_LATENCIES: usize = 16;
/// Maximum time difference between the local time, and an announcement timestamp.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum attempts to connect to a peer before we give up.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time should we request gossip messages when connecting to a peer,
/// when we come online for the first time.
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
/// When subscribing, what margin of error do we give ourselves. A higher delta means we ask for
/// messages further back than strictly necessary, to account for missed messages.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum amount of time to wait before reconnecting to a peer.
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Connection retry delta used for ephemeral peers that failed to connect previously.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// How long to wait for a fetch to stall before aborting, default is 30s.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(30);
/// Target number of peers to maintain connections to.
pub const TARGET_OUTBOUND_PEERS: usize = 8;

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
/// Maximum inventory limit imposed by message size limits.
pub use message::INVENTORY_LIMIT;
/// Maximum number of project git references imposed by message size limits.
pub use message::REF_REMOTE_LIMIT;
125
/// Metrics we track.
///
/// Serialized in camelCase for external (JSON) consumers.
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Metrics for each peer, keyed by node ID.
    pub peers: HashMap<NodeId, PeerMetrics>,
    /// Tasks queued in worker queue.
    pub worker_queue_size: usize,
    /// Current open channel count.
    pub open_channels: usize,
}
137
138impl Metrics {
139    /// Get metrics for the given peer.
140    pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
141        self.peers.entry(nid).or_default()
142    }
143}
144
/// Per-peer metrics we track.
///
/// Counters are cumulative since the last node start (see the `metrics` field
/// on `Service`).
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerMetrics {
    // Receive-side counters.
    pub received_git_bytes: usize,
    pub received_fetch_requests: usize,
    pub received_bytes: usize,
    pub received_gossip_messages: usize,
    // Send-side counters.
    pub sent_bytes: usize,
    pub sent_fetch_requests: usize,
    pub sent_git_bytes: usize,
    pub sent_gossip_messages: usize,
    // Connection lifecycle counters.
    pub streams_opened: usize,
    pub inbound_connection_attempts: usize,
    pub outbound_connection_attempts: usize,
    pub disconnects: usize,
}
162
/// Result of syncing our routing table with a node's inventory.
#[derive(Default)]
struct SyncedRouting {
    /// Repo entries added.
    added: Vec<RepoId>,
    /// Repo entries removed.
    removed: Vec<RepoId>,
    /// Repo entries updated (time).
    updated: Vec<RepoId>,
}
173
174impl SyncedRouting {
175    fn is_empty(&self) -> bool {
176        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
177    }
178}
179
/// A peer we can connect to.
#[derive(Debug, Clone)]
struct Peer {
    /// The peer's node ID.
    nid: NodeId,
    /// Known network addresses for this peer.
    addresses: Vec<KnownAddress>,
    /// The peer's penalty (see [`Penalty`]) — presumably used to deprioritize
    /// misbehaving peers; confirm against the connection-selection logic.
    penalty: Penalty,
}
187
/// General service error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Git backend error.
    #[error(transparent)]
    Git(#[from] radicle::git::raw::Error),
    /// Storage layer error.
    #[error(transparent)]
    Storage(#[from] storage::Error),
    /// Gossip store error.
    #[error(transparent)]
    Gossip(#[from] gossip::Error),
    /// Signed refs error.
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
    /// Routing table error.
    #[error(transparent)]
    Routing(#[from] routing::Error),
    /// Address book error.
    #[error(transparent)]
    Address(#[from] address::Error),
    /// Node database error.
    #[error(transparent)]
    Database(#[from] node::db::Error),
    /// Seed store error.
    #[error(transparent)]
    Seeds(#[from] seed::Error),
    /// Policy store error.
    #[error(transparent)]
    Policy(#[from] policy::Error),
    /// Repository access error.
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
    /// Namespaces error, boxed to keep `Error` (and every `Result` carrying it) small.
    #[error("namespaces error: {0}")]
    Namespaces(Box<NamespacesError>),
}
214
215impl From<NamespacesError> for Error {
216    fn from(e: NamespacesError) -> Self {
217        Self::Namespaces(Box::new(e))
218    }
219}
220
/// Errors that can occur when attempting an outbound connection.
#[derive(thiserror::Error, Debug)]
pub enum ConnectError {
    /// A session with this peer already exists.
    #[error("attempted connection to peer {nid} which already has a session")]
    SessionExists { nid: NodeId },
    /// The target node ID is our own.
    #[error("attempted connection to self")]
    SelfConnection,
    /// We're already at the outbound connection limit.
    #[error("outbound connection limit reached when attempting {nid} ({addr})")]
    LimitReached { nid: NodeId, addr: Address },
    /// The address type isn't supported by this node's configuration.
    #[error(
        "attempted connection to {nid}, via {addr} but addresses of this kind are not supported"
    )]
    UnsupportedAddress { nid: NodeId, addr: Address },
    /// The peer is blocked by local policy.
    #[error("attempted connection with blocked peer {nid}")]
    Blocked { nid: NodeId },
}
236
/// A store for all node data.
///
/// Capability trait combining every individual node store (addresses, gossip,
/// routing, seeds, refs); implementors provide all of them at once.
pub trait Store:
    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
{
}

// The node database implements all of the individual stores.
impl Store for radicle::node::Database {}
244
/// Fetch state for an ongoing fetch.
#[derive(Debug)]
pub struct FetchState {
    /// Node we're fetching from.
    pub from: NodeId,
    /// What refs we're fetching.
    pub refs_at: Vec<RefsAt>,
    /// Channels waiting for fetch results; each subscriber is notified when
    /// the fetch concludes.
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}
255
/// Holds all node stores.
///
/// Newtype wrapper exposing a single database `D` through each of its store
/// capabilities (see the accessors on the `impl` below).
#[derive(Debug)]
pub struct Stores<D>(D);
259
// Accessors projecting the single underlying database `D` as each of its
// store capabilities. All of them return the same inner value; the trait
// bound on `D: Store` guarantees every capability is available.
impl<D> Stores<D>
where
    D: Store,
{
    /// Get the database as a routing store.
    pub fn routing(&self) -> &impl routing::Store {
        &self.0
    }

    /// Get the database as a routing store, mutably.
    pub fn routing_mut(&mut self) -> &mut impl routing::Store {
        &mut self.0
    }

    /// Get the database as an address store.
    pub fn addresses(&self) -> &impl address::Store {
        &self.0
    }

    /// Get the database as an address store, mutably.
    pub fn addresses_mut(&mut self) -> &mut impl address::Store {
        &mut self.0
    }

    /// Get the database as a gossip store.
    pub fn gossip(&self) -> &impl gossip::Store {
        &self.0
    }

    /// Get the database as a gossip store, mutably.
    pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
        &mut self.0
    }

    /// Get the database as a seed store.
    pub fn seeds(&self) -> &impl seed::Store {
        &self.0
    }

    /// Get the database as a seed store, mutably.
    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
        &mut self.0
    }

    /// Get the database as a refs db.
    pub fn refs(&self) -> &impl node::refs::Store {
        &self.0
    }

    /// Get the database as a refs db, mutably.
    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
        &mut self.0
    }
}
314
impl<D> AsMut<D> for Stores<D> {
    /// Get a mutable reference to the underlying database.
    fn as_mut(&mut self) -> &mut D {
        &mut self.0
    }
}

impl<D> From<D> for Stores<D> {
    /// Wrap a database in the `Stores` newtype.
    fn from(db: D) -> Self {
        Self(db)
    }
}
326
/// The node service.
#[derive(Debug)]
pub struct Service<D, S, G> {
    /// Service configuration.
    config: Config,
    /// Our cryptographic signer and key.
    signer: Device<G>,
    /// Project storage.
    storage: S,
    /// Node database.
    db: Stores<D>,
    /// Policy configuration.
    policies: policy::Config<Write>,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
    /// Clock. Tells the time.
    clock: LocalTime,
    /// Who relayed what announcement to us. We keep track of this to ensure that
    /// we don't relay messages to nodes that already know about these messages.
    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
    /// I/O outbox.
    outbox: Outbox,
    /// Cached local node announcement.
    node: NodeAnnouncement,
    /// Cached local inventory announcement.
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
    /// Fetch service; schedules fetches and delivers their results to
    /// responders (see [`FetcherService`]).
    fetcher: FetcherService<command::Responder<FetchResult>>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
    filter: Filter,
    /// Last time the service was idle.
    last_idle: LocalTime,
    /// Last time the gossip messages were relayed.
    last_gossip: LocalTime,
    /// Last time the service synced.
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
    last_prune: LocalTime,
    /// Last time the announcement task was run.
    last_announce: LocalTime,
    /// Timestamp of last local inventory announced.
    last_inventory: LocalTime,
    /// Last timestamp used for announcements.
    last_timestamp: Timestamp,
    /// Time when the service was initialized, or `None` if it wasn't initialized.
    started_at: Option<LocalTime>,
    /// Time when the service was last online, or `None` if this is the first time.
    last_online_at: Option<LocalTime>,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
    /// Local listening addresses.
    listening: Vec<net::SocketAddr>,
    /// Latest metrics for all nodes connected to since the last start.
    metrics: Metrics,
}
385
impl<D, S, G> Service<D, S, G> {
    /// Get the local node id.
    pub fn node_id(&self) -> NodeId {
        *self.signer.public_key()
    }

    /// Get the local service time.
    pub fn local_time(&self) -> LocalTime {
        self.clock
    }

    /// Get a handle to the event emitter, for publishing service events to
    /// subscribers.
    pub fn emitter(&self) -> Emitter<Event> {
        self.emitter.clone()
    }
}
401
impl<D, S, G> Service<D, S, G>
where
    D: Store,
    S: WriteStorage + 'static,
    G: crypto::signature::Signer<crypto::Signature>,
{
    /// Initialize service with current time. Call this once.
    ///
    /// Loads persisted state, reconciles local storage with the routing table
    /// and sync status, sets up the subscription filter, connects to
    /// configured peers, and schedules the periodic tasks.
    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());
        // A default (zero) time would mean an uninitialized clock.
        assert_ne!(time, LocalTime::default());

        let nid = self.node_id();

        self.clock = time;
        self.started_at = Some(time);
        // Use the timestamp of the last stored gossip message as a proxy for
        // when we were last online; no message or a db error means "first run".
        self.last_online_at = match self.db.gossip().last() {
            Ok(Some(last)) => Some(last.to_local_time()),
            Ok(None) => None,
            Err(e) => {
                warn!(target: "service", "Failed to get the latest gossip message from db: {e}");
                None
            }
        };

        // Populate refs database. This is only useful as part of the upgrade process for nodes
        // that have been online since before the refs database was created.
        match self.db.refs().count() {
            Ok(0) => {
                info!(target: "service", "Empty refs database, populating from storage..");
                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
                    warn!(target: "service", "Failed to populate refs database: {e}");
                }
            }
            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
            Err(e) => {
                warn!(target: "service", "Failed to retrieve count of refs from database: {e}")
            }
        }

        // Sync statuses we have already recorded for our own node.
        let announced = self
            .db
            .seeds()
            .seeded_by(&nid)?
            .collect::<Result<HashMap<_, _>, _>>()?;
        let mut inventory = BTreeSet::new();
        let mut private = BTreeSet::new();

        for repo in self.storage.repositories()? {
            // Migrate `rad/sigrefs` if needed before reading the repo info.
            let repo = self.upgrade_sigrefs(repo)?;
            let rid = repo.rid;

            // If we're not seeding this repo, just skip it.
            if !self.policies.is_seeding(&rid)? {
                debug!(target: "service", "Local repository {rid} is not seeded");
                continue;
            }
            // Add public repositories to inventory.
            if repo.doc.is_public() {
                inventory.insert(rid);
            } else {
                private.insert(rid);
            }
            // If we have no owned refs for this repo, then there's nothing to announce.
            let Some(updated_at) = repo.synced_at else {
                continue;
            };
            // Skip this repo if the sync status matches what we have in storage.
            if let Some(announced) = announced.get(&rid) {
                if updated_at.oid == announced.oid {
                    continue;
                }
            }
            // Make sure our local node's sync status is up to date with storage.
            if self.db.seeds_mut().synced(
                &rid,
                &nid,
                updated_at.oid,
                updated_at.timestamp.into(),
            )? {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
            // If we got here, it likely means a repo was updated while the node was stopped.
            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
            // the historical gossip messages when a node connects and subscribes to this repo.
            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
                self.db.gossip_mut().announced(&nid, &ann)?;
            }
        }

        // Ensure that our inventory is recorded in our routing table, and we are seeding
        // all of it. It can happen that inventory is not properly seeded if for eg. the
        // user creates a new repository while the node is stopped.
        self.db
            .routing_mut()
            .add_inventory(inventory.iter(), nid, time.into())?;
        self.inventory = gossip::inventory(self.timestamp(), inventory);

        // Ensure that private repositories are not in our inventory. It's possible that
        // a repository was public and then it was made private.
        self.db
            .routing_mut()
            .remove_inventories(private.iter(), &nid)?;

        // Setup subscription filter for seeded repos.
        self.filter = Filter::allowed_by(self.policies.seed_policies()?);
        // Connect to configured peers. Failures are logged only; one bad
        // configured peer shouldn't abort initialization.
        let addrs = self.config.connect.clone();
        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
            if let Err(e) = self.connect(id, addr) {
                debug!(target: "service", "Service::initialization connection error: {e}");
            }
        }
        // Try to establish some connections.
        self.maintain_connections();
        // Start periodic tasks.
        self.outbox.wakeup(IDLE_INTERVAL);
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }

    /// Migrate a repository's `rad/sigrefs` to the latest feature level, if it
    /// needs migration; otherwise return the info unchanged.
    fn upgrade_sigrefs(&mut self, mut info: RepositoryInfo) -> Result<RepositoryInfo, Error> {
        if !matches!(info.refs, SignedRefsInfo::NeedsMigration) {
            return Ok(info);
        }

        let rid = info.rid;

        log::info!(
            "Migrating `rad/sigrefs` of {rid} to force feature level {}.",
            FeatureLevel::LATEST
        );

        let repo = self.storage.repository_mut(rid)?;
        // NOTE: We assume to reach `FeatureLevel::LATEST` by signing refs.
        let refs = repo.force_sign_refs(&self.signer)?;

        // Re-open the repository to compute the new synced-at state.
        let repo = self.storage.repository(rid)?;
        let synced_at = SyncedAt::new(refs.at, &repo)?;

        info.synced_at = Some(synced_at);
        info.refs = SignedRefsInfo::Some(refs);

        Ok(info)
    }
}
549
550impl<D, S, G> Service<D, S, G>
551where
552    D: Store,
553    S: ReadStorage + 'static,
554    G: crypto::signature::Signer<crypto::Signature>,
555{
    /// Create a new (uninitialized) service.
    ///
    /// Time-dependent state is set to defaults here and updated when
    /// [`Service::initialize`] is called.
    pub fn new(
        config: Config,
        db: Stores<D>,
        storage: S,
        policies: policy::Config<Write>,
        signer: Device<G>,
        rng: Rng,
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());
        let limiter = RateLimiter::new(config.peers());
        let last_timestamp = node.timestamp;
        let clock = LocalTime::default(); // Updated on initialize.
        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.
        let fetcher = {
            // Nb. `config` here shadows the service config inside this block only.
            let config = fetcher::Config::new()
                .with_max_concurrency(
                    std::num::NonZeroUsize::new(config.limits.fetch_concurrency.into())
                        .expect("fetch concurrency was zero, must be at least 1"),
                )
                .with_max_capacity(fetcher::MaxQueueSize::default());
            FetcherService::new(config)
        };
        Self {
            config,
            storage,
            policies,
            signer,
            rng,
            inventory,
            node,
            clock,
            db,
            outbox: Outbox::default(),
            limiter,
            sessions,
            fetcher,
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
            last_gossip: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_timestamp,
            last_announce: LocalTime::default(),
            last_inventory: LocalTime::default(),
            started_at: None,     // Updated on initialize.
            last_online_at: None, // Updated on initialize.
            emitter,
            listening: vec![],
            metrics: Metrics::default(),
        }
    }
610
    /// Whether the service was started (initialized) and if so, at what time.
    pub fn started(&self) -> Option<LocalTime> {
        self.started_at
    }

    /// Return the next i/o action to execute.
    // Not an `Iterator` impl: this drains the internal outbox.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<io::Io> {
        self.outbox.next()
    }
621
622    /// Seed a repository.
623    /// Returns whether or not the repo policy was updated.
624    pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
625        let updated = self.policies.seed(id, scope)?;
626        self.filter.insert(id);
627
628        Ok(updated)
629    }
630
    /// Unseed a repository.
    /// Returns whether or not the repo policy was updated.
    /// Note that when unseeding, we don't announce anything to the network. This is because by
    /// simply not announcing it anymore, it will eventually be pruned by nodes.
    pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
        let updated = self.policies.unseed(id)?;

        if updated {
            // Nb. This is potentially slow if we have lots of repos. We should probably
            // only re-compute the filter when we've unseeded a certain amount of repos
            // and the filter is really out of date.
            self.filter = Filter::allowed_by(self.policies.seed_policies()?);
            // Update and announce new inventory.
            // A failure here is logged but not propagated, since the policy
            // update itself already succeeded.
            if let Err(e) = self.remove_inventory(id) {
                warn!(target: "service", "Failed to update inventory after unseed: {e}");
            }
        }
        Ok(updated)
    }
650
    /// Find the closest `n` peers by proximity in seeding graphs.
    /// Returns a sorted list from the closest peer to the furthest.
    /// Peers with more seedings in common score higher.
    ///
    /// NOTE(review): currently unimplemented — calling this panics via `todo!()`.
    #[allow(unused)]
    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
        todo!()
    }
658
    /// Get the database.
    pub fn database(&self) -> &Stores<D> {
        &self.db
    }

    /// Get the mutable database.
    pub fn database_mut(&mut self) -> &mut Stores<D> {
        &mut self.db
    }

    /// Get the storage instance.
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Get the mutable storage instance.
    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }

    /// Get the node policies.
    pub fn policies(&self) -> &policy::Config<Write> {
        &self.policies
    }

    /// Get the local signer.
    pub fn signer(&self) -> &Device<G> {
        &self.signer
    }

    /// Subscriber to inner `Emitter` events.
    pub fn events(&mut self) -> Events {
        Events::from(self.emitter.subscribe())
    }

    /// Get the current fetcher state.
    pub fn fetcher(&self) -> &FetcherState {
        self.fetcher.state()
    }

    /// Get I/O outbox.
    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
    }

    /// Get configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }
707
    /// Lookup a repository, both locally and in the routing table.
    ///
    /// The remote list excludes our own node ID.
    pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
        let this = self.nid();
        let local = self.storage.get(rid)?;
        let remote = self
            .db
            .routing()
            .get(&rid)?
            .iter()
            .filter(|nid| nid != &this)
            .cloned()
            .collect();

        Ok(Lookup { local, remote })
    }
723
724    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
725        trace!(
726            target: "service",
727            "Tick +{}",
728            now - self.started_at.expect("Service::tick: service must be initialized")
729        );
730        if now >= self.clock {
731            self.clock = now;
732        } else {
733            // Nb. In tests, we often move the clock forwards in time to test different behaviors,
734            // so this warning isn't applicable there.
735            #[cfg(not(test))]
736            warn!(
737                target: "service",
738                "System clock is not monotonic: {now} is not greater or equal to {}", self.clock
739            );
740        }
741        self.metrics = metrics.clone();
742    }
743
    /// Run any periodic task whose interval has elapsed, and schedule its next
    /// wakeup. Always checks persistent peers for reconnection at the end.
    pub fn wake(&mut self) {
        let now = self.clock;

        trace!(
            target: "service",
            "Wake +{}",
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

        if now - self.last_idle >= IDLE_INTERVAL {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_gossip >= GOSSIP_INTERVAL {
            trace!(target: "service", "Running 'gossip' task...");

            if let Err(e) = self.relay_announcements() {
                warn!(target: "service", "Failed to relay stored announcements: {e}");
            }
            self.outbox.wakeup(GOSSIP_INTERVAL);
            self.last_gossip = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_repositories() {
                warn!(target: "service", "Failed to fetch missing inventory: {e}");
            }
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

            self.announce_inventory();
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                warn!(target: "service", "Failed to prune routing entries: {err}");
            }
            // Drop gossip messages older than the configured maximum age.
            if let Err(err) = self
                .db
                .gossip_mut()
                .prune((now - LocalDuration::from(self.config.limits.gossip_max_age)).into())
            {
                warn!(target: "service", "Failed to prune gossip entries: {err}");
            }

            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

        // Always check whether there are persistent peers that need reconnecting.
        self.maintain_persistent();
    }
810
811    pub fn command(&mut self, cmd: Command) {
812        info!(target: "service", "Received command {cmd:?}");
813
814        match cmd {
815            Command::Connect(nid, addr, opts) => {
816                if opts.persistent {
817                    self.config.connect.insert((nid, addr.clone()).into());
818                }
819                if let Err(e) = self.connect(nid, addr) {
820                    match e {
821                        ConnectError::SessionExists { nid } => {
822                            self.emitter.emit(Event::PeerConnected { nid });
823                        }
824                        e => {
825                            // N.b. using the fact that the call to connect waits for an event
826                            self.emitter.emit(Event::PeerDisconnected {
827                                nid,
828                                reason: e.to_string(),
829                            });
830                        }
831                    }
832                }
833            }
834            Command::Disconnect(nid) => {
835                self.outbox.disconnect(nid, DisconnectReason::Command);
836            }
837            Command::Config(resp) => {
838                resp.ok(self.config.clone()).ok();
839            }
840            Command::ListenAddrs(resp) => {
841                resp.ok(self.listening.clone()).ok();
842            }
843            Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
844                Ok(seeds) => {
845                    let (connected, disconnected) = seeds.partition();
846                    debug!(
847                        target: "service",
848                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
849                        connected.len(), disconnected.len(),  rid
850                    );
851                    resp.ok(seeds).ok();
852                }
853                Err(e) => {
854                    warn!(target: "service", "Failed to get seeds for {rid}: {e}");
855                    resp.err(e).ok();
856                }
857            },
858            Command::Fetch(rid, seed, timeout, signed_references_minimum_feature_level, resp) => {
859                let feature_level = signed_references_minimum_feature_level
860                    .unwrap_or(self.config.fetch.feature_level_min());
861                let config = self
862                    .fetch_config()
863                    .with_timeout(timeout)
864                    .with_minimum_feature_level(feature_level);
865                self.fetch(rid, seed, vec![], config, Some(resp));
866            }
867            Command::Seed(rid, scope, resp) => {
868                // Update our seeding policy.
869                let seeded = self
870                    .seed(&rid, scope)
871                    .expect("Service::command: error seeding repository");
872                resp.ok(seeded).ok();
873
874                // Let all our peers know that we're interested in this repo from now on.
875                self.outbox.broadcast(
876                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
877                    self.sessions.connected().map(|(_, s)| s),
878                );
879            }
880            Command::Unseed(id, resp) => {
881                let updated = self
882                    .unseed(&id)
883                    .expect("Service::command: error unseeding repository");
884                resp.ok(updated).ok();
885            }
886            Command::Follow(id, alias, resp) => {
887                let seeded = self
888                    .policies
889                    .follow(&id, alias.as_ref())
890                    .expect("Service::command: error following node");
891                resp.ok(seeded).ok();
892            }
893            Command::Unfollow(id, resp) => {
894                let updated = self
895                    .policies
896                    .unfollow(&id)
897                    .expect("Service::command: error unfollowing node");
898                resp.ok(updated).ok();
899            }
900            Command::Block(id, resp) => {
901                let updated = self
902                    .policies
903                    .set_follow_policy(&id, policy::Policy::Block)
904                    .expect("Service::command: error blocking node");
905                if updated {
906                    self.outbox.disconnect(id, DisconnectReason::Policy);
907                }
908                resp.send(updated).ok();
909            }
910            Command::AnnounceRefs(id, namespaces, resp) => {
911                let doc = match self.storage.get(id) {
912                    Ok(Some(doc)) => doc,
913                    Ok(None) => {
914                        warn!(target: "service", "Failed to announce refs: repository {id} not found");
915                        resp.err(command::Error::custom(format!("repository {id} not found")))
916                            .ok();
917                        return;
918                    }
919                    Err(e) => {
920                        warn!(target: "service", "Failed to announce refs: doc error: {e}");
921                        resp.err(e).ok();
922                        return;
923                    }
924                };
925
926                match self.announce_own_refs(id, doc, namespaces) {
927                    Ok((refs, _timestamp)) => {
928                        // TODO(finto): currently the command caller only
929                        // expects one `RefsAt`, this should be fixed in the
930                        // trait, eventually.
931                        if let Some(refs) = refs.first() {
932                            resp.ok(*refs).ok();
933                        } else {
934                            resp.err(command::Error::custom(format!(
935                                "no refs were announced for {id}"
936                            )))
937                            .ok();
938                        }
939                    }
940                    Err(err) => {
941                        warn!(target: "service", "Failed to announce refs: {err}");
942                        resp.err(err).ok();
943                    }
944                }
945            }
946            Command::AnnounceInventory => {
947                self.announce_inventory();
948            }
949            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
950                Ok(updated) => {
951                    resp.ok(updated).ok();
952                }
953                Err(e) => {
954                    warn!(target: "service", "Failed to add {rid} to inventory: {e}");
955                    resp.err(e).ok();
956                }
957            },
958            Command::QueryState(query, sender) => {
959                sender.send(query(self)).ok();
960            }
961        }
962    }
963
964    /// Initiate an outgoing fetch for some repository, based on another node's announcement.
965    /// Returns `true` if the fetch was initiated and `false` if it was skipped.
966    fn fetch_refs_at(
967        &mut self,
968        rid: RepoId,
969        from: NodeId,
970        refs: NonEmpty<RefsAt>,
971        scope: Scope,
972        config: fetcher::FetchConfig,
973    ) -> bool {
974        match self.refs_status_of(rid, refs, &scope) {
975            Ok(status) => {
976                if status.want.is_empty() {
977                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
978                } else {
979                    self.fetch(rid, from, status.want, config, None);
980                    return true;
981                }
982            }
983            Err(e) => {
984                warn!(target: "service", "Failed to get the refs status of {rid}: {e}");
985            }
986        }
987        // We didn't try to fetch anything.
988        false
989    }
990
    /// Initiate a fetch of `refs_at` for `rid` from the connected peer `from`.
    ///
    /// If `channel` is given, its responder is notified of the final
    /// [`FetchResult`] — including early failures, such as the peer not being
    /// connected or the fetch queue being at capacity.
    // NOTE(review): callers appear to pass an empty `refs_at` to request a full
    // fetch (see the `RefsToFetch::All` arm in `dequeue_fetches`) — confirm.
    fn fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs_at: Vec<RefsAt>,
        config: fetcher::FetchConfig,
        channel: Option<command::Responder<FetchResult>>,
    ) {
        // We can only fetch from peers with a live connection: bail out early
        // (and notify the responder, if any) otherwise.
        let session = {
            let reason = format!("peer {from} is not connected; cannot initiate fetch");
            let Some(session) = self.sessions.get_mut(&from) else {
                if let Some(c) = channel {
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            };
            if !session.is_connected() {
                if let Some(c) = channel {
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            }
            session
        };

        let cmd = fetcher::state::command::Fetch {
            from,
            rid,
            refs: refs_at.into(),
            config,
        };
        // Hand the fetch over to the fetcher service, which decides whether it
        // is started now, queued, merged with an in-flight fetch, or rejected.
        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);

        // The responder is handed back to us when the fetch was rejected
        // outright; fail it immediately so the caller isn't left waiting.
        if let Some(c) = rejected {
            c.ok(FetchResult::Failed {
                reason: "fetch queue at capacity".to_string(),
            })
            .ok();
        }

        match event {
            fetcher::state::event::Fetch::Started {
                rid,
                from,
                refs: refs_at,
                config,
            } => {
                debug!(target: "service", "Starting fetch for {rid} from {from}");
                // Only a *started* fetch is written to the outbox; queued ones
                // are picked up later by `dequeue_fetches`.
                self.outbox.fetch(
                    session,
                    rid,
                    refs_at.into(),
                    self.config.limits.fetch_pack_receive,
                    config,
                );
            }
            fetcher::state::event::Fetch::Queued { rid, from } => {
                debug!(target: "service", "Queued fetch for {rid} from {from}");
            }
            fetcher::state::event::Fetch::AlreadyFetching { rid, from } => {
                debug!(target: "service", "Already fetching {rid} from {from}");
            }
            fetcher::state::event::Fetch::QueueAtCapacity { rid, from, .. } => {
                debug!(target: "service", "Queue at capacity for {from}, rejected {rid}");
            }
        }
    }
1058
    /// Handle the completion (success or failure) of a fetch of `rid` from `from`.
    ///
    /// Settles the fetch with the fetcher service, notifies every subscribed
    /// responder of the result, and — on success — updates routing state,
    /// emits ref-update events, and announces inventory and refs as needed.
    pub fn fetched(
        &mut self,
        rid: RepoId,
        from: NodeId,
        result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
    ) {
        let cmd = fetcher::state::command::Fetched { from, rid };
        let fetcher::service::FetchCompleted { event, subscribers } = self.fetcher.fetched(cmd);

        // Settling this fetch may have freed capacity, so try to dequeue the
        // next queued fetches right away.
        self.dequeue_fetches();

        match event {
            // The fetcher has no record of this fetch; nothing more to do.
            fetcher::state::event::Fetched::NotFound { from, rid } => {
                debug!(target: "service", "Unexpected fetch result for {rid} from {from}");
            }
            fetcher::state::event::Fetched::Completed { from, rid, refs: _ } => {
                // Notify all responders that subscribed to this fetch's result.
                let fetch_result = match &result {
                    Ok(success) => FetchResult::Success {
                        updated: success.updated.clone(),
                        namespaces: success.namespaces.clone(),
                        clone: success.clone,
                    },
                    Err(e) => FetchResult::Failed {
                        reason: e.to_string(),
                    },
                };
                for responder in subscribers {
                    responder.ok(fetch_result.clone()).ok();
                }
                match result {
                    Ok(crate::worker::fetch::FetchResult {
                        updated,
                        canonical,
                        namespaces,
                        clone,
                        doc,
                    }) => {
                        info!(target: "service", "Fetched {rid} from {from} successfully");
                        // Update our routing table in case this fetch was user-initiated and doesn't
                        // come from an announcement.
                        self.seed_discovered(rid, from, self.clock.into());

                        for update in &updated {
                            if update.is_skipped() {
                                trace!(target: "service", "Ref skipped: {update} for {rid}");
                            } else {
                                debug!(target: "service", "Ref updated: {update} for {rid}");
                            }
                        }
                        self.emitter.emit(Event::RefsFetched {
                            remote: from,
                            rid,
                            updated: updated.clone(),
                        });
                        // One event per canonical ref that changed.
                        self.emitter
                            .emit_all(canonical.into_iter().map(|(refname, target)| {
                                Event::CanonicalRefUpdated {
                                    rid,
                                    refname,
                                    target,
                                }
                            }));

                        // Announce our new inventory if this fetch was a full clone.
                        // Only update and announce inventory for public repositories.
                        if clone && doc.is_public() {
                            debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");

                            if let Err(e) = self.add_inventory(rid) {
                                warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
                            }
                        }

                        // It's possible for a fetch to succeed but nothing was updated.
                        if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
                            debug!(target: "service", "Nothing to announce, no refs were updated..");
                        } else {
                            // Finally, announce the refs. This is useful for nodes to know what we've synced,
                            // beyond just knowing that we have added an item to our inventory.
                            if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
                                warn!(target: "service", "Failed to announce new refs: {e}");
                            }
                        }
                    }
                    Err(err) => {
                        warn!(target: "service", "Fetch failed for {rid} from {from}: {err}");

                        // For now, we only disconnect the from in case of timeout. In the future,
                        // there may be other reasons to disconnect.
                        if err.is_timeout() {
                            self.outbox.disconnect(from, DisconnectReason::Fetch(err));
                        }
                    }
                }
            }
        }
    }
1158
1159    /// Attempt to dequeue fetches from all peers.
1160    /// At most one fetch is dequeued per peer. If the fetch cannot be processed,
1161    /// it is put back on the queue for that peer.
1162    ///
1163    /// Fetches are queued for two reasons:
1164    /// 1. The RID was already being fetched.
1165    /// 2. The session was already at fetch capacity.
1166    pub fn dequeue_fetches(&mut self) {
1167        let sessions = self
1168            .sessions
1169            .shuffled()
1170            .map(|(k, _)| *k)
1171            .collect::<Vec<_>>();
1172
1173        for nid in sessions {
1174            #[allow(clippy::unwrap_used)]
1175            let sess = self.sessions.get_mut(&nid).unwrap();
1176            if !sess.is_connected() {
1177                continue;
1178            }
1179
1180            let Some(fetcher::QueuedFetch {
1181                rid,
1182                refs: refs_at,
1183                config,
1184            }) = self.fetcher.dequeue(&nid)
1185            else {
1186                continue;
1187            };
1188
1189            // Check seeding policy
1190            let repo_entry = self
1191                .policies
1192                .seed_policy(&rid)
1193                .expect("error accessing repo seeding configuration");
1194
1195            let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1196                debug!(target: "service", "Repository {} no longer seeded, skipping", rid);
1197                continue;
1198            };
1199
1200            debug!(target: "service", "Dequeued fetch for {} from {}", rid, nid);
1201
1202            match refs_at {
1203                RefsToFetch::Refs(refs) => {
1204                    self.fetch_refs_at(rid, nid, refs, scope, config);
1205                }
1206                RefsToFetch::All => {
1207                    // Channel is `None` since they will already be
1208                    // registered with the fetcher service.
1209                    self.fetch(rid, nid, vec![], config, None);
1210                }
1211            }
1212        }
1213    }
1214
1215    /// Inbound connection attempt.
1216    pub fn accepted(&mut self, ip: IpAddr) -> bool {
1217        // Always accept localhost connections, even if we already reached
1218        // our inbound connection limit.
1219        if ip.is_loopback() || ip.is_unspecified() {
1220            return true;
1221        }
1222        // Check for inbound connection limit.
1223        if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
1224            return false;
1225        }
1226        match self.db.addresses().is_ip_banned(ip) {
1227            Ok(banned) => {
1228                if banned {
1229                    debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
1230                    return false;
1231                }
1232            }
1233            Err(e) => warn!(target: "service", "Failed to query ban status for {ip}: {e}"),
1234        }
1235        let host: HostName = ip.into();
1236        let tokens = self.config.limits.rate.inbound;
1237
1238        if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
1239            trace!(target: "service", "Rate limiting inbound connection from {host}..");
1240            return false;
1241        }
1242        true
1243    }
1244
1245    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
1246        debug!(target: "service", "Attempted connection to {nid} ({addr})");
1247
1248        if let Some(sess) = self.sessions.get_mut(&nid) {
1249            sess.to_attempted();
1250        } else {
1251            #[cfg(debug_assertions)]
1252            panic!("Service::attempted: unknown session {nid}@{addr}");
1253        }
1254    }
1255
1256    pub fn listening(&mut self, local_addr: net::SocketAddr) {
1257        info!(target: "node", "Listening on {local_addr}..");
1258
1259        self.listening.push(local_addr);
1260    }
1261
    /// Handle a newly established connection to `remote` at `addr`.
    ///
    /// Blocked peers are disconnected immediately. Otherwise we emit a
    /// [`Event::PeerConnected`] and send our initial messages: outbound links
    /// transition the session created at connection-attempt time, while inbound
    /// links either reuse an existing session or create a fresh one.
    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        // Enforce our block policy before doing anything else.
        if let Ok(true) = self.policies.is_blocked(&remote) {
            self.emitter.emit(Event::PeerDisconnected {
                nid: remote,
                reason: format!("{remote} is blocked"),
            });
            info!(target: "service", "Disconnecting blocked inbound peer {remote}");
            self.outbox.disconnect(remote, DisconnectReason::Policy);
            return;
        }

        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

        // Initial messages to send over this new link.
        let msgs = self.initial(link);

        if link.is_outbound() {
            // For outbound connections, the session already exists (created
            // when the connection was attempted); just transition its state.
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);
            }
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(mut e) => {
                    // In this scenario, it's possible that our peer is persistent, and
                    // disconnected. We get an inbound connection before we attempt a re-connection,
                    // and therefore we treat it as a regular inbound connection.
                    //
                    // It's also possible that a disconnection hasn't gone through yet and our
                    // peer is still in connected state here, while a new inbound connection from
                    // that same peer is made. This results in a new connection from a peer that is
                    // already connected from the perspective of the service. This appears to be
                    // a bug in the underlying networking library.
                    let peer = e.get_mut();
                    debug!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({peer})"
                    );
                    peer.link = link;
                    peer.to_connected(self.clock);
                    self.outbox.write_all(peer, msgs);
                }
                Entry::Vacant(e) => {
                    // Record the peer's IP in the address book; local
                    // addresses are deliberately not recorded.
                    if let HostName::Ip(ip) = addr.host {
                        if !address::is_local(&ip) {
                            if let Err(e) =
                                self.db
                                    .addresses_mut()
                                    .record_ip(&remote, ip, self.clock.into())
                            {
                                log::debug!(target: "service", "Failed to record IP address for {remote}: {e}");
                            }
                        }
                    }
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                    ));
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
    }
1328
    /// Handle a disconnection from `remote` over `link`.
    ///
    /// Cancels the peer's active and queued fetches (failing any orphaned
    /// fetch responders), then either schedules a reconnection with
    /// exponential backoff (persistent peers) or removes the session and
    /// records the disconnect severity in the address book.
    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
        let since = self.local_time();
        let Some(session) = self.sessions.get_mut(&remote) else {
            // Since we sometimes disconnect the service eagerly, it's not unusual to get a second
            // disconnection event once the transport is dropped.
            trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
            return;
        };
        // In cases of connection conflicts, there may be disconnections of one of the two
        // connections. In that case we don't want the service to remove the session.
        if session.link != link {
            return;
        }

        info!(target: "service", "Disconnected from {remote} ({reason})");
        self.emitter.emit(Event::PeerDisconnected {
            nid: remote,
            reason: reason.to_string(),
        });

        let link = session.link;
        let addr = session.addr.clone();

        // Cancel any fetches involving this peer; their responders are handed
        // back to us as `orphaned` below.
        let cmd = fetcher::state::command::Cancel { from: remote };
        let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);

        match event {
            fetcher::state::event::Cancel::Unexpected { from } => {
                debug!(target: "service", "No fetches to cancel for {from}");
            }
            fetcher::state::event::Cancel::Canceled {
                from,
                active,
                queued,
            } => {
                debug!(target: "service", "Cancelled {} ongoing, {} queued for {from}", active.len(), queued.len());
            }
        }

        // Notify orphaned responders
        for (rid, responder) in orphaned {
            responder
                .ok(FetchResult::Failed {
                    reason: format!("failed fetch to {rid}, peer disconnected: {reason}"),
                })
                .ok();
        }

        // Attempt to re-connect to persistent peers.
        if self.config.is_persistent(&remote) {
            // Exponential backoff: 2^attempts seconds, clamped to the
            // configured minimum/maximum reconnection deltas.
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

            // Nb. We always try to reconnect to persistent peers, even when the error appears
            // to not be transient.
            session.to_disconnected(since, since + delay);

            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

            self.outbox.wakeup(delay);
        } else {
            debug!(target: "service", "Dropping peer {remote}..");
            self.sessions.remove(&remote);

            // Grade the disconnect so the address book can penalize the
            // peer's address accordingly.
            let severity = match reason {
                DisconnectReason::Dial(_)
                | DisconnectReason::Fetch(_)
                | DisconnectReason::Connection(_) => {
                    if self.is_online() {
                        // If we're "online", there's something wrong with this
                        // peer connection specifically.
                        Severity::Medium
                    } else {
                        Severity::Low
                    }
                }
                DisconnectReason::Session(e) => e.severity(),
                DisconnectReason::Command
                | DisconnectReason::Conflict
                | DisconnectReason::Policy
                | DisconnectReason::SelfConnection => Severity::Low,
            };

            if let Err(e) = self
                .db
                .addresses_mut()
                .disconnected(&remote, &addr, severity)
            {
                debug!(target: "service", "Failed to update address store: {e}");
            }
            // Only re-attempt outbound connections, since we don't care if an inbound connection
            // is dropped.
            if link.is_outbound() {
                self.maintain_connections();
            }
        }
        // The disconnect may have freed fetch capacity; see if any queued
        // fetches can now run.
        self.dequeue_fetches();
    }
1427
1428    pub fn received_message(&mut self, remote: NodeId, message: Message) {
1429        if let Err(err) = self.handle_message(&remote, message) {
1430            // If there's an error, stop processing messages from this peer.
1431            // However, we still relay messages returned up to this point.
1432            self.outbox
1433                .disconnect(remote, DisconnectReason::Session(err));
1434
1435            // FIXME: The peer should be set in a state such that we don't
1436            // process further messages.
1437        }
1438    }
1439
1440    /// Handle an announcement message.
1441    ///
1442    /// Returns `true` if this announcement should be stored and relayed to connected peers,
1443    /// and `false` if it should not.
1444    pub fn handle_announcement(
1445        &mut self,
1446        relayer: &NodeId,
1447        relayer_addr: &Address,
1448        announcement: &Announcement,
1449    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
1450        if !announcement.verify() {
1451            return Err(session::Error::Misbehavior);
1452        }
1453        let Announcement {
1454            node: announcer,
1455            message,
1456            ..
1457        } = announcement;
1458
1459        // Ignore our own announcements, in case the relayer sent one by mistake.
1460        if announcer == self.nid() {
1461            return Ok(None);
1462        }
1463        let now = self.clock;
1464        let timestamp = message.timestamp();
1465
1466        // Don't allow messages from too far in the future.
1467        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
1468            return Err(session::Error::InvalidTimestamp(timestamp));
1469        }
1470
1471        // We don't process announcements from nodes we don't know, since the node announcement is
1472        // what provides DoS protection.
1473        //
1474        // Note that it's possible to *not* receive the node announcement, but receive the
1475        // subsequent announcements of a node in the case of historical gossip messages requested
1476        // from the `subscribe` message. This can happen if the cut-off time is after the node
1477        // announcement timestamp, but before the other announcements. In that case, we simply
1478        // ignore all announcements of that node until we get a node announcement.
1479        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
1480            match self.db.addresses().get(announcer) {
1481                Ok(node) => {
1482                    if node.is_none() {
1483                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
1484                        return Ok(None);
1485                    }
1486                }
1487                Err(e) => {
1488                    debug!(target: "service", "Failed to look up node in address book: {e}");
1489                    return Ok(None);
1490                }
1491            }
1492        }
1493
1494        // Discard announcement messages we've already seen, otherwise update our last seen time.
1495        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
1496            Ok(Some(id)) => {
1497                log::debug!(
1498                    target: "service",
1499                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
1500                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
1501                );
1502                // Keep track of who relayed the message for later.
1503                self.relayed_by.entry(id).or_default().push(*relayer);
1504
1505                // Decide whether or not to relay this message, if it's fresh.
1506                // To avoid spamming peers on startup with historical gossip messages,
1507                // don't relay messages that are too old. We make an exception for node announcements,
1508                // since they are cached, and will hence often carry old timestamps.
1509                let relay = message.is_node_announcement()
1510                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
1511                relay.then_some(id)
1512            }
1513            Ok(None) => {
1514                // FIXME: Still mark as relayed by this peer.
1515                // FIXME: Refs announcements should not be delayed, since they are only sent
1516                // to subscribers.
1517                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
1518                return Ok(None);
1519            }
1520            Err(e) => {
1521                debug!(target: "service", "Failed to update gossip entry from {announcer}: {e}");
1522                return Ok(None);
1523            }
1524        };
1525
1526        match message {
1527            // Process a peer inventory update announcement by (maybe) fetching.
1528            AnnouncementMessage::Inventory(message) => {
1529                self.emitter.emit(Event::InventoryAnnounced {
1530                    nid: *announcer,
1531                    inventory: message.inventory.to_vec(),
1532                    timestamp: message.timestamp,
1533                });
1534                match self.sync_routing(
1535                    message.inventory.iter().cloned(),
1536                    *announcer,
1537                    message.timestamp,
1538                ) {
1539                    Ok(synced) => {
1540                        if synced.is_empty() {
1541                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
1542                            return Ok(None);
1543                        }
1544                    }
1545                    Err(e) => {
1546                        debug!(target: "service", "Failed to process inventory from {announcer}: {e}");
1547                        return Ok(None);
1548                    }
1549                }
1550                let mut missing = Vec::new();
1551                let nid = *self.nid();
1552
1553                // Here we handle the special case where the inventory we received is that of
1554                // a connected peer, as opposed to being relayed to us.
1555                if let Some(sess) = self.sessions.get_mut(announcer) {
1556                    for id in message.inventory.as_slice() {
1557                        // If we are connected to the announcer of this inventory, update the peer's
1558                        // subscription filter to include all inventory items. This way, we'll
1559                        // relay messages relating to the peer's inventory.
1560                        if let Some(sub) = &mut sess.subscribe {
1561                            sub.filter.insert(id);
1562                        }
1563
1564                        // If we're seeding and connected to the announcer, and we don't have
1565                        // the inventory, fetch it from the announcer.
1566                        if self.policies.is_seeding(id).expect(
1567                            "Service::handle_announcement: error accessing seeding configuration",
1568                        ) {
1569                            // Only if we do not have the repository locally do we fetch here.
1570                            // If we do have it, only fetch after receiving a ref announcement.
1571                            match self.db.routing().entry(id, &nid) {
1572                                Ok(entry) => {
1573                                    if entry.is_none() {
1574                                        missing.push(*id);
1575                                    }
1576                                }
1577                                Err(e) => debug!(
1578                                    target: "service",
1579                                    "Error checking local inventory for {id}: {e}"
1580                                ),
1581                            }
1582                        }
1583                    }
1584                }
1585                // Since we have limited fetch capacity, it may be that we can't fetch an entire
1586                // inventory from a peer. Therefore we randomize the order of the RIDs to fetch
1587                // different RIDs from different peers in case multiple peers announce the same
1588                // RIDs.
1589                self.rng.shuffle(&mut missing);
1590
1591                for rid in missing {
1592                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
1593                    self.fetch(rid, *announcer, vec![], self.fetch_config(), None);
1594                }
1595                return Ok(relay);
1596            }
1597            AnnouncementMessage::Refs(message) => {
1598                self.emitter.emit(Event::RefsAnnounced {
1599                    nid: *announcer,
1600                    rid: message.rid,
1601                    refs: message.refs.to_vec(),
1602                    timestamp: message.timestamp,
1603                });
1604                // Empty announcements can be safely ignored.
1605                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
1606                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
1607                    return Ok(None);
1608                };
1609                // We update inventories when receiving ref announcements, as these could come
1610                // from a new repository being initialized.
1611                self.seed_discovered(message.rid, *announcer, message.timestamp);
1612
1613                // Update sync status of announcer for this repo.
1614                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
1615                    debug!(
1616                        target: "service",
1617                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
1618                        message.rid, refs.at, message.timestamp
1619                    );
1620                    match self.db.seeds_mut().synced(
1621                        &message.rid,
1622                        announcer,
1623                        refs.at,
1624                        message.timestamp,
1625                    ) {
1626                        Ok(updated) => {
1627                            if updated {
1628                                debug!(
1629                                    target: "service",
1630                                    "Updating sync status of {announcer} for {} to {}",
1631                                    message.rid, refs.at
1632                                );
1633                                self.emitter.emit(Event::RefsSynced {
1634                                    rid: message.rid,
1635                                    remote: *announcer,
1636                                    at: refs.at,
1637                                });
1638                            } else {
1639                                debug!(
1640                                    target: "service",
1641                                    "Sync status of {announcer} was not updated for {}",
1642                                    message.rid,
1643                                );
1644                            }
1645                        }
1646                        Err(e) => {
1647                            debug!(target: "service", "Failed to update sync status for {}: {e}", message.rid);
1648                        }
1649                    }
1650                }
1651                let repo_entry = self.policies.seed_policy(&message.rid).expect(
1652                    "Service::handle_announcement: error accessing repo seeding configuration",
1653                );
1654                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1655                    debug!(
1656                        target: "service",
1657                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
1658                        message.rid
1659                    );
1660                    return Ok(None);
1661                };
1662                // Ref announcements may be relayed by peers who don't have the
1663                // actual refs in storage, therefore we only check whether we
1664                // are connected to the *announcer*, which is required by the
1665                // protocol to only announce refs it has.
1666                //
1667                // TODO(Ade): Perhaps it makes sense to establish connections to
1668                // followed but unconnected peers. Consider:
1669                //   Connections: Alice ←→ Bob ←→ Eve
1670                //   Follows:     Alice ←→ Eve
1671                // Eve announces refs, and Bob relays these announcements to Alice.
1672                // Then, Alice might determine that Bob does not have Eve's refs,
1673                // and therefore connect directly to Eve in order to fetch.
1674                let Some(remote) = self.sessions.get(announcer).cloned() else {
1675                    trace!(
1676                        target: "service",
1677                        "Skipping fetch of {}, no sessions connected to {announcer}",
1678                        message.rid
1679                    );
1680                    return Ok(relay);
1681                };
1682                // Finally, start the fetch.
1683                self.fetch_refs_at(message.rid, remote.id, refs, scope, self.fetch_config());
1684
1685                return Ok(relay);
1686            }
1687            AnnouncementMessage::Node(
1688                ann @ NodeAnnouncement {
1689                    features,
1690                    addresses,
1691                    ..
1692                },
1693            ) => {
1694                self.emitter.emit(Event::NodeAnnounced {
1695                    nid: *announcer,
1696                    alias: ann.alias.clone(),
1697                    timestamp: ann.timestamp,
1698                    features: *features,
1699                    addresses: addresses.to_vec(),
1700                });
1701                // If this node isn't a seed, we're not interested in adding it
1702                // to our address book, but other nodes may be, so we relay the message anyway.
1703                if !features.has(Features::SEED) {
1704                    return Ok(relay);
1705                }
1706
1707                match self.db.addresses_mut().insert(
1708                    announcer,
1709                    ann.version,
1710                    ann.features,
1711                    &ann.alias,
1712                    ann.work(),
1713                    &ann.agent,
1714                    timestamp,
1715                    addresses
1716                        .iter()
1717                        // Ignore non-routable addresses unless received from a local network
1718                        // peer. This allows the node to function in a local network.
1719                        .filter(|a| a.is_routable() || relayer_addr.is_local())
1720                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
1721                ) {
1722                    Ok(updated) => {
1723                        // Only relay if we received new information.
1724                        if updated {
1725                            debug!(
1726                                target: "service",
1727                                "Address store entry for node {announcer} updated at {timestamp}"
1728                            );
1729                            return Ok(relay);
1730                        }
1731                    }
1732                    Err(err) => {
1733                        // An error here is due to a fault in our address store.
1734                        warn!(target: "service", "Failed to process node announcement from {announcer}: {err}");
1735                    }
1736                }
1737            }
1738        }
1739        Ok(None)
1740    }
1741
1742    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
1743        match info {
1744            // Nb. We don't currently send this message.
1745            Info::RefsAlreadySynced { rid, at } => {
1746                debug!(target: "service", "Refs already synced for {rid} by {remote}");
1747                self.emitter.emit(Event::RefsSynced {
1748                    rid: *rid,
1749                    remote,
1750                    at: *at,
1751                });
1752            }
1753        }
1754
1755        Ok(())
1756    }
1757
    /// Handle an incoming wire message from `remote`.
    ///
    /// Applies per-peer rate limiting, reconciles the session state with the wire state
    /// if needed, then dispatches on the message type: announcements, subscriptions,
    /// info, pings and pongs. Messages from unknown or disconnected peers are dropped.
    pub fn handle_message(
        &mut self,
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
        let local = self.node_id();
        let relay = self.config.is_relay();
        // No session means we have no business processing this message.
        let Some(peer) = self.sessions.get_mut(remote) else {
            debug!(target: "service", "Session not found for {remote}");
            return Ok(());
        };
        peer.last_active = self.clock;

        // Rate limits differ by link direction.
        let limit: RateLimit = match peer.link {
            Link::Outbound => self.config.limits.rate.outbound.into(),
            Link::Inbound => self.config.limits.rate.inbound.into(),
        };
        // Drop the message entirely if the peer is over its rate limit.
        if self
            .limiter
            .limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
        {
            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
        }
        message.log(log::Level::Debug, remote, Link::Inbound);

        // `connected` holds the session's ping state and latency samples when the
        // session is fully connected; `None` otherwise.
        let connected = match &mut peer.state {
            session::State::Disconnected { .. } => {
                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
                return Ok(());
            }
            // In case of a discrepancy between the service state and the state of the underlying
            // wire protocol, we may receive a message from a peer that we consider not fully connected
            // at the service level. To remedy this, we simply transition the peer to a connected state.
            //
            // This is not ideal, but until the wire protocol and service are unified, it's the simplest
            // solution to converge towards the same state.
            session::State::Attempted | session::State::Initial => {
                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);

                peer.to_connected(self.clock);

                None
            }
            session::State::Connected {
                ping, latencies, ..
            } => Some((ping, latencies)),
        };

        trace!(target: "service", "Received message {message:?} from {remote}");

        match message {
            // Process a peer announcement.
            Message::Announcement(ann) => {
                let relayer = remote;
                let relayer_addr = peer.addr.clone();

                // A `Some` result indicates the announcement should be propagated.
                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
                        // Inventory announcements are not relayed eagerly; instead we
                        // flag them in the gossip store for later relaying.
                        if let AnnouncementMessage::Inventory(_) = ann.message {
                            if let Err(e) = self
                                .database_mut()
                                .gossip_mut()
                                .set_relay(id, gossip::RelayStatus::Relay)
                            {
                                warn!(target: "service", "Failed to set relay flag for message: {e}");
                                return Ok(());
                            }
                        } else {
                            self.relay(id, ann);
                        }
                    }
                }
            }
            Message::Subscribe(subscribe) => {
                // Filter announcements by interest.
                match self
                    .db
                    .gossip()
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
                {
                    Ok(anns) => {
                        for ann in anns {
                            let ann = match ann {
                                Ok(a) => a,
                                Err(e) => {
                                    debug!(target: "service", "Failed to read gossip message from store: {e}");
                                    continue;
                                }
                            };
                            // Don't send announcements authored by the remote, back to the remote.
                            if ann.node == *remote {
                                continue;
                            }
                            // Only send messages if we're a relay, or it's our own messages.
                            if relay || ann.node == local {
                                self.outbox.write(peer, ann.into());
                            }
                        }
                    }
                    Err(e) => {
                        warn!(target: "service", "Failed to query gossip messages from store: {e}");
                    }
                }
                // Remember the subscription for filtering future relayed messages.
                peer.subscribe = Some(subscribe);
            }
            Message::Info(info) => {
                self.handle_info(*remote, &info)?;
            }
            Message::Ping(Ping { ponglen, .. }) => {
                // Ignore pings which ask for too much data.
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
                }
                self.outbox.write(
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
            Message::Pong { zeroes } => {
                if let Some((ping, latencies)) = connected {
                    // Only accept the pong if we were awaiting one, and its payload
                    // length matches what we asked for in our ping.
                    if let session::PingState::AwaitingResponse {
                        len: ponglen,
                        since,
                    } = *ping
                    {
                        if (ponglen as usize) == zeroes.len() {
                            *ping = session::PingState::Ok;
                            // Keep track of peer latency.
                            latencies.push_back(self.clock - since);
                            if latencies.len() > MAX_LATENCIES {
                                latencies.pop_front();
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
1901
1902    /// A convenient method to check if we should fetch from a `RefsAnnouncement` with `scope`.
1903    fn refs_status_of(
1904        &self,
1905        rid: RepoId,
1906        refs: NonEmpty<RefsAt>,
1907        scope: &policy::Scope,
1908    ) -> Result<RefsStatus, Error> {
1909        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
1910        // Check that there's something we want.
1911        if refs.want.is_empty() {
1912            return Ok(refs);
1913        }
1914        // Check scope.
1915        let mut refs = match scope {
1916            policy::Scope::All => refs,
1917            policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
1918                Ok(Namespaces::All) => refs,
1919                Ok(Namespaces::Followed(followed)) => {
1920                    refs.want.retain(|r| followed.contains(&r.remote));
1921                    refs
1922                }
1923                Err(e) => return Err(e.into()),
1924            },
1925        };
1926        // Remove our own remote, we don't want to fetch that.
1927        refs.want.retain(|r| r.remote != self.node_id());
1928
1929        Ok(refs)
1930    }
1931
1932    /// Add a seed to our routing table.
1933    fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
1934        if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
1935            if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
1936                self.emitter.emit(Event::SeedDiscovered { rid, nid });
1937                debug!(target: "service", "Routing table updated for {rid} with seed {nid}");
1938            }
1939        }
1940    }
1941
1942    /// Set of initial messages to send to a peer.
1943    fn initial(&mut self, _link: Link) -> Vec<Message> {
1944        let now = self.clock();
1945        let filter = self.filter();
1946
1947        // TODO: Only subscribe to outbound connections, otherwise we will consume too
1948        // much bandwidth.
1949
1950        // If we've been previously connected to the network, we'll have received gossip messages.
1951        // Instead of simply taking the last timestamp we try to ensure we don't miss any
1952        // messages due un-synchronized clocks.
1953        //
1954        // If this is our first connection to the network, we just ask for a fixed backlog
1955        // of messages to get us started.
1956        let since = if let Some(last) = self.last_online_at {
1957            Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
1958        } else {
1959            (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
1960        };
1961        debug!(target: "service", "Subscribing to messages since timestamp {since}..");
1962
1963        vec![
1964            Message::node(self.node.clone(), &self.signer),
1965            Message::inventory(self.inventory.clone(), &self.signer),
1966            Message::subscribe(filter, since, Timestamp::MAX),
1967        ]
1968    }
1969
1970    /// Try to guess whether we're online or not.
1971    fn is_online(&self) -> bool {
1972        self.sessions
1973            .connected()
1974            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
1975            .count()
1976            > 0
1977    }
1978
1979    /// Remove a local repository from our inventory.
1980    fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
1981        let node = self.node_id();
1982        let now = self.timestamp();
1983
1984        let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
1985        if removed {
1986            self.refresh_and_announce_inventory(now)?;
1987        }
1988        Ok(removed)
1989    }
1990
1991    /// Add a local repository to our inventory.
1992    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
1993        let node = self.node_id();
1994        let now = self.timestamp();
1995
1996        if !self.storage.contains(&rid)? {
1997            debug!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
1998            return Ok(false);
1999        }
2000        // Add to our local inventory.
2001        let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
2002        let updated = !updates.is_empty();
2003
2004        if updated {
2005            self.refresh_and_announce_inventory(now)?;
2006        }
2007        Ok(updated)
2008    }
2009
2010    /// Update cached inventory message, and announce new inventory to peers.
2011    fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
2012        let inventory = self.inventory()?;
2013
2014        self.inventory = gossip::inventory(time, inventory);
2015        self.announce_inventory();
2016
2017        Ok(())
2018    }
2019
2020    /// Get our local inventory.
2021    ///
2022    /// A node's inventory is the advertised list of repositories offered by a node.
2023    ///
2024    /// A node's inventory consists of *public* repositories that are seeded and available locally
2025    /// in the node's storage. We use the routing table as the canonical state of all inventories,
2026    /// including the local node's.
2027    ///
2028    /// When a repository is unseeded, it is also removed from the inventory. Private repositories
2029    /// are *not* part of a node's inventory.
2030    fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
2031        self.db
2032            .routing()
2033            .get_inventory(self.nid())
2034            .map_err(Error::from)
2035    }
2036
    /// Process a peer inventory announcement by updating our routing table.
    /// This function expects the peer's full inventory, and prunes entries that are not in the
    /// given inventory.
    ///
    /// Returns which routes were added, updated and removed, and emits the
    /// corresponding [`Event::SeedDiscovered`] / [`Event::SeedDropped`] events.
    fn sync_routing(
        &mut self,
        inventory: impl IntoIterator<Item = RepoId>,
        from: NodeId,
        timestamp: Timestamp,
    ) -> Result<SyncedRouting, Error> {
        let mut synced = SyncedRouting::default();
        let included = inventory.into_iter().collect::<BTreeSet<_>>();
        let mut events = Vec::new();

        // First, insert (or refresh) every RID in the announced inventory.
        for (rid, result) in
            self.db
                .routing_mut()
                .add_inventory(included.iter(), from, timestamp)?
        {
            match result {
                InsertResult::SeedAdded => {
                    debug!(target: "service", "Routing table updated for {rid} with seed {from}");
                    events.push(Event::SeedDiscovered { rid, nid: from });

                    if self
                        .policies
                        .is_seeding(&rid)
                        .expect("Service::process_inventory: error accessing seeding configuration")
                    {
                        // TODO: We should fetch here if we're already connected, in case this
                        // seed has refs we don't have.
                    }
                    synced.added.push(rid);
                }
                InsertResult::TimeUpdated => {
                    synced.updated.push(rid);
                }
                InsertResult::NotUpdated => {}
            }
        }

        // Then, prune: any RID we previously attributed to this peer that is absent
        // from the announced (full) inventory is no longer seeded by them.
        synced.removed.extend(
            self.db
                .routing()
                .get_inventory(&from)?
                .into_iter()
                .filter(|rid| !included.contains(rid)),
        );
        self.db
            .routing_mut()
            .remove_inventories(&synced.removed, &from)?;
        events.extend(
            synced
                .removed
                .iter()
                .map(|&rid| Event::SeedDropped { rid, nid: from }),
        );

        self.emitter.emit_all(events);

        Ok(synced)
    }
2098
    /// Return a refs announcement including the given remotes.
    ///
    /// Builds and signs a [`RefsAnnouncement`] for `rid`, containing the current
    /// `RefsAt` of each requested remote, up to [`REF_REMOTE_LIMIT`] entries.
    /// Also returns the included refs as a plain vector.
    fn refs_announcement_for(
        &mut self,
        rid: RepoId,
        remotes: impl IntoIterator<Item = NodeId>,
    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
        let repo = self.storage.repository(rid)?;
        let timestamp = self.timestamp();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        for remote_id in remotes.into_iter() {
            let refs_at = RefsAt::new(&repo, remote_id).map_err(|err| {
                radicle::storage::Error::Refs(radicle::storage::refs::Error::Read(err))
            })?;

            // The announcement is bounded; remotes beyond the limit are dropped.
            if refs.push(refs_at).is_err() {
                warn!(
                    target: "service",
                    "refs announcement limit ({REF_REMOTE_LIMIT}) exceeded, peers will see only some of your repository references",
                );
                break;
            }
        }

        let msg = AnnouncementMessage::from(RefsAnnouncement {
            rid,
            refs: refs.clone(),
            timestamp,
        });
        Ok((msg.signed(&self.signer), refs.into()))
    }
2130
    /// Announce our own refs for the given repo.
    ///
    /// Announces via [`Self::announce_refs`], then records the announced `rad/sigrefs`
    /// branches in the refs database and emits [`Event::LocalRefsAnnounced`] for each.
    fn announce_own_refs(
        &mut self,
        rid: RepoId,
        doc: Doc,
        namespaces: impl IntoIterator<Item = NodeId>,
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (refs, timestamp) = self.announce_refs(rid, doc, namespaces, true)?;

        // Update refs database with our signed refs branches.
        // This isn't strictly necessary for now, as we only use the database for fetches, and
        // we don't fetch our own refs that are announced, but it's for good measure.
        for r in refs.iter() {
            self.emitter.emit(Event::LocalRefsAnnounced {
                rid,
                refs: *r,
                timestamp,
            });
            // A failure here only affects the local refs cache; log and continue.
            if let Err(e) = self.database_mut().refs_mut().set(
                &rid,
                &r.remote,
                &SIGREFS_BRANCH,
                r.at,
                timestamp.to_local_time(),
            ) {
                warn!(
                    target: "service",
                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
                    r.remote
                );
            }
        }
        Ok((refs, timestamp))
    }
2165
    /// Announce local refs for given repo.
    ///
    /// Builds a signed refs announcement for `remotes`, records the announcement in
    /// our local sync status, and broadcasts it to connected peers that are allowed
    /// to view the repository.
    ///
    /// When `own` is `true`, the sync status is updated for every announced remote;
    /// otherwise only for the refs belonging to our own node.
    fn announce_refs(
        &mut self,
        rid: RepoId,
        doc: Doc,
        remotes: impl IntoIterator<Item = NodeId>,
        own: bool,
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
        let timestamp = ann.timestamp();
        let peers = self.sessions.connected().map(|(_, p)| p);

        // Update our sync status for our own refs. This is useful for determining if refs were
        // updated while the node was stopped.
        for r in refs.iter().filter(|r| own || r.remote == ann.node) {
            info!(
                target: "service",
                "Announcing refs {rid}/{r} to peers (t={timestamp})..",
            );
            // Update our local node's sync status to mark the refs as announced.
            if let Err(e) = self.db.seeds_mut().synced(&rid, &ann.node, r.at, timestamp) {
                warn!(target: "service", "Failed to update sync status for local node: {e}");
            } else {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
        }

        self.outbox.announce(
            ann,
            peers.filter(|p| {
                // Only announce to peers who are allowed to view this repo.
                doc.is_visible_to(&p.id.into())
            }),
            self.db.gossip_mut(),
        );
        Ok((refs, timestamp))
    }
2203
2204    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
2205        if let Some(sess) = self.sessions.get_mut(&nid) {
2206            sess.to_initial();
2207            self.outbox.connect(nid, addr);
2208
2209            return true;
2210        }
2211        false
2212    }
2213
2214    fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
2215        debug!(target: "service", "Connecting to {nid} ({addr})..");
2216
2217        if nid == self.node_id() {
2218            return Err(ConnectError::SelfConnection);
2219        }
2220        if let Ok(true) = self.policies.is_blocked(&nid) {
2221            return Err(ConnectError::Blocked { nid });
2222        }
2223        if !self.is_supported_address(&addr) {
2224            return Err(ConnectError::UnsupportedAddress { nid, addr });
2225        }
2226        if self.sessions.contains_key(&nid) {
2227            return Err(ConnectError::SessionExists { nid });
2228        }
2229        if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
2230            return Err(ConnectError::LimitReached { nid, addr });
2231        }
2232        let persistent = self.config.is_persistent(&nid);
2233        let timestamp: Timestamp = self.clock.into();
2234
2235        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
2236            warn!(target: "service", "Failed to update address book with connection attempt: {e}");
2237        }
2238        self.sessions.insert(
2239            nid,
2240            Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
2241        );
2242        self.outbox.connect(nid, addr);
2243
2244        Ok(())
2245    }
2246
    /// Return the known seeds of repository `rid`.
    ///
    /// For each seed we include its connection state and, where determinable,
    /// its sync status relative to our local refs for the given `namespaces`.
    fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
        let mut seeds = Seeds::new(self.rng.clone());

        // First, build a list of peers that have synced refs for `namespaces`, if any.
        // This step is skipped:
        //  1. For the repository (and thus all `namespaces`), if it does not exist in storage.
        //  2. For each `namespace` in `namespaces`, which does not exist in storage.
        if let Ok(repo) = self.storage.repository(*rid) {
            for namespace in namespaces.iter() {
                let Ok(local) = RefsAt::new(&repo, *namespace) else {
                    continue;
                };

                for seed in self.db.seeds().seeds_for(rid)? {
                    let seed = seed?;
                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
                    // Compare the seed's last known synced OID with our local refs.
                    let synced = if local.at == seed.synced_at.oid {
                        SyncStatus::Synced { at: seed.synced_at }
                    } else {
                        let local = SyncedAt::new(local.at, &repo)?;

                        SyncStatus::OutOfSync {
                            local,
                            remote: seed.synced_at,
                        }
                    };
                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
                }
            }
        }

        // Then, add peers we know about but have no information about the sync status.
        // These peers have announced that they seed the repository via an inventory
        // announcement, but we haven't received any ref announcements from them.
        for nid in self.db.routing().get(rid)? {
            if namespaces.contains(&nid) {
                continue;
            }
            if seeds.contains(&nid) {
                // We already have a richer entry for this node.
                continue;
            }
            let addrs = self.db.addresses().addresses_of(&nid)?;
            let state = self.sessions.get(&nid).map(|s| s.state.clone());

            seeds.insert(Seed::new(nid, addrs, state, None));
        }
        Ok(seeds)
    }
2296
2297    /// Return a new filter object, based on our seeding policy.
2298    fn filter(&self) -> Filter {
2299        if self.config.seeding_policy.is_allow() {
2300            // TODO: Remove bits for blocked repos.
2301            Filter::default()
2302        } else {
2303            self.filter.clone()
2304        }
2305    }
2306
2307    /// Get a timestamp for using in announcements.
2308    /// Never returns the same timestamp twice.
2309    fn timestamp(&mut self) -> Timestamp {
2310        let now = Timestamp::from(self.clock);
2311        if *now > *self.last_timestamp {
2312            self.last_timestamp = now;
2313        } else {
2314            self.last_timestamp = self.last_timestamp + 1;
2315        }
2316        self.last_timestamp
2317    }
2318
2319    fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
2320        let announcer = ann.node;
2321        let relayed_by = self.relayed_by.get(&id);
2322        let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
2323            Some(rid)
2324        } else {
2325            None
2326        };
2327        // Choose peers we should relay this message to.
2328        // 1. Don't relay to a peer who sent us this message.
2329        // 2. Don't relay to the peer who signed this announcement.
2330        let relay_to = self
2331            .sessions
2332            .connected()
2333            .filter(|(id, _)| {
2334                relayed_by
2335                    .map(|relayers| !relayers.contains(id))
2336                    .unwrap_or(true) // If there are no relayers we let it through.
2337            })
2338            .filter(|(id, _)| **id != announcer)
2339            .filter(|(id, _)| {
2340                if let Some(rid) = rid {
2341                    // Only relay this message if the peer is allowed to know about the
2342                    // repository. If we don't have the repository, return `false` because
2343                    // we can't determine if it's private or public.
2344                    self.storage
2345                        .get(rid)
2346                        .ok()
2347                        .flatten()
2348                        .map(|doc| doc.is_visible_to(&(*id).into()))
2349                        .unwrap_or(false)
2350                } else {
2351                    // Announcement doesn't concern a specific repository, let it through.
2352                    true
2353                }
2354            })
2355            .map(|(_, p)| p);
2356
2357        self.outbox.relay(ann, relay_to);
2358    }
2359
2360    ////////////////////////////////////////////////////////////////////////////
2361    // Periodic tasks
2362    ////////////////////////////////////////////////////////////////////////////
2363
2364    fn relay_announcements(&mut self) -> Result<(), Error> {
2365        let now = self.clock.into();
2366        let rows = self.database_mut().gossip_mut().relays(now)?;
2367        let local = self.node_id();
2368
2369        for (id, msg) in rows {
2370            let announcer = msg.node;
2371            if announcer == local {
2372                // Don't relay our own stored gossip messages.
2373                continue;
2374            }
2375            self.relay(id, msg);
2376        }
2377        Ok(())
2378    }
2379
2380    /// Announce our inventory to all connected peers, unless it was already announced.
2381    fn announce_inventory(&mut self) {
2382        let timestamp = self.inventory.timestamp.to_local_time();
2383
2384        if self.last_inventory == timestamp {
2385            debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
2386            return;
2387        }
2388        let msg = AnnouncementMessage::from(self.inventory.clone());
2389
2390        self.outbox.announce(
2391            msg.signed(&self.signer),
2392            self.sessions.connected().map(|(_, p)| p),
2393            self.db.gossip_mut(),
2394        );
2395        self.last_inventory = timestamp;
2396    }
2397
2398    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
2399        let count = self.db.routing().len()?;
2400        if count <= self.config.limits.routing_max_size.into() {
2401            return Ok(());
2402        }
2403
2404        let delta = count - usize::from(self.config.limits.routing_max_size);
2405        let nid = self.node_id();
2406        self.db.routing_mut().prune(
2407            (*now - LocalDuration::from(self.config.limits.routing_max_age)).into(),
2408            Some(delta),
2409            &nid,
2410        )?;
2411        Ok(())
2412    }
2413
2414    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
2415        let stale = self
2416            .sessions
2417            .connected()
2418            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
2419
2420        for (_, session) in stale {
2421            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);
2422
2423            // TODO: Should we switch the session state to "disconnected" even before receiving
2424            // an official "disconnect"? Otherwise we keep pinging until we get the disconnection.
2425
2426            self.outbox.disconnect(
2427                session.id,
2428                DisconnectReason::Session(session::Error::Timeout),
2429            );
2430        }
2431    }
2432
2433    /// Ensure connection health by pinging connected peers.
2434    fn keep_alive(&mut self, now: &LocalTime) {
2435        let inactive_sessions = self
2436            .sessions
2437            .connected_mut()
2438            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
2439            .map(|(_, session)| session);
2440        for session in inactive_sessions {
2441            session.ping(self.clock, &mut self.outbox).ok();
2442        }
2443    }
2444
2445    /// Get a list of peers available to connect to, sorted by lowest penalty.
2446    fn available_peers(&mut self) -> Vec<Peer> {
2447        match self.db.addresses().entries() {
2448            Ok(entries) => {
2449                // Nb. we don't want to connect to any peers that already have a session with us,
2450                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
2451                let mut peers = entries
2452                    .filter(|entry| entry.version == PROTOCOL_VERSION)
2453                    .filter(|entry| !entry.address.banned)
2454                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
2455                    .filter(|entry| !self.sessions.contains_key(&entry.node))
2456                    .filter(|entry| !self.policies.is_blocked(&entry.node).unwrap_or(false))
2457                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
2458                    .filter(|entry| &entry.node != self.nid())
2459                    .filter(|entry| self.is_supported_address(&entry.address.addr))
2460                    .fold(HashMap::new(), |mut acc, entry| {
2461                        acc.entry(entry.node)
2462                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
2463                            .or_insert_with(|| Peer {
2464                                nid: entry.node,
2465                                addresses: vec![entry.address],
2466                                penalty: entry.penalty,
2467                            });
2468                        acc
2469                    })
2470                    .into_values()
2471                    .collect::<Vec<_>>();
2472                peers.sort_by_key(|p| p.penalty);
2473                peers
2474            }
2475            Err(e) => {
2476                warn!(target: "service", "Unable to lookup available peers in address book: {e}");
2477                Vec::new()
2478            }
2479        }
2480    }
2481
2482    /// Fetch all repositories that are seeded but missing from storage.
2483    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
2484        let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
2485        for policy in policies {
2486            let policy = match policy {
2487                Ok(policy) => policy,
2488                Err(err) => {
2489                    debug!(target: "protocol::filter", "Failed to read seed policy: {err}");
2490                    continue;
2491                }
2492            };
2493
2494            let rid = policy.rid;
2495
2496            if !policy.is_allow() {
2497                continue;
2498            }
2499            match self.storage.contains(&rid) {
2500                Ok(exists) => {
2501                    if exists {
2502                        continue;
2503                    }
2504                }
2505                Err(err) => {
2506                    log::debug!(target: "protocol::filter", "Failed to check if {rid} exists: {err}");
2507                    continue;
2508                }
2509            }
2510            match self.seeds(&rid, [self.node_id()].into()) {
2511                Ok(seeds) => {
2512                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
2513                        for seed in connected {
2514                            self.fetch(rid, seed.nid, vec![], self.fetch_config(), None);
2515                        }
2516                    } else {
2517                        // TODO: We should make sure that this fetch is retried later, either
2518                        // when we connect to a seed, or when we discover a new seed.
2519                        // Since new connections and routing table updates are both conditions for
2520                        // fetching, we should trigger fetches when those conditions appear.
2521                        // Another way to handle this would be to update our database, saying
2522                        // that we're trying to fetch a certain repo. We would then just
2523                        // iterate over those entries in the above circumstances. This is
2524                        // merely an optimization though, we can also iterate over all seeded
2525                        // repos and check which ones are not in our inventory.
2526                        debug!(target: "service", "No connected seeds found for {rid}..");
2527                    }
2528                }
2529                Err(e) => {
2530                    debug!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
2531                }
2532            }
2533        }
2534        Ok(())
2535    }
2536
2537    /// Run idle task for all connections.
2538    fn idle_connections(&mut self) {
2539        for (_, sess) in self.sessions.iter_mut() {
2540            sess.idle(self.clock);
2541
2542            if sess.is_stable() {
2543                // Mark as connected once connection is stable.
2544                if let Err(e) =
2545                    self.db
2546                        .addresses_mut()
2547                        .connected(&sess.id, &sess.addr, self.clock.into())
2548                {
2549                    warn!(target: "service", "Failed to update address book with connection: {e}");
2550                }
2551            }
2552        }
2553    }
2554
2555    /// Try to maintain a target number of connections.
2556    fn maintain_connections(&mut self) {
2557        let PeerConfig::Dynamic = self.config.peers else {
2558            return;
2559        };
2560        trace!(target: "service", "Maintaining connections..");
2561
2562        let target = TARGET_OUTBOUND_PEERS;
2563        let now = self.clock;
2564        let outbound = self
2565            .sessions
2566            .values()
2567            .filter(|s| s.link.is_outbound())
2568            .filter(|s| s.is_connected() || s.is_connecting())
2569            .count();
2570        let wanted = target.saturating_sub(outbound);
2571
2572        // Don't connect to more peers than needed.
2573        if wanted == 0 {
2574            return;
2575        }
2576
2577        // Peers available to connect to.
2578        let available = self
2579            .available_peers()
2580            .into_iter()
2581            .filter_map(|peer| {
2582                peer.addresses
2583                    .into_iter()
2584                    .find(|ka| match (ka.last_success, ka.last_attempt) {
2585                        // If we succeeded the last time we tried, this is a good address.
2586                        // If it's been long enough that we failed to connect, we also try again.
2587                        (Some(success), Some(attempt)) => {
2588                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
2589                        }
2590                        // If we haven't succeeded yet, and we waited long enough, we can try this address.
2591                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
2592                        // If we have no failed attempts for this address, it's worth a try.
2593                        (_, None) => true,
2594                    })
2595                    .map(|ka| (peer.nid, ka))
2596            })
2597            .filter(|(_, ka)| self.is_supported_address(&ka.addr));
2598
2599        // Peers we are going to attempt connections to.
2600        let connect = available.take(wanted).collect::<Vec<_>>();
2601        if connect.len() < wanted {
2602            log::debug!(
2603                target: "service",
2604                "Not enough available peers to connect to (available={}, wanted={wanted})",
2605                connect.len()
2606            );
2607        }
2608        for (id, ka) in connect {
2609            if let Err(e) = self.connect(id, ka.addr.clone()) {
2610                warn!(target: "service", "Service::maintain_connections connection error: {e}");
2611            }
2612        }
2613    }
2614
2615    /// Maintain persistent peer connections.
2616    fn maintain_persistent(&mut self) {
2617        trace!(target: "service", "Maintaining persistent peers..");
2618
2619        let now = self.local_time();
2620        let mut reconnect = Vec::new();
2621
2622        for (nid, session) in self.sessions.iter_mut() {
2623            if self.config.is_persistent(nid) {
2624                if self.policies.is_blocked(nid).unwrap_or(false) {
2625                    continue;
2626                }
2627                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
2628                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
2629                    // even a successful attempt means that we're unlikely to be able to reconnect.
2630
2631                    if now >= *retry_at {
2632                        reconnect.push((*nid, session.addr.clone(), session.attempts()));
2633                    }
2634                }
2635            }
2636        }
2637
2638        for (nid, addr, attempts) in reconnect {
2639            if self.reconnect(nid, addr) {
2640                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
2641            }
2642        }
2643    }
2644
2645    /// Checks if the given [`Address`] is supported for connecting to.
2646    ///
2647    /// # IPv4/IPv6/DNS
2648    ///
2649    /// Always returns `true`.
2650    ///
2651    /// # Tor
2652    ///
2653    /// If the [`Address`] is an `.onion` address and the service supports onion
2654    /// routing then this will return `true`.
2655    fn is_supported_address(&self, address: &Address) -> bool {
2656        match AddressType::from(address) {
2657            // Only consider onion addresses if configured.
2658            AddressType::Onion => self.config.onion.is_some(),
2659            AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
2660        }
2661    }
2662
2663    fn fetch_config(&self) -> fetcher::FetchConfig {
2664        fetcher::FetchConfig::default()
2665            .with_minimum_feature_level(self.config.fetch.feature_level_min())
2666    }
2667}
2668
/// Gives read access to the service state.
///
/// Implemented by [`Service`] below; lets other components inspect sessions,
/// fetch state, configuration and the clock without holding the concrete
/// service type.
pub trait ServiceState {
    /// Get the Node ID.
    fn nid(&self) -> &NodeId;
    /// Get the existing sessions.
    fn sessions(&self) -> &Sessions;
    /// Get fetch state.
    fn fetching(&self) -> &FetcherState;
    /// Get outbox.
    fn outbox(&self) -> &Outbox;
    /// Get rate limiter.
    fn limiter(&self) -> &RateLimiter;
    /// Get event emitter.
    fn emitter(&self) -> &Emitter<Event>;
    /// Get a repository from storage.
    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
    /// Get the clock.
    fn clock(&self) -> &LocalTime;
    /// Get the clock mutably.
    fn clock_mut(&mut self) -> &mut LocalTime;
    /// Get service configuration.
    fn config(&self) -> &Config;
    /// Get service metrics.
    fn metrics(&self) -> &Metrics;
}
2694
// Straightforward forwarding accessors into `Service`'s fields; the methods
// are documented on the `ServiceState` trait itself.
impl<D, S, G> ServiceState for Service<D, S, G>
where
    D: routing::Store,
    G: crypto::signature::Signer<crypto::Signature>,
    S: ReadStorage,
{
    fn nid(&self) -> &NodeId {
        self.signer.public_key()
    }

    fn sessions(&self) -> &Sessions {
        &self.sessions
    }

    fn fetching(&self) -> &FetcherState {
        self.fetcher.state()
    }

    fn outbox(&self) -> &Outbox {
        &self.outbox
    }

    fn limiter(&self) -> &RateLimiter {
        &self.limiter
    }

    fn emitter(&self) -> &Emitter<Event> {
        &self.emitter
    }

    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
        self.storage.get(rid)
    }

    fn clock(&self) -> &LocalTime {
        &self.clock
    }

    fn clock_mut(&mut self) -> &mut LocalTime {
        &mut self.clock
    }

    fn config(&self) -> &Config {
        &self.config
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }
}
2745
/// Disconnect reason.
#[derive(Debug)]
pub enum DisconnectReason {
    /// Error while dialing the remote. This error occurs before a connection is
    /// even established. Errors of this kind are usually not transient.
    Dial(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with an underlying established connection. Sometimes, reconnecting
    /// after such an error is possible.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with a fetch.
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
    /// Session conflicts with existing session.
    Conflict,
    /// Connection to self.
    SelfConnection,
    /// Peer is blocked by policy.
    Policy,
    /// User requested disconnect.
    Command,
}
2768
2769impl DisconnectReason {
2770    pub fn is_dial_err(&self) -> bool {
2771        matches!(self, Self::Dial(_))
2772    }
2773
2774    pub fn is_connection_err(&self) -> bool {
2775        matches!(self, Self::Connection(_))
2776    }
2777
2778    pub fn connection() -> Self {
2779        DisconnectReason::Connection(Arc::new(std::io::Error::from(
2780            std::io::ErrorKind::ConnectionReset,
2781        )))
2782    }
2783}
2784
2785impl fmt::Display for DisconnectReason {
2786    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2787        match self {
2788            Self::Dial(err) => write!(f, "{err}"),
2789            Self::Connection(err) => write!(f, "{err}"),
2790            Self::Command => write!(f, "command"),
2791            Self::SelfConnection => write!(f, "self-connection"),
2792            Self::Conflict => write!(f, "conflict"),
2793            Self::Policy => write!(f, "policy"),
2794            Self::Session(err) => write!(f, "{err}"),
2795            Self::Fetch(err) => write!(f, "fetch: {err}"),
2796        }
2797    }
2798}
2799
/// Result of a project lookup.
#[derive(Debug)]
pub struct Lookup {
    /// The identity document, if the project was found locally; `None` otherwise.
    pub local: Option<Doc>,
    /// A list of remote peers on which the project is known to exist.
    pub remote: Vec<NodeId>,
}
2808
/// Errors that can occur during a project lookup.
#[derive(thiserror::Error, Debug)]
pub enum LookupError {
    /// Failed to query the routing table.
    #[error(transparent)]
    Routing(#[from] routing::Error),
    /// Failed to access the repository.
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}
2816
#[derive(Debug, Clone)]
/// Holds currently (or recently) connected peers, keyed by node ID.
pub struct Sessions(AddressBook<NodeId, Session>);
2820
2821impl Sessions {
2822    pub fn new(rng: Rng) -> Self {
2823        Self(AddressBook::new(rng))
2824    }
2825
2826    /// Iterator over fully connected peers.
2827    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2828        self.0
2829            .iter()
2830            .filter_map(move |(id, sess)| match &sess.state {
2831                session::State::Connected { .. } => Some((id, sess)),
2832                _ => None,
2833            })
2834    }
2835
2836    /// Iterator over connected inbound peers.
2837    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2838        self.connected().filter(|(_, s)| s.link.is_inbound())
2839    }
2840
2841    /// Iterator over outbound peers.
2842    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2843        self.connected().filter(|(_, s)| s.link.is_outbound())
2844    }
2845
2846    /// Iterator over mutable fully connected peers.
2847    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2848        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
2849    }
2850
2851    /// Iterator over disconnected peers.
2852    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2853        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
2854    }
2855
2856    /// Return whether this node has a fully established session.
2857    pub fn is_connected(&self, id: &NodeId) -> bool {
2858        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
2859    }
2860
2861    /// Return whether this node can be connected to.
2862    pub fn is_disconnected(&self, id: &NodeId) -> bool {
2863        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
2864    }
2865}
2866
// Allow `Sessions` to be used transparently as the underlying `AddressBook`.
impl Deref for Sessions {
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
2874
// Mutable counterpart of the `Deref` impl above, forwarding to the inner book.
impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}