radicle_node/service.rs

1#![allow(clippy::too_many_arguments)]
2#![allow(clippy::collapsible_match)]
3#![allow(clippy::collapsible_if)]
4#![warn(clippy::unwrap_used)]
5pub mod filter;
6pub mod gossip;
7pub mod io;
8pub mod limiter;
9pub mod message;
10pub mod session;
11
12use std::collections::hash_map::Entry;
13use std::collections::{BTreeSet, HashMap, HashSet};
14use std::net::IpAddr;
15use std::ops::{Deref, DerefMut};
16use std::sync::Arc;
17use std::{fmt, net, time};
18
19use crossbeam_channel as chan;
20use fastrand::Rng;
21use localtime::{LocalDuration, LocalTime};
22use log::*;
23use nonempty::NonEmpty;
24
25use radicle::node;
26use radicle::node::address;
27use radicle::node::address::Store as _;
28use radicle::node::address::{AddressBook, AddressType, KnownAddress};
29use radicle::node::config::PeerConfig;
30use radicle::node::refs::Store as _;
31use radicle::node::routing::Store as _;
32use radicle::node::seed;
33use radicle::node::seed::Store as _;
34use radicle::node::{ConnectOptions, Penalty, Severity};
35use radicle::storage::refs::SIGREFS_BRANCH;
36use radicle::storage::RepositoryError;
37use radicle_fetch::policy::SeedingPolicy;
38
39use crate::crypto::{Signer, Verified};
40use crate::identity::{Doc, RepoId};
41use crate::node::routing;
42use crate::node::routing::InsertResult;
43use crate::node::{
44    Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt,
45};
46use crate::prelude::*;
47use crate::runtime::Emitter;
48use crate::service::gossip::Store as _;
49use crate::service::message::{
50    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
51};
52use crate::service::policy::{store::Write, Scope};
53use crate::storage;
54use crate::storage::{refs::RefsAt, Namespaces, ReadStorage};
55use crate::worker::fetch;
56use crate::worker::FetchError;
57use crate::Link;
58use crate::{crypto, PROTOCOL_VERSION};
59
60pub use crate::node::events::{Event, Events};
61pub use crate::node::{config::Network, Config, NodeId};
62pub use crate::service::message::{Message, ZeroBytes};
63pub use crate::service::session::{QueuedFetch, Session};
64
65pub use radicle::node::policy::config as policy;
66
67use self::io::Outbox;
68use self::limiter::RateLimiter;
69use self::message::InventoryAnnouncement;
70use self::policy::NamespacesError;
71
72/// How often to run the "idle" task.
73pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
74/// How often to run the "gossip" task.
75pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
76/// How often to run the "announce" task.
77pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
78/// How often to run the "sync" task.
79pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
80/// How often to run the "prune" task.
81pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
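// Each of the intervals above is serviced by `wake` further down using the same
// bookkeeping pattern, sketched here for the "idle" task (`self` being the
// running `Service`):
//
//     if now - self.last_idle >= IDLE_INTERVAL {
//         // ... run the idle work ...
//         self.outbox.wakeup(IDLE_INTERVAL);
//         self.last_idle = now;
//     }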
82/// Duration to wait on an unresponsive peer before dropping its connection.
83pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
84/// How much time should pass after a peer was last active for a *ping* to be sent.
85pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
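// A hedged sketch of how the two timeouts above typically interact during the
// idle task (`last_active` stands in for a per-session activity timestamp; the
// actual checks live in `keep_alive` and `disconnect_unresponsive_peers`,
// invoked from `wake` below):
//
//     if now - last_active >= KEEP_ALIVE_DELTA {
//         // send a ping to the peer
//     }
//     if now - last_active >= STALE_CONNECTION_TIMEOUT {
//         // drop the connection as unresponsive
//     }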
86/// Maximum number of latency values to keep for a session.
87pub const MAX_LATENCIES: usize = 16;
88/// Maximum time difference between the local time and an announcement timestamp.
89pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
90/// Maximum attempts to connect to a peer before we give up.
91pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
92/// How far back from the present time we request gossip messages when connecting to a peer,
93/// in the case where we are coming online for the first time.
94pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
95/// When subscribing, the margin of error we give ourselves. A higher delta means we ask for
96/// messages further back than strictly necessary, to account for missed messages.
97pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
98/// Minimum amount of time to wait before reconnecting to a peer.
99pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
100/// Maximum amount of time to wait before reconnecting to a peer.
101pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
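// These two bounds are applied in `disconnected` further down: the reconnect
// delay for persistent peers backs off exponentially with the attempt count
// (`attempts` below abbreviates `session.attempts() as u32`) and is clamped to
// the range they define:
//
//     let delay = LocalDuration::from_secs(2u64.saturating_pow(attempts))
//         .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);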
102/// Connection retry delta used for ephemeral peers that failed to connect previously.
103pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
104/// How long to wait on a stalled fetch before aborting; the default is 3s.
105pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3);
106/// Target number of peers to maintain connections to.
107pub const TARGET_OUTBOUND_PEERS: usize = 8;
108
109/// Maximum external address limit imposed by message size limits.
110pub use message::ADDRESS_LIMIT;
111/// Maximum inventory limit imposed by message size limits.
112pub use message::INVENTORY_LIMIT;
113/// Maximum number of project git references imposed by message size limits.
114pub use message::REF_REMOTE_LIMIT;
115
116/// Metrics we track.
117#[derive(Clone, Debug, Default, serde::Serialize)]
118#[serde(rename_all = "camelCase")]
119pub struct Metrics {
120    /// Metrics for each peer.
121    pub peers: HashMap<NodeId, PeerMetrics>,
122    /// Tasks queued in worker queue.
123    pub worker_queue_size: usize,
124    /// Current open channel count.
125    pub open_channels: usize,
126}
127
128impl Metrics {
129    /// Get metrics for the given peer.
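    ///
    /// A minimal usage sketch (`nid` is a hypothetical peer id); an entry is
    /// created on demand with zeroed counters:
    ///
    /// ```ignore
    /// let mut metrics = Metrics::default();
    /// metrics.peer(nid).received_bytes += 512;
    /// assert_eq!(metrics.peers.len(), 1);
    /// ```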
130    pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
131        self.peers.entry(nid).or_default()
132    }
133}
134
135/// Per-peer metrics we track.
136#[derive(Clone, Debug, Default, serde::Serialize)]
137#[serde(rename_all = "camelCase")]
138pub struct PeerMetrics {
139    pub received_git_bytes: usize,
140    pub received_fetch_requests: usize,
141    pub received_bytes: usize,
142    pub received_gossip_messages: usize,
143    pub sent_bytes: usize,
144    pub sent_fetch_requests: usize,
145    pub sent_git_bytes: usize,
146    pub sent_gossip_messages: usize,
147    pub streams_opened: usize,
148    pub inbound_connection_attempts: usize,
149    pub outbound_connection_attempts: usize,
150    pub disconnects: usize,
151}
152
153/// Result of syncing our routing table with a node's inventory.
154#[derive(Default)]
155struct SyncedRouting {
156    /// Repo entries added.
157    added: Vec<RepoId>,
158    /// Repo entries removed.
159    removed: Vec<RepoId>,
160    /// Repo entries updated (time).
161    updated: Vec<RepoId>,
162}
163
164impl SyncedRouting {
165    fn is_empty(&self) -> bool {
166        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
167    }
168}
169
170/// A peer we can connect to.
171#[derive(Debug, Clone)]
172struct Peer {
173    nid: NodeId,
174    addresses: Vec<KnownAddress>,
175    penalty: Penalty,
176}
177
178/// General service error.
179#[derive(thiserror::Error, Debug)]
180pub enum Error {
181    #[error(transparent)]
182    Git(#[from] radicle::git::raw::Error),
183    #[error(transparent)]
184    GitExt(#[from] radicle::git::ext::Error),
185    #[error(transparent)]
186    Storage(#[from] storage::Error),
187    #[error(transparent)]
188    Gossip(#[from] gossip::Error),
189    #[error(transparent)]
190    Refs(#[from] storage::refs::Error),
191    #[error(transparent)]
192    Routing(#[from] routing::Error),
193    #[error(transparent)]
194    Address(#[from] address::Error),
195    #[error(transparent)]
196    Database(#[from] node::db::Error),
197    #[error(transparent)]
198    Seeds(#[from] seed::Error),
199    #[error(transparent)]
200    Policy(#[from] policy::Error),
201    #[error(transparent)]
202    Repository(#[from] radicle::storage::RepositoryError),
203    #[error("namespaces error: {0}")]
204    Namespaces(#[from] NamespacesError),
205}
206
207/// A store for all node data.
208pub trait Store:
209    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
210{
211}
212
213impl Store for node::Database {}
214
215/// Function used to query internal service state.
216pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Send + Sync;
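// For illustration only, a hedged sketch of constructing such a query and
// handing it to the service via `Command::QueryState` (the `commands` sender
// and `reply_tx` channel are hypothetical stand-ins for the runtime's wiring):
//
//     let query: Arc<QueryState> =
//         Arc::new(|_state: &dyn ServiceState| -> Result<(), CommandError> { Ok(()) });
//     commands.send(Command::QueryState(query, reply_tx)).ok();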
217
218/// Commands sent to the service by the operator.
219pub enum Command {
220    /// Announce repository references for given repository to peers.
221    AnnounceRefs(RepoId, chan::Sender<RefsAt>),
222    /// Announce local repositories to peers.
223    AnnounceInventory,
224    /// Add repository to local inventory.
225    AddInventory(RepoId, chan::Sender<bool>),
226    /// Connect to node with the given address.
227    Connect(NodeId, Address, ConnectOptions),
228    /// Disconnect from node.
229    Disconnect(NodeId),
230    /// Get the node configuration.
231    Config(chan::Sender<Config>),
232    /// Get the node's listen addresses.
233    ListenAddrs(chan::Sender<Vec<std::net::SocketAddr>>),
234    /// Lookup seeds for the given repository in the routing table.
235    Seeds(RepoId, chan::Sender<Seeds>),
236    /// Fetch the given repository from the network.
237    Fetch(RepoId, NodeId, time::Duration, chan::Sender<FetchResult>),
238    /// Seed the given repository.
239    Seed(RepoId, Scope, chan::Sender<bool>),
240    /// Unseed the given repository.
241    Unseed(RepoId, chan::Sender<bool>),
242    /// Follow the given node.
243    Follow(NodeId, Option<Alias>, chan::Sender<bool>),
244    /// Unfollow the given node.
245    Unfollow(NodeId, chan::Sender<bool>),
246    /// Query the internal service state.
247    QueryState(Arc<QueryState>, chan::Sender<Result<(), CommandError>>),
248}
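// A hedged sketch of how the operator side typically drives these commands:
// create a reply channel, send the command, then block on the response. The
// `commands` sender below is a hypothetical stand-in for whatever channel the
// runtime uses to deliver `Command`s to the service loop.
//
//     let (tx, rx) = chan::bounded::<Seeds>(1);
//     commands.send(Command::Seeds(rid, tx)).ok();
//     let seeds = rx.recv()?;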
249
250impl fmt::Debug for Command {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        match self {
253            Self::AnnounceRefs(id, _) => write!(f, "AnnounceRefs({id})"),
254            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
255            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
256            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
257            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
258            Self::Config(_) => write!(f, "Config"),
259            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
260            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
261            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
262            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
263            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
264            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
265            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
266            Self::QueryState { .. } => write!(f, "QueryState(..)"),
267        }
268    }
269}
270
271/// Command-related errors.
272#[derive(thiserror::Error, Debug)]
273pub enum CommandError {
274    #[error(transparent)]
275    Storage(#[from] storage::Error),
276    #[error(transparent)]
277    Routing(#[from] routing::Error),
278    #[error(transparent)]
279    Policy(#[from] policy::Error),
280}
281
282/// Error returned by [`Service::try_fetch`].
283#[derive(thiserror::Error, Debug)]
284enum TryFetchError<'a> {
285    #[error("ongoing fetch for repository exists")]
286    AlreadyFetching(&'a mut FetchState),
287    #[error("peer is not connected; cannot initiate fetch")]
288    SessionNotConnected,
289    #[error("peer fetch capacity reached; cannot initiate fetch")]
290    SessionCapacityReached,
291    #[error(transparent)]
292    Namespaces(#[from] NamespacesError),
293}
294
295/// Fetch state for an ongoing fetch.
296#[derive(Debug)]
297pub struct FetchState {
298    /// Node we're fetching from.
299    pub from: NodeId,
300    /// What refs we're fetching.
301    pub refs_at: Vec<RefsAt>,
302    /// Channels waiting for fetch results.
303    pub subscribers: Vec<chan::Sender<FetchResult>>,
304}
305
306impl FetchState {
307    /// Add a subscriber to this fetch.
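    ///
    /// Duplicate senders (as determined by `Sender::same_channel`) are ignored,
    /// so subscribing the same channel twice still results in a single
    /// notification when the fetch completes.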
308    fn subscribe(&mut self, c: chan::Sender<FetchResult>) {
309        if !self.subscribers.iter().any(|s| s.same_channel(&c)) {
310            self.subscribers.push(c);
311        }
312    }
313}
314
315/// Holds all node stores.
316#[derive(Debug)]
317pub struct Stores<D>(D);
318
319impl<D> Stores<D>
320where
321    D: Store,
322{
323    /// Get the database as a routing store.
324    pub fn routing(&self) -> &impl routing::Store {
325        &self.0
326    }
327
328    /// Get the database as a routing store, mutably.
329    pub fn routing_mut(&mut self) -> &mut impl routing::Store {
330        &mut self.0
331    }
332
333    /// Get the database as an address store.
334    pub fn addresses(&self) -> &impl address::Store {
335        &self.0
336    }
337
338    /// Get the database as an address store, mutably.
339    pub fn addresses_mut(&mut self) -> &mut impl address::Store {
340        &mut self.0
341    }
342
343    /// Get the database as a gossip store.
344    pub fn gossip(&self) -> &impl gossip::Store {
345        &self.0
346    }
347
348    /// Get the database as a gossip store, mutably.
349    pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
350        &mut self.0
351    }
352
353    /// Get the database as a seed store.
354    pub fn seeds(&self) -> &impl seed::Store {
355        &self.0
356    }
357
358    /// Get the database as a seed store, mutably.
359    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
360        &mut self.0
361    }
362
363    /// Get the database as a refs db.
364    pub fn refs(&self) -> &impl node::refs::Store {
365        &self.0
366    }
367
368    /// Get the database as a refs db, mutably.
369    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
370        &mut self.0
371    }
372}
373
374impl<D> AsMut<D> for Stores<D> {
375    fn as_mut(&mut self) -> &mut D {
376        &mut self.0
377    }
378}
379
380impl<D> From<D> for Stores<D> {
381    fn from(db: D) -> Self {
382        Self(db)
383    }
384}
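// A hedged usage sketch: `Stores` is a thin wrapper exposing a single database
// handle through the various store traits (assuming `db` is a `node::Database`
// and `rid`/`cutoff` are placeholders):
//
//     let mut stores = Stores::from(db);
//     let seeds = stores.routing().get(&rid)?;
//     stores.gossip_mut().prune(cutoff)?;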
385
386/// The node service.
387#[derive(Debug)]
388pub struct Service<D, S, G> {
389    /// Service configuration.
390    config: Config,
391    /// Our cryptographic signer and key.
392    signer: G,
393    /// Project storage.
394    storage: S,
395    /// Node database.
396    db: Stores<D>,
397    /// Policy configuration.
398    policies: policy::Config<Write>,
399    /// Peer sessions, currently or recently connected.
400    sessions: Sessions,
401    /// Clock. Tells the time.
402    clock: LocalTime,
403    /// Who relayed what announcement to us. We keep track of this to ensure that
404    /// we don't relay messages to nodes that already know about these messages.
405    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
406    /// I/O outbox.
407    outbox: Outbox,
408    /// Cached local node announcement.
409    node: NodeAnnouncement,
410    /// Cached local inventory announcement.
411    inventory: InventoryAnnouncement,
412    /// Source of entropy.
413    rng: Rng,
414    /// Ongoing fetches.
415    fetching: HashMap<RepoId, FetchState>,
416    /// Request/connection rate limiter.
417    limiter: RateLimiter,
418    /// Current seeded repositories bloom filter.
419    filter: Filter,
420    /// Last time the service was idle.
421    last_idle: LocalTime,
422    /// Last time the gossip messages were relayed.
423    last_gossip: LocalTime,
424    /// Last time the service synced.
425    last_sync: LocalTime,
426    /// Last time the service routing table was pruned.
427    last_prune: LocalTime,
428    /// Last time the announcement task was run.
429    last_announce: LocalTime,
430    /// Timestamp of last local inventory announced.
431    last_inventory: LocalTime,
432    /// Last timestamp used for announcements.
433    last_timestamp: Timestamp,
434    /// Time when the service was initialized, or `None` if it wasn't initialized.
435    started_at: Option<LocalTime>,
436    /// Time when the service was last online, or `None` if this is the first time.
437    last_online_at: Option<LocalTime>,
438    /// Publishes events to subscribers.
439    emitter: Emitter<Event>,
440    /// Local listening addresses.
441    listening: Vec<net::SocketAddr>,
442    /// Latest metrics for all nodes connected to since the last start.
443    metrics: Metrics,
444}
445
446impl<D, S, G> Service<D, S, G>
447where
448    G: crypto::Signer,
449{
450    /// Get the local node id.
451    pub fn node_id(&self) -> NodeId {
452        *self.signer.public_key()
453    }
454
455    /// Get the local service time.
456    pub fn local_time(&self) -> LocalTime {
457        self.clock
458    }
459
460    pub fn emitter(&self) -> Emitter<Event> {
461        self.emitter.clone()
462    }
463}
464
465impl<D, S, G> Service<D, S, G>
466where
467    D: Store,
468    S: ReadStorage + 'static,
469    G: Signer,
470{
471    pub fn new(
472        config: Config,
473        db: Stores<D>,
474        storage: S,
475        policies: policy::Config<Write>,
476        signer: G,
477        rng: Rng,
478        node: NodeAnnouncement,
479        emitter: Emitter<Event>,
480    ) -> Self {
481        let sessions = Sessions::new(rng.clone());
482        let limiter = RateLimiter::new(config.peers());
483        let last_timestamp = node.timestamp;
484        let clock = LocalTime::default(); // Updated on initialize.
485        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.
486
487        Self {
488            config,
489            storage,
490            policies,
491            signer,
492            rng,
493            inventory,
494            node,
495            clock,
496            db,
497            outbox: Outbox::default(),
498            limiter,
499            sessions,
500            fetching: HashMap::new(),
501            filter: Filter::empty(),
502            relayed_by: HashMap::default(),
503            last_idle: LocalTime::default(),
504            last_gossip: LocalTime::default(),
505            last_sync: LocalTime::default(),
506            last_prune: LocalTime::default(),
507            last_timestamp,
508            last_announce: LocalTime::default(),
509            last_inventory: LocalTime::default(),
510            started_at: None,     // Updated on initialize.
511            last_online_at: None, // Updated on initialize.
512            emitter,
513            listening: vec![],
514            metrics: Metrics::default(),
515        }
516    }
517
518    /// Whether the service was started (initialized) and if so, at what time.
519    pub fn started(&self) -> Option<LocalTime> {
520        self.started_at
521    }
522
523    /// Return the next i/o action to execute.
524    #[allow(clippy::should_implement_trait)]
525    pub fn next(&mut self) -> Option<io::Io> {
526        self.outbox.next()
527    }
528
529    /// Seed a repository.
530    /// Returns whether or not the repo policy was updated.
531    pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
532        let updated = self.policies.seed(id, scope)?;
533        self.filter.insert(id);
534
535        Ok(updated)
536    }
537
538    /// Unseed a repository.
539    /// Returns whether or not the repo policy was updated.
540    /// Note that when unseeding, we don't announce anything to the network. This is because, by
541    /// simply not announcing it anymore, the repository will eventually be pruned by other nodes.
542    pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
543        let updated = self.policies.unseed(id)?;
544
545        if updated {
546            // Nb. This is potentially slow if we have lots of repos. We should probably
547            // only re-compute the filter when we've unseeded a certain number of repos
548            // and the filter is really out of date.
549            //
550            // TODO: Share this code with initialization code.
551            self.filter = Filter::new(
552                self.policies
553                    .seed_policies()?
554                    .filter_map(|t| (t.policy.is_allow()).then_some(t.rid)),
555            );
556            // Update and announce new inventory.
557            if let Err(e) = self.remove_inventory(id) {
558                error!(target: "service", "Error updating inventory after unseed: {e}");
559            }
560        }
561        Ok(updated)
562    }
563
564    /// Find the closest `n` peers by proximity in seeding graphs.
565    /// Returns a sorted list from the closest peer to the furthest.
566    /// Peers with more seedings in common score higher.
567    #[allow(unused)]
568    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
569        todo!()
570    }
571
572    /// Get the database.
573    pub fn database(&self) -> &Stores<D> {
574        &self.db
575    }
576
577    /// Get the mutable database.
578    pub fn database_mut(&mut self) -> &mut Stores<D> {
579        &mut self.db
580    }
581
582    /// Get the storage instance.
583    pub fn storage(&self) -> &S {
584        &self.storage
585    }
586
587    /// Get the mutable storage instance.
588    pub fn storage_mut(&mut self) -> &mut S {
589        &mut self.storage
590    }
591
592    /// Get the node policies.
593    pub fn policies(&self) -> &policy::Config<Write> {
594        &self.policies
595    }
596
597    /// Get the local signer.
598    pub fn signer(&self) -> &G {
599        &self.signer
600    }
601
602    /// Subscriber to inner `Emitter` events.
603    pub fn events(&mut self) -> Events {
604        Events::from(self.emitter.subscribe())
605    }
606
607    /// Get I/O outbox.
608    pub fn outbox(&mut self) -> &mut Outbox {
609        &mut self.outbox
610    }
611
612    /// Get configuration.
613    pub fn config(&self) -> &Config {
614        &self.config
615    }
616
617    /// Lookup a repository, both locally and in the routing table.
618    pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
619        let this = self.nid();
620        let local = self.storage.get(rid)?;
621        let remote = self
622            .db
623            .routing()
624            .get(&rid)?
625            .iter()
626            .filter(|nid| nid != &this)
627            .cloned()
628            .collect();
629
630        Ok(Lookup { local, remote })
631    }
632
633    /// Initialize service with current time. Call this once.
634    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
635        debug!(target: "service", "Init @{}", time.as_millis());
636        assert_ne!(time, LocalTime::default());
637
638        let nid = self.node_id();
639
640        self.clock = time;
641        self.started_at = Some(time);
642        self.last_online_at = match self.db.gossip().last() {
643            Ok(Some(last)) => Some(last.to_local_time()),
644            Ok(None) => None,
645            Err(e) => {
646                error!(target: "service", "Error getting the latest gossip message from db: {e}");
647                None
648            }
649        };
650
651        // Populate refs database. This is only useful as part of the upgrade process for nodes
652        // that have been online since before the refs database was created.
653        match self.db.refs().count() {
654            Ok(0) => {
655                info!(target: "service", "Empty refs database, populating from storage..");
656                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
657                    error!(target: "service", "Failed to populate refs database: {e}");
658                }
659            }
660            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
661            Err(e) => error!(target: "service", "Error checking refs database: {e}"),
662        }
663
664        let announced = self
665            .db
666            .seeds()
667            .seeded_by(&nid)?
668            .collect::<Result<HashMap<_, _>, _>>()?;
669        let mut inventory = BTreeSet::new();
670        let mut private = BTreeSet::new();
671
672        for repo in self.storage.repositories()? {
673            let rid = repo.rid;
674
675            // If we're not seeding this repo, just skip it.
676            if !self.policies.is_seeding(&rid)? {
677                warn!(target: "service", "Local repository {rid} is not seeded");
678                continue;
679            }
680            // Add public repositories to inventory.
681            if repo.doc.visibility.is_public() {
682                inventory.insert(rid);
683            } else {
684                private.insert(rid);
685            }
686            // If we have no owned refs for this repo, then there's nothing to announce.
687            let Some(updated_at) = repo.synced_at else {
688                continue;
689            };
690            // Skip this repo if the sync status matches what we have in storage.
691            if let Some(announced) = announced.get(&rid) {
692                if updated_at.oid == announced.oid {
693                    continue;
694                }
695            }
696            // Make sure our local node's sync status is up to date with storage.
697            if self.db.seeds_mut().synced(
698                &rid,
699                &nid,
700                updated_at.oid,
701                updated_at.timestamp.into(),
702            )? {
703                debug!(target: "service", "Saved local sync status for {rid}..");
704            }
705            // If we got here, it likely means a repo was updated while the node was stopped.
706            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
707            // the historical gossip messages when a node connects and subscribes to this repo.
708            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
709                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
710                self.db.gossip_mut().announced(&nid, &ann)?;
711            }
712        }
713
714        // Ensure that our inventory is recorded in our routing table, and we are seeding
715        // all of it. It can happen that inventory is not properly seeded if, for example, the
716        // user creates a new repository while the node is stopped.
717        self.db
718            .routing_mut()
719            .add_inventory(inventory.iter(), nid, time.into())?;
720        self.inventory = gossip::inventory(self.timestamp(), inventory);
721
722        // Ensure that private repositories are not in our inventory. It's possible that
723        // a repository was public and then it was made private.
724        self.db
725            .routing_mut()
726            .remove_inventories(private.iter(), &nid)?;
727
728        // Setup subscription filter for seeded repos.
729        self.filter = Filter::new(
730            self.policies
731                .seed_policies()?
732                .filter_map(|t| (t.policy.is_allow()).then_some(t.rid)),
733        );
734        // Connect to configured peers.
735        let addrs = self.config.connect.clone();
736        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
737            self.connect(id, addr);
738        }
739        // Try to establish some connections.
740        self.maintain_connections();
741        // Start periodic tasks.
742        self.outbox.wakeup(IDLE_INTERVAL);
743        self.outbox.wakeup(GOSSIP_INTERVAL);
744
745        Ok(())
746    }
747
748    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
749        trace!(
750            target: "service",
751            "Tick +{}",
752            now - self.started_at.expect("Service::tick: service must be initialized")
753        );
754        if now >= self.clock {
755            self.clock = now;
756        } else {
757            // Nb. In tests, we often move the clock forwards in time to test different behaviors,
758            // so this warning isn't applicable there.
759            #[cfg(not(test))]
760            warn!(
761                target: "service",
762                "System clock is not monotonic: {now} is not greater than or equal to {}", self.clock
763            );
764        }
765        self.metrics = metrics.clone();
766    }
767
768    pub fn wake(&mut self) {
769        let now = self.clock;
770
771        trace!(
772            target: "service",
773            "Wake +{}",
774            now - self.started_at.expect("Service::wake: service must be initialized")
775        );
776
777        if now - self.last_idle >= IDLE_INTERVAL {
778            trace!(target: "service", "Running 'idle' task...");
779
780            self.keep_alive(&now);
781            self.disconnect_unresponsive_peers(&now);
782            self.idle_connections();
783            self.maintain_connections();
784            self.dequeue_fetches();
785            self.outbox.wakeup(IDLE_INTERVAL);
786            self.last_idle = now;
787        }
788        if now - self.last_gossip >= GOSSIP_INTERVAL {
789            trace!(target: "service", "Running 'gossip' task...");
790
791            if let Err(e) = self.relay_announcements() {
792                error!(target: "service", "Error relaying stored announcements: {e}");
793            }
794            self.outbox.wakeup(GOSSIP_INTERVAL);
795            self.last_gossip = now;
796        }
797        if now - self.last_sync >= SYNC_INTERVAL {
798            trace!(target: "service", "Running 'sync' task...");
799
800            if let Err(e) = self.fetch_missing_repositories() {
801                error!(target: "service", "Error fetching missing inventory: {e}");
802            }
803            self.outbox.wakeup(SYNC_INTERVAL);
804            self.last_sync = now;
805        }
806        if now - self.last_announce >= ANNOUNCE_INTERVAL {
807            trace!(target: "service", "Running 'announce' task...");
808
809            self.announce_inventory();
810            self.outbox.wakeup(ANNOUNCE_INTERVAL);
811            self.last_announce = now;
812        }
813        if now - self.last_prune >= PRUNE_INTERVAL {
814            trace!(target: "service", "Running 'prune' task...");
815
816            if let Err(err) = self.prune_routing_entries(&now) {
817                error!(target: "service", "Error pruning routing entries: {err}");
818            }
819            if let Err(err) = self
820                .db
821                .gossip_mut()
822                .prune((now - self.config.limits.gossip_max_age).into())
823            {
824                error!(target: "service", "Error pruning gossip entries: {err}");
825            }
826
827            self.outbox.wakeup(PRUNE_INTERVAL);
828            self.last_prune = now;
829        }
830
831        // Always check whether there are persistent peers that need reconnecting.
832        self.maintain_persistent();
833    }
834
835    pub fn command(&mut self, cmd: Command) {
836        info!(target: "service", "Received command {:?}", cmd);
837
838        match cmd {
839            Command::Connect(nid, addr, opts) => {
840                if opts.persistent {
841                    self.config.connect.insert((nid, addr.clone()).into());
842                }
843                if !self.connect(nid, addr) {
844                    // TODO: Return error to command.
845                }
846            }
847            Command::Disconnect(nid) => {
848                self.outbox.disconnect(nid, DisconnectReason::Command);
849            }
850            Command::Config(resp) => {
851                resp.send(self.config.clone()).ok();
852            }
853            Command::ListenAddrs(resp) => {
854                resp.send(self.listening.clone()).ok();
855            }
856            Command::Seeds(rid, resp) => match self.seeds(&rid) {
857                Ok(seeds) => {
858                    let (connected, disconnected) = seeds.partition();
859                    debug!(
860                        target: "service",
861                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
862                        connected.len(), disconnected.len(), rid
863                    );
864                    resp.send(seeds).ok();
865                }
866                Err(e) => {
867                    error!(target: "service", "Error getting seeds for {rid}: {e}");
868                }
869            },
870            Command::Fetch(rid, seed, timeout, resp) => {
871                self.fetch(rid, seed, timeout, Some(resp));
872            }
873            Command::Seed(rid, scope, resp) => {
874                // Update our seeding policy.
875                let seeded = self
876                    .seed(&rid, scope)
877                    .expect("Service::command: error seeding repository");
878                resp.send(seeded).ok();
879
880                // Let all our peers know that we're interested in this repo from now on.
881                self.outbox.broadcast(
882                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
883                    self.sessions.connected().map(|(_, s)| s),
884                );
885            }
886            Command::Unseed(id, resp) => {
887                let updated = self
888                    .unseed(&id)
889                    .expect("Service::command: error unseeding repository");
890                resp.send(updated).ok();
891            }
892            Command::Follow(id, alias, resp) => {
893                let seeded = self
894                    .policies
895                    .follow(&id, alias.as_deref())
896                    .expect("Service::command: error following node");
897                resp.send(seeded).ok();
898            }
899            Command::Unfollow(id, resp) => {
900                let updated = self
901                    .policies
902                    .unfollow(&id)
903                    .expect("Service::command: error unfollowing node");
904                resp.send(updated).ok();
905            }
906            Command::AnnounceRefs(id, resp) => {
907                let doc = match self.storage.get(id) {
908                    Ok(Some(doc)) => doc,
909                    Ok(None) => {
910                        error!(target: "service", "Error announcing refs: repository {id} not found");
911                        return;
912                    }
913                    Err(e) => {
914                        error!(target: "service", "Error announcing refs: doc error: {e}");
915                        return;
916                    }
917                };
918
919                match self.announce_own_refs(id, doc) {
920                    Ok(refs) => match refs.as_slice() {
921                        &[refs] => {
922                            resp.send(refs).ok();
923                        }
924                        // SAFETY: Since we passed in one NID, we should get exactly one item back.
925                        [..] => panic!("Service::command: unexpected refs returned"),
926                    },
927                    Err(err) => {
928                        error!(target: "service", "Error announcing refs: {err}");
929                    }
930                }
931            }
932            Command::AnnounceInventory => {
933                self.announce_inventory();
934            }
935            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
936                Ok(updated) => {
937                    resp.send(updated).ok();
938                }
939                Err(e) => {
940                    error!(target: "service", "Error adding {rid} to inventory: {e}");
941                }
942            },
943            Command::QueryState(query, sender) => {
944                sender.send(query(self)).ok();
945            }
946        }
947    }
948
949    /// Initiate an outgoing fetch for some repository, based on another node's announcement.
950    /// Returns `true` if the fetch was initiated and `false` if it was skipped.
951    fn fetch_refs_at(
952        &mut self,
953        rid: RepoId,
954        from: NodeId,
955        refs: NonEmpty<RefsAt>,
956        scope: Scope,
957        timeout: time::Duration,
958        channel: Option<chan::Sender<FetchResult>>,
959    ) -> bool {
960        match self.refs_status_of(rid, refs, &scope) {
961            Ok(status) => {
962                if status.want.is_empty() {
963                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
964                } else {
965                    return self._fetch(rid, from, status.want, timeout, channel);
966                }
967            }
968            Err(e) => {
969                error!(target: "service", "Error getting the refs status of {rid}: {e}");
970            }
971        }
972        // We didn't try to fetch anything.
973        false
974    }
975
976    /// Initiate an outgoing fetch for some repository.
977    fn fetch(
978        &mut self,
979        rid: RepoId,
980        from: NodeId,
981        timeout: time::Duration,
982        channel: Option<chan::Sender<FetchResult>>,
983    ) -> bool {
984        self._fetch(rid, from, vec![], timeout, channel)
985    }
986
987    fn _fetch(
988        &mut self,
989        rid: RepoId,
990        from: NodeId,
991        refs_at: Vec<RefsAt>,
992        timeout: time::Duration,
993        channel: Option<chan::Sender<FetchResult>>,
994    ) -> bool {
995        match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
996            Ok(fetching) => {
997                if let Some(c) = channel {
998                    fetching.subscribe(c);
999                }
1000                return true;
1001            }
1002            Err(TryFetchError::AlreadyFetching(fetching)) => {
1003                // If we're already fetching the same refs from the requested peer, there's nothing
1004                // to do; we simply add the supplied channel to the list of subscribers so that it
1005                // is notified on completion. Otherwise, we queue a fetch with the requested peer.
1006                if fetching.from == from && fetching.refs_at == refs_at {
1007                    debug!(target: "service", "Ignoring redundant fetch of {rid} from {from}");
1008
1009                    if let Some(c) = channel {
1010                        fetching.subscribe(c);
1011                    }
1012                } else {
1013                    let fetch = QueuedFetch {
1014                        rid,
1015                        refs_at,
1016                        from,
1017                        timeout,
1018                        channel,
1019                    };
1020                    debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");
1021
1022                    self.queue_fetch(fetch);
1023                }
1024            }
1025            Err(TryFetchError::SessionCapacityReached) => {
1026                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
1027                self.queue_fetch(QueuedFetch {
1028                    rid,
1029                    refs_at,
1030                    from,
1031                    timeout,
1032                    channel,
1033                });
1034            }
1035            Err(e) => {
1036                if let Some(c) = channel {
1037                    c.send(FetchResult::Failed {
1038                        reason: e.to_string(),
1039                    })
1040                    .ok();
1041                }
1042            }
1043        }
1044        false
1045    }
1046
1047    fn queue_fetch(&mut self, fetch: QueuedFetch) {
1048        let Some(s) = self.sessions.get_mut(&fetch.from) else {
1049            log::error!(target: "service", "Cannot queue fetch for unknown session {}", fetch.from);
1050            return;
1051        };
1052        if let Err(e) = s.queue_fetch(fetch) {
1053            let fetch = e.inner();
1054            log::debug!(target: "service", "Unable to queue fetch for {} with {}: {e}", &fetch.rid, &fetch.from);
1055        }
1056    }
1057
1058    // TODO: Buffer/throttle fetches.
1059    fn try_fetch(
1060        &mut self,
1061        rid: RepoId,
1062        from: &NodeId,
1063        refs_at: Vec<RefsAt>,
1064        timeout: time::Duration,
1065    ) -> Result<&mut FetchState, TryFetchError> {
1066        let from = *from;
1067        let Some(session) = self.sessions.get_mut(&from) else {
1068            return Err(TryFetchError::SessionNotConnected);
1069        };
1070        let fetching = self.fetching.entry(rid);
1071
1072        trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");
1073
1074        let fetching = match fetching {
1075            Entry::Vacant(fetching) => fetching,
1076            Entry::Occupied(fetching) => {
1077                // We're already fetching this repo from some peer.
1078                return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
1079            }
1080        };
1081        // Sanity check: We shouldn't be fetching from this session, since we return above if we're
1082        // fetching from any session.
1083        debug_assert!(!session.is_fetching(&rid));
1084
1085        if !session.is_connected() {
1086            // This can happen if a session disconnects in the time between asking for seeds to
1087            // fetch from, and initiating the fetch from one of those seeds.
1088            return Err(TryFetchError::SessionNotConnected);
1089        }
1090        if session.is_at_capacity() {
1091            // If we're already fetching multiple repos from this peer.
1092            return Err(TryFetchError::SessionCapacityReached);
1093        }
1094
1095        let fetching = fetching.insert(FetchState {
1096            from,
1097            refs_at: refs_at.clone(),
1098            subscribers: vec![],
1099        });
1100        self.outbox.fetch(session, rid, refs_at, timeout);
1101
1102        Ok(fetching)
1103    }
1104
1105    pub fn fetched(
1106        &mut self,
1107        rid: RepoId,
1108        remote: NodeId,
1109        result: Result<fetch::FetchResult, FetchError>,
1110    ) {
1111        let Some(fetching) = self.fetching.remove(&rid) else {
1112            error!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
1113            return;
1114        };
1115        debug_assert_eq!(fetching.from, remote);
1116
1117        if let Some(s) = self.sessions.get_mut(&remote) {
1118            // Mark this RID as fetched for this session.
1119            s.fetched(rid);
1120        }
1121
1122        // Notify all fetch subscribers of the fetch result. This is used when the user requests
1123        // a fetch via the CLI, for example.
1124        for sub in &fetching.subscribers {
1125            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");
1126
1127            let result = match &result {
1128                Ok(success) => FetchResult::Success {
1129                    updated: success.updated.clone(),
1130                    namespaces: success.namespaces.clone(),
1131                    clone: success.clone,
1132                },
1133                Err(e) => FetchResult::Failed {
1134                    reason: e.to_string(),
1135                },
1136            };
1137            if sub.send(result).is_err() {
1138                error!(target: "service", "Error sending fetch result for {rid} from {remote}..");
1139            } else {
1140                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
1141            }
1142        }
1143
1144        match result {
1145            Ok(fetch::FetchResult {
1146                updated,
1147                namespaces,
1148                clone,
1149                doc,
1150            }) => {
1151                info!(target: "service", "Fetched {rid} from {remote} successfully");
1152                // Update our routing table in case this fetch was user-initiated and doesn't
1153                // come from an announcement.
1154                self.seed_discovered(rid, remote, self.clock.into());
1155
1156                for update in &updated {
1157                    if update.is_skipped() {
1158                        trace!(target: "service", "Ref skipped: {update} for {rid}");
1159                    } else {
1160                        debug!(target: "service", "Ref updated: {update} for {rid}");
1161                    }
1162                }
1163                self.emitter.emit(Event::RefsFetched {
1164                    remote,
1165                    rid,
1166                    updated: updated.clone(),
1167                });
1168
1169                // Announce our new inventory if this fetch was a full clone.
1170                // Only update and announce inventory for public repositories.
1171                if clone && doc.visibility.is_public() {
1172                    debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");
1173
1174                    if let Err(e) = self.add_inventory(rid) {
1175                        error!(target: "service", "Error announcing inventory for {rid}: {e}");
1176                    }
1177                }
1178
1179                // It's possible for a fetch to succeed even though nothing was updated.
1180                if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
1181                    debug!(target: "service", "Nothing to announce, no refs were updated..");
1182                } else {
1183                    // Finally, announce the refs. This is useful for nodes to know what we've synced,
1184                    // beyond just knowing that we have added an item to our inventory.
1185                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces) {
1186                        error!(target: "service", "Failed to announce new refs: {e}");
1187                    }
1188                }
1189            }
1190            Err(err) => {
1191                error!(target: "service", "Fetch failed for {rid} from {remote}: {err}");
1192
1193                // For now, we only disconnect the remote in case of timeout. In the future,
1194                // there may be other reasons to disconnect.
1195                if err.is_timeout() {
1196                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
1197                }
1198            }
1199        }
1200        // We can now try to dequeue more fetches.
1201        self.dequeue_fetches();
1202    }
1203
1204    /// Attempt to dequeue fetches from all peers.
1205    /// At most one fetch is dequeued per peer. If the fetch cannot be processed,
1206    /// it is put back on the queue for that peer.
1207    ///
1208    /// Fetches are queued for two reasons:
1209    /// 1. The RID was already being fetched.
1210    /// 2. The session was already at fetch capacity.
1211    pub fn dequeue_fetches(&mut self) {
1212        let sessions = self
1213            .sessions
1214            .shuffled()
1215            .map(|(k, _)| *k)
1216            .collect::<Vec<_>>();
1217
1218        // Try to dequeue once per session.
1219        for nid in sessions {
1220            // SAFETY: All the keys we are iterating on exist.
1221            #[allow(clippy::unwrap_used)]
1222            let sess = self.sessions.get_mut(&nid).unwrap();
1223            if !sess.is_connected() || sess.is_at_capacity() {
1224                continue;
1225            }
1226
1227            if let Some(QueuedFetch {
1228                rid,
1229                from,
1230                refs_at,
1231                timeout,
1232                channel,
1233            }) = sess.dequeue_fetch()
1234            {
1235                debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");
1236
1237                if let Some(refs) = NonEmpty::from_vec(refs_at) {
1238                    let repo_entry = self.policies.seed_policy(&rid).expect(
1239                        "Service::dequeue_fetch: error accessing repo seeding configuration",
1240                    );
1241                    let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1242                        debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
1243                        continue;
1244                    };
1245                    self.fetch_refs_at(rid, from, refs, scope, timeout, channel);
1246                } else {
1247                    // If no refs are specified, always do a full fetch.
1248                    self.fetch(rid, from, timeout, channel);
1249                }
1250            }
1251        }
1252    }
1253
1254    /// Inbound connection attempt.
1255    pub fn accepted(&mut self, ip: IpAddr) -> bool {
1256        // Always accept localhost connections, even if we already reached
1257        // our inbound connection limit.
1258        if ip.is_loopback() || ip.is_unspecified() {
1259            return true;
1260        }
1261        // Check for inbound connection limit.
1262        if self.sessions.inbound().count() >= self.config.limits.connection.inbound {
1263            return false;
1264        }
1265        match self.db.addresses().is_ip_banned(ip) {
1266            Ok(banned) => {
1267                if banned {
1268                    debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
1269                    return false;
1270                }
1271            }
1272            Err(e) => error!(target: "service", "Error querying ban status for {ip}: {e}"),
1273        }
1274        let host: HostName = ip.into();
1275
1276        if self.limiter.limit(
1277            host.clone(),
1278            None,
1279            &self.config.limits.rate.inbound,
1280            self.clock,
1281        ) {
1282            trace!(target: "service", "Rate limiting inbound connection from {host}..");
1283            return false;
1284        }
1285        true
1286    }
1287
1288    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
1289        debug!(target: "service", "Attempted connection to {nid} ({addr})");
1290
1291        if let Some(sess) = self.sessions.get_mut(&nid) {
1292            sess.to_attempted();
1293        } else {
1294            #[cfg(debug_assertions)]
1295            panic!("Service::attempted: unknown session {nid}@{addr}");
1296        }
1297    }
1298
1299    pub fn listening(&mut self, local_addr: net::SocketAddr) {
1300        info!(target: "node", "Listening on {local_addr}..");
1301
1302        self.listening.push(local_addr);
1303    }
1304
1305    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
1306        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
1307        self.emitter.emit(Event::PeerConnected { nid: remote });
1308
1309        let msgs = self.initial(link);
1310
1311        if link.is_outbound() {
1312            if let Some(peer) = self.sessions.get_mut(&remote) {
1313                peer.to_connected(self.clock);
1314                self.outbox.write_all(peer, msgs);
1315            }
1316        } else {
1317            match self.sessions.entry(remote) {
1318                Entry::Occupied(mut e) => {
1319                    // In this scenario, it's possible that our peer is persistent, and
1320                    // disconnected. We get an inbound connection before we attempt a re-connection,
1321                    // and therefore we treat it as a regular inbound connection.
1322                    //
1323                    // It's also possible that a disconnection hasn't gone through yet and our
1324                    // peer is still in connected state here, while a new inbound connection from
1325                    // that same peer is made. This results in a new connection from a peer that is
1326                    // already connected from the perspective of the service. This appears to be
1327                    // a bug in the underlying networking library.
1328                    let peer = e.get_mut();
1329                    debug!(
1330                        target: "service",
1331                        "Connecting peer {remote} already has a session open ({peer})"
1332                    );
1333                    peer.link = link;
1334                    peer.to_connected(self.clock);
1335                    self.outbox.write_all(peer, msgs);
1336                }
1337                Entry::Vacant(e) => {
1338                    if let HostName::Ip(ip) = addr.host {
1339                        if !address::is_local(&ip) {
1340                            if let Err(e) =
1341                                self.db
1342                                    .addresses_mut()
1343                                    .record_ip(&remote, ip, self.clock.into())
1344                            {
1345                                log::error!(target: "service", "Error recording IP address for {remote}: {e}");
1346                            }
1347                        }
1348                    }
1349                    let peer = e.insert(Session::inbound(
1350                        remote,
1351                        addr,
1352                        self.config.is_persistent(&remote),
1353                        self.rng.clone(),
1354                        self.clock,
1355                        self.config.limits.clone(),
1356                    ));
1357                    self.outbox.write_all(peer, msgs);
1358                }
1359            }
1360        }
1361    }
1362
1363    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
1364        let since = self.local_time();
1365        let Some(session) = self.sessions.get_mut(&remote) else {
1366            // Since we sometimes disconnect sessions eagerly, it's not unusual to get a second
1367            // disconnection event once the transport is dropped.
1368            trace!(target: "service", "Redundant disconnection for {} ({})", remote, reason);
1369            return;
1370        };
1371        // In cases of connection conflicts, there may be disconnections of one of the two
1372        // connections. In that case we don't want the service to remove the session.
1373        if session.link != link {
1374            return;
1375        }
1376
1377        info!(target: "service", "Disconnected from {} ({})", remote, reason);
1378        self.emitter.emit(Event::PeerDisconnected {
1379            nid: remote,
1380            reason: reason.to_string(),
1381        });
1382
1383        let link = session.link;
1384        let addr = session.addr.clone();
1385
1386        self.fetching.retain(|_, fetching| {
1387            if fetching.from != remote {
1388                return true;
1389            }
1390            // Remove and fail any pending fetches from this remote node.
1391            for resp in &fetching.subscribers {
1392                resp.send(FetchResult::Failed {
1393                    reason: format!("disconnected: {reason}"),
1394                })
1395                .ok();
1396            }
1397            false
1398        });
1399
1400        // Attempt to re-connect to persistent peers.
1401        if self.config.peer(&remote).is_some() {
1402            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
1403                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
1404
1405            // Nb. We always try to reconnect to persistent peers, even when the error appears
1406            // to not be transient.
1407            session.to_disconnected(since, since + delay);
1408
1409            debug!(target: "service", "Reconnecting to {remote} in {delay}..");
1410
1411            self.outbox.wakeup(delay);
1412        } else {
1413            debug!(target: "service", "Dropping peer {remote}..");
1414            self.sessions.remove(&remote);
1415
1416            let severity = match reason {
1417                DisconnectReason::Dial(_)
1418                | DisconnectReason::Fetch(_)
1419                | DisconnectReason::Connection(_) => {
1420                    if self.is_online() {
1421                        // If we're "online", there's something wrong with this
1422                        // peer connection specifically.
1423                        Severity::Medium
1424                    } else {
1425                        Severity::Low
1426                    }
1427                }
1428                DisconnectReason::Session(e) => e.severity(),
1429                DisconnectReason::Command
1430                | DisconnectReason::Conflict
1431                | DisconnectReason::SelfConnection => Severity::Low,
1432            };
1433
1434            if let Err(e) = self
1435                .db
1436                .addresses_mut()
1437                .disconnected(&remote, &addr, severity)
1438            {
1439                error!(target: "service", "Error updating address store: {e}");
1440            }
1441            // Only re-attempt outbound connections, since we don't care if an inbound connection
1442            // is dropped.
1443            if link.is_outbound() {
1444                self.maintain_connections();
1445            }
1446        }
1447        self.dequeue_fetches();
1448    }
1449
1450    pub fn received_message(&mut self, remote: NodeId, message: Message) {
1451        if let Err(err) = self.handle_message(&remote, message) {
1452            // If there's an error, stop processing messages from this peer.
1453            // However, we still relay messages returned up to this point.
1454            self.outbox
1455                .disconnect(remote, DisconnectReason::Session(err));
1456
1457            // FIXME: The peer should be set in a state such that we don't
1458            // process further messages.
1459        }
1460    }
1461
1462    /// Handle an announcement message.
1463    ///
1464    /// Returns the stored announcement's id if it should be relayed to connected peers,
1465    /// and `None` if it should not be relayed.
1466    pub fn handle_announcement(
1467        &mut self,
1468        relayer: &NodeId,
1469        relayer_addr: &Address,
1470        announcement: &Announcement,
1471    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
1472        if !announcement.verify() {
1473            return Err(session::Error::Misbehavior);
1474        }
1475        let Announcement {
1476            node: announcer,
1477            message,
1478            ..
1479        } = announcement;
1480
1481        // Ignore our own announcements, in case the relayer sent one by mistake.
1482        if announcer == self.nid() {
1483            return Ok(None);
1484        }
1485        let now = self.clock;
1486        let timestamp = message.timestamp();
1487
1488        // Don't allow messages from too far in the future.
1489        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
1490            return Err(session::Error::InvalidTimestamp(timestamp));
1491        }
1492
1493        // We don't process announcements from nodes we don't know, since the node announcement is
1494        // what provides DoS protection.
1495        //
1496        // Note that it's possible to *not* receive the node announcement, but receive the
1497        // subsequent announcements of a node in the case of historical gossip messages requested
1498        // from the `subscribe` message. This can happen if the cut-off time is after the node
1499        // announcement timestamp, but before the other announcements. In that case, we simply
1500        // ignore all announcements of that node until we get a node announcement.
1501        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
1502            match self.db.addresses().get(announcer) {
1503                Ok(node) => {
1504                    if node.is_none() {
1505                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
1506                        return Ok(None);
1507                    }
1508                }
1509                Err(e) => {
1510                    error!(target: "service", "Error looking up node in address book: {e}");
1511                    return Ok(None);
1512                }
1513            }
1514        }
1515
1516        // Discard announcement messages we've already seen, otherwise update our last seen time.
1517        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
1518            Ok(Some(id)) => {
1519                log::debug!(
1520                    target: "service",
1521                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
1522                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
1523                );
1524                // Keep track of who relayed the message for later.
1525                self.relayed_by.entry(id).or_default().push(*relayer);
1526
1527                // Decide whether or not to relay this message, if it's fresh.
1528                // To avoid spamming peers on startup with historical gossip messages,
1529                // don't relay messages that are too old. We make an exception for node announcements,
1530                // since they are cached, and will hence often carry old timestamps.
1531                let relay = message.is_node_announcement()
1532                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
1533                relay.then_some(id)
1534            }
1535            Ok(None) => {
1536                // FIXME: Still mark as relayed by this peer.
1537                // FIXME: Refs announcements should not be delayed, since they are only sent
1538                // to subscribers.
1539                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
1540                return Ok(None);
1541            }
1542            Err(e) => {
1543                error!(target: "service", "Error updating gossip entry from {announcer}: {e}");
1544                return Ok(None);
1545            }
1546        };
1547
1548        match message {
1549            // Process a peer inventory update announcement by (maybe) fetching.
1550            AnnouncementMessage::Inventory(message) => {
1551                self.emitter.emit(Event::InventoryAnnounced {
1552                    nid: *announcer,
1553                    inventory: message.inventory.to_vec(),
1554                    timestamp: message.timestamp,
1555                });
1556                match self.sync_routing(
1557                    message.inventory.iter().cloned(),
1558                    *announcer,
1559                    message.timestamp,
1560                ) {
1561                    Ok(synced) => {
1562                        if synced.is_empty() {
1563                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
1564                            return Ok(None);
1565                        }
1566                    }
1567                    Err(e) => {
1568                        error!(target: "service", "Error processing inventory from {announcer}: {e}");
1569                        return Ok(None);
1570                    }
1571                }
1572                let mut missing = Vec::new();
1573                let nid = *self.nid();
1574
1575                // Here we handle the special case where the inventory we received is that of
1576                // a connected peer, as opposed to being relayed to us.
1577                if let Some(sess) = self.sessions.get_mut(announcer) {
1578                    for id in message.inventory.as_slice() {
1579                        // If we are connected to the announcer of this inventory, update the peer's
1580                        // subscription filter to include all inventory items. This way, we'll
1581                        // relay messages relating to the peer's inventory.
1582                        if let Some(sub) = &mut sess.subscribe {
1583                            sub.filter.insert(id);
1584                        }
1585
1586                        // If we're seeding and connected to the announcer, and we don't have
1587                        // the inventory, fetch it from the announcer.
1588                        if self.policies.is_seeding(id).expect(
1589                            "Service::handle_announcement: error accessing seeding configuration",
1590                        ) {
1591                            // Only if we do not have the repository locally do we fetch here.
1592                            // If we do have it, only fetch after receiving a ref announcement.
1593                            match self.db.routing().entry(id, &nid) {
1594                                Ok(entry) => {
1595                                    if entry.is_none() {
1596                                        missing.push(*id);
1597                                    }
1598                                }
1599                                Err(e) => error!(
1600                                    target: "service",
1601                                    "Error checking local inventory for {id}: {e}"
1602                                ),
1603                            }
1604                        }
1605                    }
1606                }
1607                // Since we have limited fetch capacity, it may be that we can't fetch an entire
1608                // inventory from a peer. Therefore we randomize the order of the RIDs to fetch
1609                // different RIDs from different peers in case multiple peers announce the same
1610                // RIDs.
1611                self.rng.shuffle(&mut missing);
1612
1613                for rid in missing {
1614                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
1615                    self.fetch(rid, *announcer, FETCH_TIMEOUT, None);
1616                }
1617                return Ok(relay);
1618            }
1619            AnnouncementMessage::Refs(message) => {
1620                self.emitter.emit(Event::RefsAnnounced {
1621                    nid: *announcer,
1622                    rid: message.rid,
1623                    refs: message.refs.to_vec(),
1624                    timestamp: message.timestamp,
1625                });
1626                // Empty announcements can be safely ignored.
1627                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
1628                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
1629                    return Ok(None);
1630                };
1631                // We update inventories when receiving ref announcements, as these could come
1632                // from a new repository being initialized.
1633                self.seed_discovered(message.rid, *announcer, message.timestamp);
1634
1635                // Update sync status of announcer for this repo.
1636                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
1637                    debug!(
1638                        target: "service",
1639                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
1640                        message.rid, refs.at, message.timestamp
1641                    );
1642                    match self.db.seeds_mut().synced(
1643                        &message.rid,
1644                        announcer,
1645                        refs.at,
1646                        message.timestamp,
1647                    ) {
1648                        Ok(updated) => {
1649                            if updated {
1650                                debug!(
1651                                    target: "service",
1652                                    "Updating sync status of {announcer} for {} to {}",
1653                                    message.rid, refs.at
1654                                );
1655                                self.emitter.emit(Event::RefsSynced {
1656                                    rid: message.rid,
1657                                    remote: *announcer,
1658                                    at: refs.at,
1659                                });
1660                            } else {
1661                                debug!(
1662                                    target: "service",
1663                                    "Sync status of {announcer} was not updated for {}",
1664                                    message.rid,
1665                                );
1666                            }
1667                        }
1668                        Err(e) => {
1669                            error!(target: "service", "Error updating sync status for {}: {e}", message.rid);
1670                        }
1671                    }
1672                }
1673                let repo_entry = self.policies.seed_policy(&message.rid).expect(
1674                    "Service::handle_announcement: error accessing repo seeding configuration",
1675                );
1676                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1677                    debug!(
1678                        target: "service",
1679                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
1680                        message.rid
1681                    );
1682                    return Ok(None);
1683                };
1684                // Refs can be relayed by peers who don't have the data in storage,
1685                // therefore we only check whether we are connected to the *announcer*,
1686                // since the protocol requires a node to only announce refs it actually has.
1687                let Some(remote) = self.sessions.get(announcer).cloned() else {
1688                    trace!(
1689                        target: "service",
1690                        "Skipping fetch of {}, no sessions connected to {announcer}",
1691                        message.rid
1692                    );
1693                    return Ok(relay);
1694                };
1695                // Finally, start the fetch.
1696                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT, None);
1697
1698                return Ok(relay);
1699            }
1700            AnnouncementMessage::Node(
1701                ann @ NodeAnnouncement {
1702                    features,
1703                    addresses,
1704                    ..
1705                },
1706            ) => {
1707                self.emitter.emit(Event::NodeAnnounced {
1708                    nid: *announcer,
1709                    alias: ann.alias.clone(),
1710                    timestamp: ann.timestamp,
1711                    features: *features,
1712                    addresses: addresses.to_vec(),
1713                });
1714                // If this node isn't a seed, we're not interested in adding it
1715                // to our address book, but other nodes may be, so we relay the message anyway.
1716                if !features.has(Features::SEED) {
1717                    return Ok(relay);
1718                }
1719
1720                match self.db.addresses_mut().insert(
1721                    announcer,
1722                    ann.version,
1723                    ann.features,
1724                    &ann.alias,
1725                    ann.work(),
1726                    &ann.agent,
1727                    timestamp,
1728                    addresses
1729                        .iter()
1730                        // Ignore non-routable addresses unless received from a local network
1731                        // peer. This allows the node to function in a local network.
1732                        .filter(|a| a.is_routable() || relayer_addr.is_local())
1733                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
1734                ) {
1735                    Ok(updated) => {
1736                        // Only relay if we received new information.
1737                        if updated {
1738                            debug!(
1739                                target: "service",
1740                                "Address store entry for node {announcer} updated at {timestamp}"
1741                            );
1742                            return Ok(relay);
1743                        }
1744                    }
1745                    Err(err) => {
1746                        // An error here is due to a fault in our address store.
1747                        error!(target: "service", "Error processing node announcement from {announcer}: {err}");
1748                    }
1749                }
1750            }
1751        }
1752        Ok(None)
1753    }
1754
1755    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
1756        match info {
1757            // Nb. We don't currently send this message.
1758            Info::RefsAlreadySynced { rid, at } => {
1759                debug!(target: "service", "Refs already synced for {rid} by {remote}");
1760                self.emitter.emit(Event::RefsSynced {
1761                    rid: *rid,
1762                    remote,
1763                    at: *at,
1764                });
1765            }
1766        }
1767
1768        Ok(())
1769    }
1770
1771    pub fn handle_message(
1772        &mut self,
1773        remote: &NodeId,
1774        message: Message,
1775    ) -> Result<(), session::Error> {
1776        let local = self.node_id();
1777        let relay = self.config.is_relay();
1778        let Some(peer) = self.sessions.get_mut(remote) else {
1779            warn!(target: "service", "Session not found for {remote}");
1780            return Ok(());
1781        };
1782        peer.last_active = self.clock;
1783
1784        let limit = match peer.link {
1785            Link::Outbound => &self.config.limits.rate.outbound,
1786            Link::Inbound => &self.config.limits.rate.inbound,
1787        };
1788        if self
1789            .limiter
1790            .limit(peer.addr.clone().into(), Some(remote), limit, self.clock)
1791        {
1792            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
1793            return Ok(());
1794        }
1795        message.log(log::Level::Debug, remote, Link::Inbound);
1796
1797        let connected = match &mut peer.state {
1798            session::State::Disconnected { .. } => {
1799                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
1800                return Ok(());
1801            }
1802            // In case of a discrepancy between the service state and the state of the underlying
1803            // wire protocol, we may receive a message from a peer that we consider not fully connected
1804            // at the service level. To remedy this, we simply transition the peer to a connected state.
1805            //
1806            // This is not ideal, but until the wire protocol and service are unified, it's the simplest
1807            // solution to converge towards the same state.
1808            session::State::Attempted { .. } | session::State::Initial => {
1809                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
1810                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);
1811
1812                peer.to_connected(self.clock);
1813
1814                None
1815            }
1816            session::State::Connected {
1817                ping, latencies, ..
1818            } => Some((ping, latencies)),
1819        };
1820
1821        trace!(target: "service", "Received message {message:?} from {remote}");
1822
1823        match message {
1824            // Process a peer announcement.
1825            Message::Announcement(ann) => {
1826                let relayer = remote;
1827                let relayer_addr = peer.addr.clone();
1828
1829                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
1830                    if self.config.is_relay() {
1831                        if let AnnouncementMessage::Inventory(_) = ann.message {
1832                            if let Err(e) = self
1833                                .database_mut()
1834                                .gossip_mut()
1835                                .set_relay(id, gossip::RelayStatus::Relay)
1836                            {
1837                                error!(target: "service", "Error setting relay flag for message: {e}");
1838                                return Ok(());
1839                            }
1840                        } else {
1841                            self.relay(id, ann);
1842                        }
1843                    }
1844                }
1845            }
1846            Message::Subscribe(subscribe) => {
1847                // Filter announcements by interest.
1848                match self
1849                    .db
1850                    .gossip()
1851                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
1852                {
1853                    Ok(anns) => {
1854                        for ann in anns {
1855                            let ann = match ann {
1856                                Ok(a) => a,
1857                                Err(e) => {
1858                                    error!(target: "service", "Error reading gossip message from store: {e}");
1859                                    continue;
1860                                }
1861                            };
1862                            // Don't send announcements authored by the remote back to the remote.
1863                            if ann.node == *remote {
1864                                continue;
1865                            }
1866                            // Only send messages if we're a relay, or if they're our own.
1867                            if relay || ann.node == local {
1868                                self.outbox.write(peer, ann.into());
1869                            }
1870                        }
1871                    }
1872                    Err(e) => {
1873                        error!(target: "service", "Error querying gossip messages from store: {e}");
1874                    }
1875                }
1876                peer.subscribe = Some(subscribe);
1877            }
1878            Message::Info(info) => {
1879                self.handle_info(*remote, &info)?;
1880            }
1881            Message::Ping(Ping { ponglen, .. }) => {
1882                // Ignore pings which ask for too much data.
1883                if ponglen > Ping::MAX_PONG_ZEROES {
1884                    return Ok(());
1885                }
1886                self.outbox.write(
1887                    peer,
1888                    Message::Pong {
1889                        zeroes: ZeroBytes::new(ponglen),
1890                    },
1891                );
1892            }
1893            Message::Pong { zeroes } => {
1894                if let Some((ping, latencies)) = connected {
1895                    if let session::PingState::AwaitingResponse {
1896                        len: ponglen,
1897                        since,
1898                    } = *ping
1899                    {
1900                        if (ponglen as usize) == zeroes.len() {
1901                            *ping = session::PingState::Ok;
1902                            // Keep track of peer latency.
1903                            latencies.push_back(self.clock - since);
1904                            if latencies.len() > MAX_LATENCIES {
1905                                latencies.pop_front();
1906                            }
1907                        }
1908                    }
1909                }
1910            }
1911        }
1912        Ok(())
1913    }
1914
1915    /// A convenience method to check whether we should fetch from a `RefsAnnouncement` with `scope`.
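    /// When the scope is `Followed`, the returned `want` set is narrowed to the namespaces we
    /// follow for this repository; our own remote is always excluded.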
1916    fn refs_status_of(
1917        &self,
1918        rid: RepoId,
1919        refs: NonEmpty<RefsAt>,
1920        scope: &policy::Scope,
1921    ) -> Result<RefsStatus, Error> {
1922        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
1923        // Check that there's something we want.
1924        if refs.want.is_empty() {
1925            return Ok(refs);
1926        }
1927        // Check scope.
1928        let mut refs = match scope {
1929            policy::Scope::All => refs,
1930            policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
1931                Ok(Namespaces::All) => refs,
1932                Ok(Namespaces::Followed(followed)) => {
1933                    refs.want.retain(|r| followed.contains(&r.remote));
1934                    refs
1935                }
1936                Err(e) => return Err(e.into()),
1937            },
1938        };
1939        // Remove our own remote, we don't want to fetch that.
1940        refs.want.retain(|r| r.remote != self.node_id());
1941
1942        Ok(refs)
1943    }
1944
1945    /// Add a seed to our routing table.
1946    fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
1947        if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
1948            if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
1949                self.emitter.emit(Event::SeedDiscovered { rid, nid });
1950                info!(target: "service", "Routing table updated for {} with seed {nid}", rid);
1951            }
1952        }
1953    }
1954
1955    /// Set of initial messages to send to a peer.
1956    fn initial(&mut self, _link: Link) -> Vec<Message> {
1957        let now = self.clock();
1958        let filter = self.filter();
1959
1960        // TODO: Only subscribe to outbound connections, otherwise we will consume too
1961        // much bandwidth.
1962
1963        // If we've been previously connected to the network, we'll have received gossip messages.
1964        // Instead of simply taking the last timestamp, we subtract a delta to ensure we don't
1965        // miss any messages due to unsynchronized clocks.
1966        //
1967        // If this is our first connection to the network, we just ask for a fixed backlog
1968        // of messages to get us started.
1969        let since = if let Some(last) = self.last_online_at {
1970            Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
1971        } else {
1972            (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
1973        };
1974        debug!(target: "service", "Subscribing to messages since timestamp {since}..");
1975
1976        vec![
1977            Message::node(self.node.clone(), &self.signer),
1978            Message::inventory(self.inventory.clone(), &self.signer),
1979            Message::subscribe(filter, since, Timestamp::MAX),
1980        ]
1981    }
1982
1983    /// Try to guess whether we're online or not.
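    /// We consider ourselves online if at least one connected peer has a routable address
    /// and has been active within the last `IDLE_INTERVAL`.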
1984    fn is_online(&self) -> bool {
1985        self.sessions
1986            .connected()
1987            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
1988            .count()
1989            > 0
1990    }
1991
1992    /// Remove a local repository from our inventory.
1993    fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
1994        let node = self.node_id();
1995        let now = self.timestamp();
1996
1997        let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
1998        if removed {
1999            self.refresh_and_announce_inventory(now)?;
2000        }
2001        Ok(removed)
2002    }
2003
2004    /// Add a local repository to our inventory.
2005    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
2006        let node = self.node_id();
2007        let now = self.timestamp();
2008
2009        if !self.storage.contains(&rid)? {
2010            error!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
2011            return Ok(false);
2012        }
2013        // Add to our local inventory.
2014        let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
2015        let updated = !updates.is_empty();
2016
2017        if updated {
2018            self.refresh_and_announce_inventory(now)?;
2019        }
2020        Ok(updated)
2021    }
2022
2023    /// Update cached inventory message, and announce new inventory to peers.
2024    fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
2025        let inventory = self.inventory()?;
2026
2027        self.inventory = gossip::inventory(time, inventory);
2028        self.announce_inventory();
2029
2030        Ok(())
2031    }
2032
2033    /// Get our local inventory.
2034    ///
2035    /// A node's inventory is the advertised list of repositories offered by that node.
2036    ///
2037    /// A node's inventory consists of *public* repositories that are seeded and available locally
2038    /// in the node's storage. We use the routing table as the canonical state of all inventories,
2039    /// including the local node's.
2040    ///
2041    /// When a repository is unseeded, it is also removed from the inventory. Private repositories
2042    /// are *not* part of a node's inventory.
2043    fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
2044        self.db
2045            .routing()
2046            .get_inventory(self.nid())
2047            .map_err(Error::from)
2048    }
2049
2050    /// Process a peer inventory announcement by updating our routing table.
2051    /// This function expects the peer's full inventory, and prunes entries that are not in the
2052    /// given inventory.
2053    fn sync_routing(
2054        &mut self,
2055        inventory: impl IntoIterator<Item = RepoId>,
2056        from: NodeId,
2057        timestamp: Timestamp,
2058    ) -> Result<SyncedRouting, Error> {
2059        let mut synced = SyncedRouting::default();
2060        let included = inventory.into_iter().collect::<BTreeSet<_>>();
2061
2062        for (rid, result) in
2063            self.db
2064                .routing_mut()
2065                .add_inventory(included.iter(), from, timestamp)?
2066        {
2067            match result {
2068                InsertResult::SeedAdded => {
2069                    info!(target: "service", "Routing table updated for {rid} with seed {from}");
2070                    self.emitter.emit(Event::SeedDiscovered { rid, nid: from });
2071
2072                    if self
2073                        .policies
2074                        .is_seeding(&rid)
2075                        .expect("Service::sync_routing: error accessing seeding configuration")
2076                    {
2077                        // TODO: We should fetch here if we're already connected, in case this seed
2078                        // has refs we don't have.
2079                    }
2080                    synced.added.push(rid);
2081                }
2082                InsertResult::TimeUpdated => {
2083                    synced.updated.push(rid);
2084                }
2085                InsertResult::NotUpdated => {}
2086            }
2087        }
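        // Prune routing entries for repos that are no longer part of the peer's inventory.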
2088        for rid in self.db.routing().get_inventory(&from)?.into_iter() {
2089            if !included.contains(&rid) {
2090                if self.db.routing_mut().remove_inventory(&rid, &from)? {
2091                    synced.removed.push(rid);
2092                    self.emitter.emit(Event::SeedDropped { rid, nid: from });
2093                }
2094            }
2095        }
2096        Ok(synced)
2097    }
2098
2099    /// Return a refs announcement including the given remotes.
2100    fn refs_announcement_for(
2101        &mut self,
2102        rid: RepoId,
2103        remotes: impl IntoIterator<Item = NodeId>,
2104    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
2105        let repo = self.storage.repository(rid)?;
2106        let timestamp = self.timestamp();
2107        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
2108
2109        for remote_id in remotes.into_iter() {
2110            let refs_at = RefsAt::new(&repo, remote_id)?;
2111
2112            if refs.push(refs_at).is_err() {
2113                warn!(
2114                    target: "service",
2115                    "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
2116                    REF_REMOTE_LIMIT,
2117                );
2118                break;
2119            }
2120        }
2121
2122        let msg = AnnouncementMessage::from(RefsAnnouncement {
2123            rid,
2124            refs: refs.clone(),
2125            timestamp,
2126        });
2127        Ok((msg.signed(&self.signer), refs.into()))
2128    }
2129
2130    /// Announce our own refs for the given repo.
2131    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc<Verified>) -> Result<Vec<RefsAt>, Error> {
2132        let (refs, timestamp) = self.announce_refs(rid, doc, [self.node_id()])?;
2133
2134        // Update refs database with our signed refs branches.
2135        // This isn't strictly necessary for now, as we only use the database for fetches, and
2136        // we never fetch our own announced refs, but we do it for good measure.
2137        if let &[r] = refs.as_slice() {
2138            self.emitter.emit(Event::LocalRefsAnnounced {
2139                rid,
2140                refs: r,
2141                timestamp,
2142            });
2143            if let Err(e) = self.database_mut().refs_mut().set(
2144                &rid,
2145                &r.remote,
2146                &SIGREFS_BRANCH,
2147                r.at,
2148                timestamp.to_local_time(),
2149            ) {
2150                error!(
2151                    target: "service",
2152                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
2153                    r.remote
2154                );
2155            }
2156        }
2157        Ok(refs)
2158    }
2159
2160    /// Announce local refs for given repo.
2161    fn announce_refs(
2162        &mut self,
2163        rid: RepoId,
2164        doc: Doc<Verified>,
2165        remotes: impl IntoIterator<Item = NodeId>,
2166    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
2167        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
2168        let timestamp = ann.timestamp();
2169        let peers = self.sessions.connected().map(|(_, p)| p);
2170
2171        // Update our sync status for our own refs. This is useful for determining if refs were
2172        // updated while the node was stopped.
2173        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
2174            info!(
2175                target: "service",
2176                "Announcing own refs for {rid} to peers ({}) (t={timestamp})..",
2177                refs.at
2178            );
2179            // Update our local node's sync status to mark the refs as announced.
2180            if let Err(e) = self
2181                .db
2182                .seeds_mut()
2183                .synced(&rid, &ann.node, refs.at, timestamp)
2184            {
2185                error!(target: "service", "Error updating sync status for local node: {e}");
2186            } else {
2187                debug!(target: "service", "Saved local sync status for {rid}..");
2188            }
2189        }
2190
2191        self.outbox.announce(
2192            ann,
2193            peers.filter(|p| {
2194                // Only announce to peers who are allowed to view this repo.
2195                doc.is_visible_to(&p.id)
2196            }),
2197            self.db.gossip_mut(),
2198        );
2199        Ok((refs, timestamp))
2200    }
2201
2202    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
2203        if let Some(sess) = self.sessions.get_mut(&nid) {
2204            sess.to_initial();
2205            self.outbox.connect(nid, addr);
2206
2207            return true;
2208        }
2209        false
2210    }
2211
2212    fn connect(&mut self, nid: NodeId, addr: Address) -> bool {
2213        debug!(target: "service", "Connecting to {nid} ({addr})..");
2214
2215        if self.sessions.contains_key(&nid) {
2216            warn!(target: "service", "Attempted connection to peer {nid} which already has a session");
2217            return false;
2218        }
2219        if nid == self.node_id() {
2220            error!(target: "service", "Attempted connection to self");
2221            return false;
2222        }
2223        if self.sessions.outbound().count() >= self.config.limits.connection.outbound {
2224            error!(target: "service", "Outbound connection limit reached when attempting {nid} ({addr})");
2225            return false;
2226        }
2227        let persistent = self.config.is_persistent(&nid);
2228        let timestamp: Timestamp = self.clock.into();
2229
2230        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
2231            error!(target: "service", "Error updating address book with connection attempt: {e}");
2232        }
2233        self.sessions.insert(
2234            nid,
2235            Session::outbound(
2236                nid,
2237                addr.clone(),
2238                persistent,
2239                self.rng.clone(),
2240                self.config.limits.clone(),
2241            ),
2242        );
2243        self.outbox.connect(nid, addr);
2244
2245        true
2246    }
2247
2248    fn seeds(&self, rid: &RepoId) -> Result<Seeds, Error> {
2249        let mut seeds = Seeds::new(self.rng.clone());
2250
2251        // First build a list from peers that have synced our own refs, if any.
2252        // This step is skipped if we don't have the repository yet, or don't have
2253        // our own refs.
2254        if let Ok(repo) = self.storage.repository(*rid) {
2255            if let Ok(local) = RefsAt::new(&repo, self.node_id()) {
2256                for seed in self.db.seeds().seeds_for(rid)? {
2257                    let seed = seed?;
2258                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
2259                    let synced = if local.at == seed.synced_at.oid {
2260                        SyncStatus::Synced { at: seed.synced_at }
2261                    } else {
2262                        let local = SyncedAt::new(local.at, &repo)?;
2263
2264                        SyncStatus::OutOfSync {
2265                            local,
2266                            remote: seed.synced_at,
2267                        }
2268                    };
2269                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
2270                }
2271            }
2272        }
2273
2274        // Then, add peers we know about but for which we have no sync status information.
2275        // These peers have announced that they seed the repository via an inventory
2276        // announcement, but we haven't received any ref announcements from them.
2277        for nid in self.db.routing().get(rid)? {
2278            if nid == self.node_id() {
2279                continue;
2280            }
2281            if seeds.contains(&nid) {
2282                // We already have a richer entry for this node.
2283                continue;
2284            }
2285            let addrs = self.db.addresses().addresses_of(&nid)?;
2286            let state = self.sessions.get(&nid).map(|s| s.state.clone());
2287
2288            seeds.insert(Seed::new(nid, addrs, state, None));
2289        }
2290        Ok(seeds)
2291    }
2292
2293    /// Return a new filter object, based on our seeding policy.
2294    fn filter(&self) -> Filter {
2295        if self.config.seeding_policy.is_allow() {
2296            // TODO: Remove bits for blocked repos.
2297            Filter::default()
2298        } else {
2299            self.filter.clone()
2300        }
2301    }
2302
2303    /// Get a timestamp for use in announcements.
2304    /// Never returns the same timestamp twice.
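    /// For example, if two announcements are created within the same millisecond, the second
    /// call returns the previous timestamp plus one, keeping announcement timestamps strictly
    /// increasing.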
2305    fn timestamp(&mut self) -> Timestamp {
2306        let now = Timestamp::from(self.clock);
2307        if *now > *self.last_timestamp {
2308            self.last_timestamp = now;
2309        } else {
2310            self.last_timestamp = self.last_timestamp + 1;
2311        }
2312        self.last_timestamp
2313    }
2314
2315    fn relay(&mut self, id: gossip::AnnouncementId, msg: Announcement) {
2316        let announcer = msg.node;
2317        let relayed_by = self.relayed_by.get(&id);
2318        // Choose peers we should relay this message to.
2319        // 1. Don't relay to a peer who sent us this message.
2320        // 2. Don't relay to the peer who signed this announcement.
2321        let relay_to = self
2322            .sessions
2323            .connected()
2324            .filter(|(id, _)| {
2325                relayed_by
2326                    .map(|relayers| !relayers.contains(id))
2327                    .unwrap_or(true) // If there are no relayers we let it through.
2328            })
2329            .filter(|(id, _)| **id != announcer)
2330            .map(|(_, p)| p);
2331
2332        self.outbox.relay(msg, relay_to);
2333    }
2334
2335    ////////////////////////////////////////////////////////////////////////////
2336    // Periodic tasks
2337    ////////////////////////////////////////////////////////////////////////////
2338
2339    fn relay_announcements(&mut self) -> Result<(), Error> {
2340        let now = self.clock.into();
2341        let rows = self.database_mut().gossip_mut().relays(now)?;
2342        let local = self.node_id();
2343
2344        for (id, msg) in rows {
2345            let announcer = msg.node;
2346            if announcer == local {
2347                // Don't relay our own stored gossip messages.
2348                continue;
2349            }
2350            self.relay(id, msg);
2351        }
2352        Ok(())
2353    }
2354
2355    /// Announce our inventory to all connected peers, unless it was already announced.
2356    fn announce_inventory(&mut self) {
2357        let timestamp = self.inventory.timestamp.to_local_time();
2358
2359        if self.last_inventory == timestamp {
2360            debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
2361            return;
2362        }
2363        let msg = AnnouncementMessage::from(self.inventory.clone());
2364
2365        self.outbox.announce(
2366            msg.signed(&self.signer),
2367            self.sessions.connected().map(|(_, p)| p),
2368            self.db.gossip_mut(),
2369        );
2370        self.last_inventory = timestamp;
2371    }
2372
2373    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
2374        let count = self.db.routing().len()?;
2375        if count <= self.config.limits.routing_max_size {
2376            return Ok(());
2377        }
2378
2379        let delta = count - self.config.limits.routing_max_size;
2380        let nid = self.node_id();
2381        self.db.routing_mut().prune(
2382            (*now - self.config.limits.routing_max_age).into(),
2383            Some(delta),
2384            &nid,
2385        )?;
2386        Ok(())
2387    }
2388
2389    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
2390        let stale = self
2391            .sessions
2392            .connected()
2393            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
2394
2395        for (_, session) in stale {
2396            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);
2397
2398            // TODO: Should we switch the session state to "disconnected" even before receiving
2399            // an official "disconnect"? Otherwise we keep pinging until we get the disconnection.
2400
2401            self.outbox.disconnect(
2402                session.id,
2403                DisconnectReason::Session(session::Error::Timeout),
2404            );
2405        }
2406    }
2407
2408    /// Ensure connection health by pinging connected peers.
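    /// Only sessions that have been inactive for at least `KEEP_ALIVE_DELTA` are pinged.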
2409    fn keep_alive(&mut self, now: &LocalTime) {
2410        let inactive_sessions = self
2411            .sessions
2412            .connected_mut()
2413            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
2414            .map(|(_, session)| session);
2415        for session in inactive_sessions {
2416            session.ping(self.clock, &mut self.outbox).ok();
2417        }
2418    }
2419
2420    /// Get a list of peers available to connect to, sorted by penalty, lowest first.
2421    fn available_peers(&mut self) -> Vec<Peer> {
2422        match self.db.addresses().entries() {
2423            Ok(entries) => {
2424                // Nb. we don't want to connect to any peers that already have a session with us,
2425                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
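                // Multiple address-book entries for the same node are folded into a single
                // `Peer` carrying all of that node's addresses.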
2426                let mut peers = entries
2427                    .filter(|entry| entry.version == PROTOCOL_VERSION)
2428                    .filter(|entry| !entry.address.banned)
2429                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
2430                    .filter(|entry| !self.sessions.contains_key(&entry.node))
2431                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
2432                    .filter(|entry| &entry.node != self.nid())
2433                    .fold(HashMap::new(), |mut acc, entry| {
2434                        acc.entry(entry.node)
2435                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
2436                            .or_insert_with(|| Peer {
2437                                nid: entry.node,
2438                                addresses: vec![entry.address],
2439                                penalty: entry.penalty,
2440                            });
2441                        acc
2442                    })
2443                    .into_values()
2444                    .collect::<Vec<_>>();
2445                peers.sort_by_key(|p| p.penalty);
2446                peers
2447            }
2448            Err(e) => {
2449                error!(target: "service", "Unable to lookup available peers in address book: {e}");
2450                Vec::new()
2451            }
2452        }
2453    }
2454
2455    /// Fetch all repositories that are seeded but missing from storage.
2456    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
2457        for policy in self.policies.seed_policies()? {
2458            let rid = policy.rid;
2459
2460            if !policy.is_allow() {
2461                continue;
2462            }
2463            if self.storage.contains(&rid)? {
2464                continue;
2465            }
2466            match self.seeds(&rid) {
2467                Ok(seeds) => {
2468                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
2469                        for seed in connected {
2470                            self.fetch(rid, seed.nid, FETCH_TIMEOUT, None);
2471                        }
2472                    } else {
2473                        // TODO: We should make sure that this fetch is retried later, either
2474                        // when we connect to a seed, or when we discover a new seed.
2475                        // Since new connections and routing table updates are both conditions for
2476                        // fetching, we should trigger fetches when those conditions appear.
2477                        // Another way to handle this would be to update our database, saying
2478                        // that we're trying to fetch a certain repo. We would then just
2479                        // iterate over those entries in the above circumstances. This is
2480                        // merely an optimization though, we can also iterate over all seeded
2481                        // repos and check which ones are not in our inventory.
2482                        debug!(target: "service", "No connected seeds found for {rid}..");
2483                    }
2484                }
2485                Err(e) => {
2486                    error!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
2487                }
2488            }
2489        }
2490        Ok(())
2491    }
2492
2493    /// Run idle task for all connections.
2494    fn idle_connections(&mut self) {
2495        for (_, sess) in self.sessions.iter_mut() {
2496            sess.idle(self.clock);
2497
2498            if sess.is_stable() {
2499                // Mark as connected once connection is stable.
2500                if let Err(e) =
2501                    self.db
2502                        .addresses_mut()
2503                        .connected(&sess.id, &sess.addr, self.clock.into())
2504                {
2505                    error!(target: "service", "Error updating address book with connection: {e}");
2506                }
2507            }
2508        }
2509    }
2510
2511    /// Try to maintain a target number of connections.
2512    fn maintain_connections(&mut self) {
2513        let PeerConfig::Dynamic = self.config.peers else {
2514            return;
2515        };
2516        trace!(target: "service", "Maintaining connections..");
2517
2518        let target = TARGET_OUTBOUND_PEERS;
2519        let now = self.clock;
2520        let outbound = self
2521            .sessions
2522            .values()
2523            .filter(|s| s.link.is_outbound())
2524            .filter(|s| s.is_connected() || s.is_connecting())
2525            .count();
2526        let wanted = target.saturating_sub(outbound);
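        // For instance, if the target were 8 outbound peers and 5 sessions were already
        // connected or connecting, we would try to open 3 new connections.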
2527
2528        // Don't connect to more peers than needed.
2529        if wanted == 0 {
2530            return;
2531        }
2532
2533        // Peers available to connect to.
2534        let available = self
2535            .available_peers()
2536            .into_iter()
2537            .filter_map(|peer| {
2538                peer.addresses
2539                    .into_iter()
2540                    .find(|ka| match (ka.last_success, ka.last_attempt) {
2541                        // If we succeeded the last time we tried, this is a good address.
2542                        // If enough time has passed since a failed attempt, we also try again.
2543                        (Some(success), Some(attempt)) => {
2544                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
2545                        }
2546                        // If we haven't succeeded yet, and we waited long enough, we can try this address.
2547                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
2548                        // If we have no failed attempts for this address, it's worth a try.
2549                        (_, None) => true,
2550                    })
2551                    .map(|ka| (peer.nid, ka))
2552            })
2553            .filter(|(_, ka)| match AddressType::from(&ka.addr) {
2554                // Only consider onion addresses if configured.
2555                AddressType::Onion => self.config.onion.is_some(),
2556                AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
2557            });
2558
2559        // Peers we are going to attempt connections to.
2560        let connect = available.take(wanted).collect::<Vec<_>>();
2561        if connect.len() < wanted {
2562            log::debug!(
2563                target: "service",
2564                "Not enough available peers to connect to (available={}, wanted={wanted})",
2565                connect.len()
2566            );
2567        }
2568        for (id, ka) in connect {
2569            self.connect(id, ka.addr.clone());
2570        }
2571    }
2572
2573    /// Maintain persistent peer connections.
2574    fn maintain_persistent(&mut self) {
2575        trace!(target: "service", "Maintaining persistent peers..");
2576
2577        let now = self.local_time();
2578        let mut reconnect = Vec::new();
2579
2580        for (nid, session) in self.sessions.iter_mut() {
2581            if let Some(addr) = self.config.peer(nid) {
2582                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
2583                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
2584                    // even a successful attempt means that we're unlikely to be able to reconnect.
2585
2586                    if now >= *retry_at {
2587                        reconnect.push((*nid, addr.clone(), session.attempts()));
2588                    }
2589                }
2590            }
2591        }
2592
2593        for (nid, addr, attempts) in reconnect {
2594            if self.reconnect(nid, addr) {
2595                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
2596            }
2597        }
2598    }
2599}
2600
2601/// Gives read access to the service state.
2602pub trait ServiceState {
2603    /// Get the Node ID.
2604    fn nid(&self) -> &NodeId;
2605    /// Get the existing sessions.
2606    fn sessions(&self) -> &Sessions;
2607    /// Get fetch state.
2608    fn fetching(&self) -> &HashMap<RepoId, FetchState>;
2609    /// Get outbox.
2610    fn outbox(&self) -> &Outbox;
2611    /// Get rate limiter.
2612    fn limiter(&self) -> &RateLimiter;
2613    /// Get event emitter.
2614    fn emitter(&self) -> &Emitter<Event>;
2615    /// Get a repository from storage.
2616    fn get(&self, rid: RepoId) -> Result<Option<Doc<Verified>>, RepositoryError>;
2617    /// Get the clock.
2618    fn clock(&self) -> &LocalTime;
2619    /// Get the clock mutably.
2620    fn clock_mut(&mut self) -> &mut LocalTime;
2621    /// Get service configuration.
2622    fn config(&self) -> &Config;
2623    /// Get service metrics.
2624    fn metrics(&self) -> &Metrics;
2625}
2626
2627impl<D, S, G> ServiceState for Service<D, S, G>
2628where
2629    D: routing::Store,
2630    G: Signer,
2631    S: ReadStorage,
2632{
2633    fn nid(&self) -> &NodeId {
2634        self.signer.public_key()
2635    }
2636
2637    fn sessions(&self) -> &Sessions {
2638        &self.sessions
2639    }
2640
2641    fn fetching(&self) -> &HashMap<RepoId, FetchState> {
2642        &self.fetching
2643    }
2644
2645    fn outbox(&self) -> &Outbox {
2646        &self.outbox
2647    }
2648
2649    fn limiter(&self) -> &RateLimiter {
2650        &self.limiter
2651    }
2652
2653    fn emitter(&self) -> &Emitter<Event> {
2654        &self.emitter
2655    }
2656
2657    fn get(&self, rid: RepoId) -> Result<Option<Doc<Verified>>, RepositoryError> {
2658        self.storage.get(rid)
2659    }
2660
2661    fn clock(&self) -> &LocalTime {
2662        &self.clock
2663    }
2664
2665    fn clock_mut(&mut self) -> &mut LocalTime {
2666        &mut self.clock
2667    }
2668
2669    fn config(&self) -> &Config {
2670        &self.config
2671    }
2672
2673    fn metrics(&self) -> &Metrics {
2674        &self.metrics
2675    }
2676}
2677
2678/// Disconnect reason.
2679#[derive(Debug)]
2680pub enum DisconnectReason {
2681    /// Error while dialing the remote. This error occurs before a connection is
2682    /// even established. Errors of this kind are usually not transient.
2683    Dial(Arc<dyn std::error::Error + Sync + Send>),
2684    /// Error with an underlying established connection. Sometimes, reconnecting
2685    /// after such an error is possible.
2686    Connection(Arc<dyn std::error::Error + Sync + Send>),
2687    /// Error with a fetch.
2688    Fetch(FetchError),
2689    /// Session error.
2690    Session(session::Error),
2691    /// Session conflicts with existing session.
2692    Conflict,
2693    /// Connection to self.
2694    SelfConnection,
2695    /// User-requested disconnect.
2696    Command,
2697}
2698
2699impl DisconnectReason {
2700    pub fn is_dial_err(&self) -> bool {
2701        matches!(self, Self::Dial(_))
2702    }
2703
2704    pub fn is_connection_err(&self) -> bool {
2705        matches!(self, Self::Connection(_))
2706    }
2707
2708    pub fn connection() -> Self {
2709        DisconnectReason::Connection(Arc::new(std::io::Error::from(
2710            std::io::ErrorKind::ConnectionReset,
2711        )))
2712    }
2713}
2714
2715impl fmt::Display for DisconnectReason {
2716    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2717        match self {
2718            Self::Dial(err) => write!(f, "{err}"),
2719            Self::Connection(err) => write!(f, "{err}"),
2720            Self::Command => write!(f, "command"),
2721            Self::SelfConnection => write!(f, "self-connection"),
2722            Self::Conflict => write!(f, "conflict"),
2723            Self::Session(err) => write!(f, "{err}"),
2724            Self::Fetch(err) => write!(f, "fetch: {err}"),
2725        }
2726    }
2727}
2728
2729/// Result of a project lookup.
2730#[derive(Debug)]
2731pub struct Lookup {
2732    /// The project document, if the project was found locally.
2733    pub local: Option<Doc<Verified>>,
2734    /// A list of remote peers on which the project is known to exist.
2735    pub remote: Vec<NodeId>,
2736}
2737
2738#[derive(thiserror::Error, Debug)]
2739pub enum LookupError {
2740    #[error(transparent)]
2741    Routing(#[from] routing::Error),
2742    #[error(transparent)]
2743    Repository(#[from] RepositoryError),
2744}
2745
2746/// Holds currently (or recently) connected peers.
2747#[derive(Debug, Clone)]
2748pub struct Sessions(AddressBook<NodeId, Session>);
2749
2750impl Sessions {
2751    pub fn new(rng: Rng) -> Self {
2752        Self(AddressBook::new(rng))
2753    }
2754
2755    /// Iterator over fully connected peers.
2756    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2757        self.0
2758            .iter()
2759            .filter_map(move |(id, sess)| match &sess.state {
2760                session::State::Connected { .. } => Some((id, sess)),
2761                _ => None,
2762            })
2763    }
2764
2765    /// Iterator over connected inbound peers.
2766    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2767        self.connected().filter(|(_, s)| s.link.is_inbound())
2768    }
2769
2770    /// Iterator over connected outbound peers.
2771    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2772        self.connected().filter(|(_, s)| s.link.is_outbound())
2773    }
2774
2775    /// Iterator over mutable fully connected peers.
2776    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2777        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
2778    }
2779
2780    /// Iterator over disconnected peers.
2781    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2782        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
2783    }
2784
2785    /// Return whether this node has a fully established session.
2786    pub fn is_connected(&self, id: &NodeId) -> bool {
2787        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
2788    }
2789
2790    /// Return whether this node can be connected to.
2791    pub fn is_disconnected(&self, id: &NodeId) -> bool {
2792        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
2793    }
2794}
2795
2796impl Deref for Sessions {
2797    type Target = AddressBook<NodeId, Session>;
2798
2799    fn deref(&self) -> &Self::Target {
2800        &self.0
2801    }
2802}
2803
2804impl DerefMut for Sessions {
2805    fn deref_mut(&mut self) -> &mut Self::Target {
2806        &mut self.0
2807    }
2808}