radicle_node/
service.rs

1#![allow(clippy::too_many_arguments)]
2#![allow(clippy::collapsible_match)]
3#![allow(clippy::collapsible_if)]
4#![warn(clippy::unwrap_used)]
5pub mod filter;
6pub mod gossip;
7pub mod io;
8pub mod limiter;
9pub mod message;
10pub mod session;
11
12use std::collections::hash_map::Entry;
13use std::collections::{BTreeSet, HashMap, HashSet};
14use std::net::IpAddr;
15use std::ops::{Deref, DerefMut};
16use std::sync::Arc;
17use std::{fmt, net, time};
18
19use crossbeam_channel as chan;
20use fastrand::Rng;
21use localtime::{LocalDuration, LocalTime};
22use log::*;
23use nonempty::NonEmpty;
24
25use radicle::identity::Doc;
26use radicle::node;
27use radicle::node::address;
28use radicle::node::address::Store as _;
29use radicle::node::address::{AddressBook, AddressType, KnownAddress};
30use radicle::node::config::PeerConfig;
31use radicle::node::device::Device;
32use radicle::node::refs::Store as _;
33use radicle::node::routing::Store as _;
34use radicle::node::seed;
35use radicle::node::seed::Store as _;
36use radicle::node::{ConnectOptions, Penalty, Severity};
37use radicle::storage::refs::SIGREFS_BRANCH;
38use radicle::storage::RepositoryError;
39use radicle_fetch::policy::SeedingPolicy;
40
41use crate::identity::RepoId;
42use crate::node::routing;
43use crate::node::routing::InsertResult;
44use crate::node::{
45    Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt,
46};
47use crate::prelude::*;
48use crate::runtime::Emitter;
49use crate::service::gossip::Store as _;
50use crate::service::message::{
51    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
52};
53use crate::service::policy::{store::Write, Scope};
54use crate::storage;
55use crate::storage::{refs::RefsAt, Namespaces, ReadStorage};
56use crate::worker::fetch;
57use crate::worker::FetchError;
58use crate::Link;
59use crate::{crypto, PROTOCOL_VERSION};
60
61pub use crate::node::events::{Event, Events};
62pub use crate::node::{config::Network, Config, NodeId};
63pub use crate::service::message::{Message, ZeroBytes};
64pub use crate::service::session::{QueuedFetch, Session};
65
66pub use radicle::node::policy::config as policy;
67
68use self::io::Outbox;
69use self::limiter::RateLimiter;
70use self::message::InventoryAnnouncement;
71use self::policy::NamespacesError;
72
/// How often to run the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often to run the "gossip" task.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often to run the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often to run the "sync" task.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often to run the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// Duration to wait on an unresponsive peer before dropping its connection.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// How much time should pass after a peer was last active for a *ping* to be sent.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency values to keep for a session.
pub const MAX_LATENCIES: usize = 16;
/// Maximum time difference between the local time, and an announcement timestamp.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum attempts to connect to a peer before we give up.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time should we request gossip messages when connecting to a peer,
/// when we come online for the first time.
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
/// When subscribing, what margin of error do we give ourselves. A higher delta means we ask for
/// messages further back than strictly necessary, to account for missed messages.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum amount of time to wait before reconnecting to a peer.
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Connection retry delta used for ephemeral peers that failed to connect previously.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// How long to wait for a fetch to stall before aborting, default is 3s.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3);
/// Target number of peers to maintain connections to.
pub const TARGET_OUTBOUND_PEERS: usize = 8;

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
/// Maximum inventory limit imposed by message size limits.
pub use message::INVENTORY_LIMIT;
/// Maximum number of project git references imposed by message size limits.
pub use message::REF_REMOTE_LIMIT;
116
/// Metrics we track.
///
/// Serialized as camelCase JSON (see the `serde` attribute below).
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Metrics for each peer.
    pub peers: HashMap<NodeId, PeerMetrics>,
    /// Tasks queued in worker queue.
    pub worker_queue_size: usize,
    /// Current open channel count.
    pub open_channels: usize,
}
128
129impl Metrics {
130    /// Get metrics for the given peer.
131    pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
132        self.peers.entry(nid).or_default()
133    }
134}
135
/// Per-peer metrics we track.
///
/// All counters are cumulative since the connection/metric tracking began.
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerMetrics {
    /// Git bytes received from this peer.
    pub received_git_bytes: usize,
    /// Fetch requests received from this peer.
    pub received_fetch_requests: usize,
    /// Total bytes received from this peer.
    pub received_bytes: usize,
    /// Gossip messages received from this peer.
    pub received_gossip_messages: usize,
    /// Total bytes sent to this peer.
    pub sent_bytes: usize,
    /// Fetch requests sent to this peer.
    pub sent_fetch_requests: usize,
    /// Git bytes sent to this peer.
    pub sent_git_bytes: usize,
    /// Gossip messages sent to this peer.
    pub sent_gossip_messages: usize,
    /// Streams opened with this peer.
    pub streams_opened: usize,
    /// Inbound connection attempts from this peer.
    pub inbound_connection_attempts: usize,
    /// Outbound connection attempts to this peer.
    pub outbound_connection_attempts: usize,
    /// Number of disconnects involving this peer.
    pub disconnects: usize,
}
153
/// Result of syncing our routing table with a node's inventory.
#[derive(Default)]
struct SyncedRouting {
    /// Repo entries added.
    added: Vec<RepoId>,
    /// Repo entries removed.
    removed: Vec<RepoId>,
    /// Repo entries updated (their timestamp was refreshed).
    updated: Vec<RepoId>,
}
164
165impl SyncedRouting {
166    fn is_empty(&self) -> bool {
167        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
168    }
169}
170
/// A peer we can connect to.
#[derive(Debug, Clone)]
struct Peer {
    /// The peer's node ID.
    nid: NodeId,
    /// Known network addresses for this peer.
    addresses: Vec<KnownAddress>,
    /// Penalty score associated with this peer.
    penalty: Penalty,
}
178
179/// General service error.
180#[derive(thiserror::Error, Debug)]
181pub enum Error {
182    #[error(transparent)]
183    Git(#[from] radicle::git::raw::Error),
184    #[error(transparent)]
185    GitExt(#[from] radicle::git::ext::Error),
186    #[error(transparent)]
187    Storage(#[from] storage::Error),
188    #[error(transparent)]
189    Gossip(#[from] gossip::Error),
190    #[error(transparent)]
191    Refs(#[from] storage::refs::Error),
192    #[error(transparent)]
193    Routing(#[from] routing::Error),
194    #[error(transparent)]
195    Address(#[from] address::Error),
196    #[error(transparent)]
197    Database(#[from] node::db::Error),
198    #[error(transparent)]
199    Seeds(#[from] seed::Error),
200    #[error(transparent)]
201    Policy(#[from] policy::Error),
202    #[error(transparent)]
203    Repository(#[from] radicle::storage::RepositoryError),
204    #[error("namespaces error: {0}")]
205    Namespaces(#[from] NamespacesError),
206}
207
/// A store for all node data.
///
/// Implemented by any type providing all of the individual node stores:
/// addresses, gossip, routing, seeds and refs.
pub trait Store:
    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
{
}

impl Store for node::Database {}

/// Function used to query internal service state.
///
/// The closure is given read access to the service state and returns
/// `Ok(())` or a [`CommandError`].
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Send + Sync;
218
/// Commands sent to the service by the operator.
///
/// Commands that expect an answer carry a `chan::Sender` over which the
/// reply is delivered.
pub enum Command {
    /// Announce repository references for given repository to peers.
    AnnounceRefs(RepoId, chan::Sender<RefsAt>),
    /// Announce local repositories to peers.
    AnnounceInventory,
    /// Add repository to local inventory.
    AddInventory(RepoId, chan::Sender<bool>),
    /// Connect to node with the given address.
    Connect(NodeId, Address, ConnectOptions),
    /// Disconnect from node.
    Disconnect(NodeId),
    /// Get the node configuration.
    Config(chan::Sender<Config>),
    /// Get the node's listen addresses.
    ListenAddrs(chan::Sender<Vec<std::net::SocketAddr>>),
    /// Lookup seeds for the given repository in the routing table.
    Seeds(RepoId, chan::Sender<Seeds>),
    /// Fetch the given repository from the network.
    /// The duration is the fetch timeout.
    Fetch(RepoId, NodeId, time::Duration, chan::Sender<FetchResult>),
    /// Seed the given repository.
    /// Replies with whether the seeding policy was updated.
    Seed(RepoId, Scope, chan::Sender<bool>),
    /// Unseed the given repository.
    /// Replies with whether the seeding policy was updated.
    Unseed(RepoId, chan::Sender<bool>),
    /// Follow the given node.
    /// Replies with whether the follow policy was updated.
    Follow(NodeId, Option<Alias>, chan::Sender<bool>),
    /// Unfollow the given node.
    /// Replies with whether the follow policy was updated.
    Unfollow(NodeId, chan::Sender<bool>),
    /// Query the internal service state.
    QueryState(Arc<QueryState>, chan::Sender<Result<(), CommandError>>),
}
250
251impl fmt::Debug for Command {
252    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253        match self {
254            Self::AnnounceRefs(id, _) => write!(f, "AnnounceRefs({id})"),
255            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
256            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
257            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
258            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
259            Self::Config(_) => write!(f, "Config"),
260            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
261            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
262            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
263            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
264            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
265            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
266            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
267            Self::QueryState { .. } => write!(f, "QueryState(..)"),
268        }
269    }
270}
271
/// Command-related errors.
///
/// Returned to the operator when executing a [`Command`] fails.
#[derive(thiserror::Error, Debug)]
pub enum CommandError {
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Policy(#[from] policy::Error),
}
282
/// Error returned by [`Service::try_fetch`].
#[derive(thiserror::Error, Debug)]
enum TryFetchError<'a> {
    /// A fetch for this repository is already in progress; carries the
    /// state of the ongoing fetch.
    #[error("ongoing fetch for repository exists")]
    AlreadyFetching(&'a mut FetchState),
    /// No established session with the target peer.
    #[error("peer is not connected; cannot initiate fetch")]
    SessionNotConnected,
    /// The target peer's session cannot take more fetches.
    #[error("peer fetch capacity reached; cannot initiate fetch")]
    SessionCapacityReached,
    #[error(transparent)]
    Namespaces(#[from] NamespacesError),
}
295
/// Fetch state for an ongoing fetch.
#[derive(Debug)]
pub struct FetchState {
    /// Node we're fetching from.
    pub from: NodeId,
    /// What refs we're fetching.
    pub refs_at: Vec<RefsAt>,
    /// Channels waiting for fetch results.
    ///
    /// Duplicate channels are avoided; see [`FetchState::subscribe`].
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}
306
307impl FetchState {
308    /// Add a subscriber to this fetch.
309    fn subscribe(&mut self, c: chan::Sender<FetchResult>) {
310        if !self.subscribers.iter().any(|s| s.same_channel(&c)) {
311            self.subscribers.push(c);
312        }
313    }
314}
315
/// Holds all node stores.
///
/// Newtype over a single database value, typically one implementing
/// [`Store`], exposed through its individual store-trait views.
#[derive(Debug)]
pub struct Stores<D>(D);
319
// Each accessor below returns the same underlying database, viewed
// through one of its store traits.
impl<D> Stores<D>
where
    D: Store,
{
    /// Get the database as a routing store.
    pub fn routing(&self) -> &impl routing::Store {
        &self.0
    }

    /// Get the database as a routing store, mutably.
    pub fn routing_mut(&mut self) -> &mut impl routing::Store {
        &mut self.0
    }

    /// Get the database as an address store.
    pub fn addresses(&self) -> &impl address::Store {
        &self.0
    }

    /// Get the database as an address store, mutably.
    pub fn addresses_mut(&mut self) -> &mut impl address::Store {
        &mut self.0
    }

    /// Get the database as a gossip store.
    pub fn gossip(&self) -> &impl gossip::Store {
        &self.0
    }

    /// Get the database as a gossip store, mutably.
    pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
        &mut self.0
    }

    /// Get the database as a seed store.
    pub fn seeds(&self) -> &impl seed::Store {
        &self.0
    }

    /// Get the database as a seed store, mutably.
    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
        &mut self.0
    }

    /// Get the database as a refs db.
    pub fn refs(&self) -> &impl node::refs::Store {
        &self.0
    }

    /// Get the database as a refs db, mutably.
    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
        &mut self.0
    }
}
374
/// Mutable access to the raw underlying database.
impl<D> AsMut<D> for Stores<D> {
    fn as_mut(&mut self) -> &mut D {
        &mut self.0
    }
}

/// Wrap a database value in [`Stores`].
impl<D> From<D> for Stores<D> {
    fn from(db: D) -> Self {
        Self(db)
    }
}
386
/// The node service.
///
/// Generic over the node database `D`, the repository storage `S` and
/// the cryptographic signer `G` (see the `impl` blocks for the bounds).
#[derive(Debug)]
pub struct Service<D, S, G> {
    /// Service configuration.
    config: Config,
    /// Our cryptographic signer and key.
    signer: Device<G>,
    /// Project storage.
    storage: S,
    /// Node database.
    db: Stores<D>,
    /// Policy configuration.
    policies: policy::Config<Write>,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
    /// Clock. Tells the time.
    clock: LocalTime,
    /// Who relayed what announcement to us. We keep track of this to ensure that
    /// we don't relay messages to nodes that already know about these messages.
    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
    /// I/O outbox.
    outbox: Outbox,
    /// Cached local node announcement.
    node: NodeAnnouncement,
    /// Cached local inventory announcement.
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
    /// Ongoing fetches.
    fetching: HashMap<RepoId, FetchState>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
    filter: Filter,
    /// Last time the service was idle.
    last_idle: LocalTime,
    /// Last time the gossip messages were relayed.
    last_gossip: LocalTime,
    /// Last time the service synced.
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
    last_prune: LocalTime,
    /// Last time the announcement task was run.
    last_announce: LocalTime,
    /// Timestamp of last local inventory announced.
    last_inventory: LocalTime,
    /// Last timestamp used for announcements.
    last_timestamp: Timestamp,
    /// Time when the service was initialized, or `None` if it wasn't initialized.
    started_at: Option<LocalTime>,
    /// Time when the service was last online, or `None` if this is the first time.
    last_online_at: Option<LocalTime>,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
    /// Local listening addresses.
    listening: Vec<net::SocketAddr>,
    /// Latest metrics for all nodes connected to since the last start.
    metrics: Metrics,
}
446
impl<D, S, G> Service<D, S, G> {
    /// Get the local node id.
    pub fn node_id(&self) -> NodeId {
        *self.signer.public_key()
    }

    /// Get the local service time.
    pub fn local_time(&self) -> LocalTime {
        self.clock
    }

    /// Get a clone of the event emitter, usable to publish events to
    /// subscribers.
    pub fn emitter(&self) -> Emitter<Event> {
        self.emitter.clone()
    }
}
462
463impl<D, S, G> Service<D, S, G>
464where
465    D: Store,
466    S: ReadStorage + 'static,
467    G: crypto::signature::Signer<crypto::Signature>,
468{
    /// Create a new, uninitialized service.
    ///
    /// Time-dependent state (clock, inventory, `started_at`, ...) is set
    /// to placeholder values here and filled in by [`Service::initialize`].
    pub fn new(
        config: Config,
        db: Stores<D>,
        storage: S,
        policies: policy::Config<Write>,
        signer: Device<G>,
        rng: Rng,
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());
        let limiter = RateLimiter::new(config.peers());
        let last_timestamp = node.timestamp;
        let clock = LocalTime::default(); // Updated on initialize.
        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.

        Self {
            config,
            storage,
            policies,
            signer,
            rng,
            inventory,
            node,
            clock,
            db,
            outbox: Outbox::default(),
            limiter,
            sessions,
            fetching: HashMap::new(),
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
            last_gossip: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_timestamp,
            last_announce: LocalTime::default(),
            last_inventory: LocalTime::default(),
            started_at: None,     // Updated on initialize.
            last_online_at: None, // Updated on initialize.
            emitter,
            listening: vec![],
            metrics: Metrics::default(),
        }
    }
515
    /// Whether the service was started (initialized) and if so, at what time.
    /// Set by [`Service::initialize`].
    pub fn started(&self) -> Option<LocalTime> {
        self.started_at
    }
520
    /// Return the next i/o action to execute, draining the outbox.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<io::Io> {
        self.outbox.next()
    }
526
527    /// Seed a repository.
528    /// Returns whether or not the repo policy was updated.
529    pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
530        let updated = self.policies.seed(id, scope)?;
531        self.filter.insert(id);
532
533        Ok(updated)
534    }
535
    /// Unseed a repository.
    /// Returns whether or not the repo policy was updated.
    /// Note that when unseeding, we don't announce anything to the network. This is because by
    /// simply not announcing it anymore, it will eventually be pruned by nodes.
    pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
        let updated = self.policies.unseed(id)?;

        if updated {
            // Nb. This is potentially slow if we have lots of repos. We should probably
            // only re-compute the filter when we've unseeded a certain amount of repos
            // and the filter is really out of date.
            //
            // TODO: Share this code with initialization code.
            self.filter = Filter::new(
                self.policies
                    .seed_policies()?
                    .filter_map(|t| (t.policy.is_allow()).then_some(t.rid)),
            );
            // Update and announce new inventory. Failures here are logged
            // but not fatal: the policy update above already succeeded.
            if let Err(e) = self.remove_inventory(id) {
                error!(target: "service", "Error updating inventory after unseed: {e}");
            }
        }
        Ok(updated)
    }
561
    /// Find the closest `n` peers by proximity in seeding graphs.
    /// Returns a sorted list from the closest peer to the furthest.
    /// Peers with more seedings in common score higher.
    ///
    /// Note: not yet implemented; calling this panics via `todo!`.
    #[allow(unused)]
    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
        todo!()
    }
569
    /// Get the database (all node stores).
    pub fn database(&self) -> &Stores<D> {
        &self.db
    }

    /// Get the database, mutably.
    pub fn database_mut(&mut self) -> &mut Stores<D> {
        &mut self.db
    }

    /// Get the repository storage instance.
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Get the repository storage instance, mutably.
    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }
589
    /// Get the node's seeding/follow policies.
    pub fn policies(&self) -> &policy::Config<Write> {
        &self.policies
    }

    /// Get the local signer (device key).
    pub fn signer(&self) -> &Device<G> {
        &self.signer
    }
599
600    /// Subscriber to inner `Emitter` events.
601    pub fn events(&mut self) -> Events {
602        Events::from(self.emitter.subscribe())
603    }
604
    /// Get the I/O outbox, mutably.
    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
    }

    /// Get the service configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }
614
    /// Lookup a repository, both locally and in the routing table.
    ///
    /// `local` holds the repository if it is in local storage; `remote`
    /// lists the nodes — other than ourselves — known to have it.
    pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
        let this = self.nid();
        let local = self.storage.get(rid)?;
        let remote = self
            .db
            .routing()
            .get(&rid)?
            .iter()
            // Exclude our own node id from the remote seed list.
            .filter(|nid| nid != &this)
            .cloned()
            .collect();

        Ok(Lookup { local, remote })
    }
630
631    /// Initialize service with current time. Call this once.
632    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
633        debug!(target: "service", "Init @{}", time.as_millis());
634        assert_ne!(time, LocalTime::default());
635
636        let nid = self.node_id();
637
638        self.clock = time;
639        self.started_at = Some(time);
640        self.last_online_at = match self.db.gossip().last() {
641            Ok(Some(last)) => Some(last.to_local_time()),
642            Ok(None) => None,
643            Err(e) => {
644                error!(target: "service", "Error getting the lastest gossip message from db: {e}");
645                None
646            }
647        };
648
649        // Populate refs database. This is only useful as part of the upgrade process for nodes
650        // that have been online since before the refs database was created.
651        match self.db.refs().count() {
652            Ok(0) => {
653                info!(target: "service", "Empty refs database, populating from storage..");
654                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
655                    error!(target: "service", "Failed to populate refs database: {e}");
656                }
657            }
658            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
659            Err(e) => error!(target: "service", "Error checking refs database: {e}"),
660        }
661
662        let announced = self
663            .db
664            .seeds()
665            .seeded_by(&nid)?
666            .collect::<Result<HashMap<_, _>, _>>()?;
667        let mut inventory = BTreeSet::new();
668        let mut private = BTreeSet::new();
669
670        for repo in self.storage.repositories()? {
671            let rid = repo.rid;
672
673            // If we're not seeding this repo, just skip it.
674            if !self.policies.is_seeding(&rid)? {
675                warn!(target: "service", "Local repository {rid} is not seeded");
676                continue;
677            }
678            // Add public repositories to inventory.
679            if repo.doc.is_public() {
680                inventory.insert(rid);
681            } else {
682                private.insert(rid);
683            }
684            // If we have no owned refs for this repo, then there's nothing to announce.
685            let Some(updated_at) = repo.synced_at else {
686                continue;
687            };
688            // Skip this repo if the sync status matches what we have in storage.
689            if let Some(announced) = announced.get(&rid) {
690                if updated_at.oid == announced.oid {
691                    continue;
692                }
693            }
694            // Make sure our local node's sync status is up to date with storage.
695            if self.db.seeds_mut().synced(
696                &rid,
697                &nid,
698                updated_at.oid,
699                updated_at.timestamp.into(),
700            )? {
701                debug!(target: "service", "Saved local sync status for {rid}..");
702            }
703            // If we got here, it likely means a repo was updated while the node was stopped.
704            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
705            // the historical gossip messages when a node connects and subscribes to this repo.
706            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
707                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
708                self.db.gossip_mut().announced(&nid, &ann)?;
709            }
710        }
711
712        // Ensure that our inventory is recorded in our routing table, and we are seeding
713        // all of it. It can happen that inventory is not properly seeded if for eg. the
714        // user creates a new repository while the node is stopped.
715        self.db
716            .routing_mut()
717            .add_inventory(inventory.iter(), nid, time.into())?;
718        self.inventory = gossip::inventory(self.timestamp(), inventory);
719
720        // Ensure that private repositories are not in our inventory. It's possible that
721        // a repository was public and then it was made private.
722        self.db
723            .routing_mut()
724            .remove_inventories(private.iter(), &nid)?;
725
726        // Setup subscription filter for seeded repos.
727        self.filter = Filter::new(
728            self.policies
729                .seed_policies()?
730                .filter_map(|t| (t.policy.is_allow()).then_some(t.rid)),
731        );
732        // Connect to configured peers.
733        let addrs = self.config.connect.clone();
734        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
735            self.connect(id, addr);
736        }
737        // Try to establish some connections.
738        self.maintain_connections();
739        // Start periodic tasks.
740        self.outbox.wakeup(IDLE_INTERVAL);
741        self.outbox.wakeup(GOSSIP_INTERVAL);
742
743        Ok(())
744    }
745
    /// Advance the service clock to `now` and store the latest metrics
    /// snapshot.
    ///
    /// The clock never moves backwards: if `now` is earlier than the
    /// current clock, the clock is left unchanged and a warning is
    /// logged (outside of tests).
    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
        trace!(
            target: "service",
            "Tick +{}",
            now - self.started_at.expect("Service::tick: service must be initialized")
        );
        if now >= self.clock {
            self.clock = now;
        } else {
            // Nb. In tests, we often move the clock forwards in time to test different behaviors,
            // so this warning isn't applicable there.
            #[cfg(not(test))]
            warn!(
                target: "service",
                "System clock is not monotonic: {now} is not greater or equal to {}", self.clock
            );
        }
        self.metrics = metrics.clone();
    }
765
    /// Run periodic tasks whose interval has elapsed.
    ///
    /// Each task ("idle", "gossip", "sync", "announce", "prune") re-arms
    /// its own wakeup after running; persistent-peer maintenance runs on
    /// every wake.
    pub fn wake(&mut self) {
        let now = self.clock;

        trace!(
            target: "service",
            "Wake +{}",
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

        if now - self.last_idle >= IDLE_INTERVAL {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_gossip >= GOSSIP_INTERVAL {
            trace!(target: "service", "Running 'gossip' task...");

            if let Err(e) = self.relay_announcements() {
                error!(target: "service", "Error relaying stored announcements: {e}");
            }
            self.outbox.wakeup(GOSSIP_INTERVAL);
            self.last_gossip = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_repositories() {
                error!(target: "service", "Error fetching missing inventory: {e}");
            }
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

            self.announce_inventory();
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                error!(target: "service", "Error pruning routing entries: {err}");
            }
            // Drop gossip messages older than the configured maximum age.
            if let Err(err) = self
                .db
                .gossip_mut()
                .prune((now - self.config.limits.gossip_max_age).into())
            {
                error!(target: "service", "Error pruning gossip entries: {err}");
            }

            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

        // Always check whether there are persistent peers that need reconnecting.
        self.maintain_persistent();
    }
832
    /// Handle a command sent to the service by the local client/runtime.
    ///
    /// Commands are operator- or API-initiated, as opposed to messages arriving
    /// from the network. Where a command carries a response channel (`resp` /
    /// `sender`), the result is sent back over it; send failures are ignored
    /// (`.ok()`), since the requester may have gone away.
    pub fn command(&mut self, cmd: Command) {
        info!(target: "service", "Received command {:?}", cmd);

        match cmd {
            Command::Connect(nid, addr, opts) => {
                // Persistent peers are recorded in the config so that
                // `maintain_persistent` keeps reconnecting to them.
                if opts.persistent {
                    self.config.connect.insert((nid, addr.clone()).into());
                }
                if !self.connect(nid, addr) {
                    // TODO: Return error to command.
                }
            }
            Command::Disconnect(nid) => {
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Config(resp) => {
                resp.send(self.config.clone()).ok();
            }
            Command::ListenAddrs(resp) => {
                resp.send(self.listening.clone()).ok();
            }
            // Report the seeds known for a repository, split into connected and
            // disconnected sets for logging purposes.
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
                    debug!(
                        target: "service",
                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
                        connected.len(), disconnected.len(),  rid
                    );
                    resp.send(seeds).ok();
                }
                Err(e) => {
                    error!(target: "service", "Error getting seeds for {rid}: {e}");
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
                self.fetch(rid, seed, timeout, Some(resp));
            }
            Command::Seed(rid, scope, resp) => {
                // Update our seeding policy.
                let seeded = self
                    .seed(&rid, scope)
                    .expect("Service::command: error seeding repository");
                resp.send(seeded).ok();

                // Let all our peers know that we're interested in this repo from now on.
                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
            Command::Unseed(id, resp) => {
                let updated = self
                    .unseed(&id)
                    .expect("Service::command: error unseeding repository");
                resp.send(updated).ok();
            }
            Command::Follow(id, alias, resp) => {
                let seeded = self
                    .policies
                    .follow(&id, alias.as_ref())
                    .expect("Service::command: error following node");
                resp.send(seeded).ok();
            }
            Command::Unfollow(id, resp) => {
                let updated = self
                    .policies
                    .unfollow(&id)
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
            Command::AnnounceRefs(id, resp) => {
                // The identity doc is needed to announce refs; bail out (with a
                // log, there is no error channel here) if it can't be loaded.
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
                        error!(target: "service", "Error announcing refs: repository {id} not found");
                        return;
                    }
                    Err(e) => {
                        error!(target: "service", "Error announcing refs: doc error: {e}");
                        return;
                    }
                };

                match self.announce_own_refs(id, doc) {
                    Ok(refs) => match refs.as_slice() {
                        &[refs] => {
                            resp.send(refs).ok();
                        }
                        // SAFETY: Since we passed in one NID, we should get exactly one item back.
                        [..] => panic!("Service::command: unexpected refs returned"),
                    },
                    Err(err) => {
                        error!(target: "service", "Error announcing refs: {err}");
                    }
                }
            }
            Command::AnnounceInventory => {
                self.announce_inventory();
            }
            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
                Ok(updated) => {
                    resp.send(updated).ok();
                }
                Err(e) => {
                    error!(target: "service", "Error adding {rid} to inventory: {e}");
                }
            },
            // Run an arbitrary read-only query against the service state.
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
            }
        }
    }
946
947    /// Initiate an outgoing fetch for some repository, based on another node's announcement.
948    /// Returns `true` if the fetch was initiated and `false` if it was skipped.
949    fn fetch_refs_at(
950        &mut self,
951        rid: RepoId,
952        from: NodeId,
953        refs: NonEmpty<RefsAt>,
954        scope: Scope,
955        timeout: time::Duration,
956        channel: Option<chan::Sender<FetchResult>>,
957    ) -> bool {
958        match self.refs_status_of(rid, refs, &scope) {
959            Ok(status) => {
960                if status.want.is_empty() {
961                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
962                } else {
963                    return self._fetch(rid, from, status.want, timeout, channel);
964                }
965            }
966            Err(e) => {
967                error!(target: "service", "Error getting the refs status of {rid}: {e}");
968            }
969        }
970        // We didn't try to fetch anything.
971        false
972    }
973
974    /// Initiate an outgoing fetch for some repository.
975    fn fetch(
976        &mut self,
977        rid: RepoId,
978        from: NodeId,
979        timeout: time::Duration,
980        channel: Option<chan::Sender<FetchResult>>,
981    ) -> bool {
982        self._fetch(rid, from, vec![], timeout, channel)
983    }
984
985    fn _fetch(
986        &mut self,
987        rid: RepoId,
988        from: NodeId,
989        refs_at: Vec<RefsAt>,
990        timeout: time::Duration,
991        channel: Option<chan::Sender<FetchResult>>,
992    ) -> bool {
993        match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
994            Ok(fetching) => {
995                if let Some(c) = channel {
996                    fetching.subscribe(c);
997                }
998                return true;
999            }
1000            Err(TryFetchError::AlreadyFetching(fetching)) => {
1001                // If we're already fetching the same refs from the requested peer, there's nothing
1002                // to do, we simply add the supplied channel to the list of subscribers so that it
1003                // is notified on completion. Otherwise, we queue a fetch with the requested peer.
1004                if fetching.from == from && fetching.refs_at == refs_at {
1005                    debug!(target: "service", "Ignoring redundant fetch of {rid} from {from}");
1006
1007                    if let Some(c) = channel {
1008                        fetching.subscribe(c);
1009                    }
1010                } else {
1011                    let fetch = QueuedFetch {
1012                        rid,
1013                        refs_at,
1014                        from,
1015                        timeout,
1016                        channel,
1017                    };
1018                    debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");
1019
1020                    self.queue_fetch(fetch);
1021                }
1022            }
1023            Err(TryFetchError::SessionCapacityReached) => {
1024                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
1025                self.queue_fetch(QueuedFetch {
1026                    rid,
1027                    refs_at,
1028                    from,
1029                    timeout,
1030                    channel,
1031                });
1032            }
1033            Err(e) => {
1034                if let Some(c) = channel {
1035                    c.send(FetchResult::Failed {
1036                        reason: e.to_string(),
1037                    })
1038                    .ok();
1039                }
1040            }
1041        }
1042        false
1043    }
1044
1045    fn queue_fetch(&mut self, fetch: QueuedFetch) {
1046        let Some(s) = self.sessions.get_mut(&fetch.from) else {
1047            log::error!(target: "service", "Cannot queue fetch for unknown session {}", fetch.from);
1048            return;
1049        };
1050        if let Err(e) = s.queue_fetch(fetch) {
1051            let fetch = e.inner();
1052            log::debug!(target: "service", "Unable to queue fetch for {} with {}: {e}", &fetch.rid, &fetch.from);
1053        }
1054    }
1055
    // TODO: Buffer/throttle fetches.
    /// Try to start a fetch of `rid` from the connected peer `from`.
    ///
    /// On success, inserts a new [`FetchState`] into `self.fetching` and
    /// returns a mutable reference to it, so the caller can attach
    /// subscribers. Errors if the repo is already being fetched (from any
    /// peer), if the session is unknown or not connected, or if the session
    /// is at its fetch capacity.
    fn try_fetch(
        &mut self,
        rid: RepoId,
        from: &NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) -> Result<&mut FetchState, TryFetchError> {
        let from = *from;
        let Some(session) = self.sessions.get_mut(&from) else {
            return Err(TryFetchError::SessionNotConnected);
        };
        let fetching = self.fetching.entry(rid);

        trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");

        let fetching = match fetching {
            Entry::Vacant(fetching) => fetching,
            Entry::Occupied(fetching) => {
                // We're already fetching this repo from some peer.
                return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
            }
        };
        // Sanity check: We shouldn't be fetching from this session, since we return above if we're
        // fetching from any session.
        debug_assert!(!session.is_fetching(&rid));

        if !session.is_connected() {
            // This can happen if a session disconnects in the time between asking for seeds to
            // fetch from, and initiating the fetch from one of those seeds.
            return Err(TryFetchError::SessionNotConnected);
        }
        if session.is_at_capacity() {
            // If we're already fetching multiple repos from this peer.
            return Err(TryFetchError::SessionCapacityReached);
        }

        // Record the in-flight fetch, then hand the actual transfer off to the
        // I/O layer via the outbox.
        let fetching = fetching.insert(FetchState {
            from,
            refs_at: refs_at.clone(),
            subscribers: vec![],
        });
        self.outbox.fetch(
            session,
            rid,
            refs_at,
            timeout,
            self.config.limits.fetch_pack_receive,
        );

        Ok(fetching)
    }
1108
    /// Handle the completion (success or failure) of a fetch for `rid` from `remote`.
    ///
    /// Removes the in-flight [`FetchState`], notifies all its subscribers of
    /// the outcome, and on success updates routing, emits events, and
    /// announces inventory/refs as appropriate. Finally, tries to dequeue any
    /// queued fetches, since capacity may have been freed up.
    pub fn fetched(
        &mut self,
        rid: RepoId,
        remote: NodeId,
        result: Result<fetch::FetchResult, FetchError>,
    ) {
        let Some(fetching) = self.fetching.remove(&rid) else {
            error!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
            return;
        };
        debug_assert_eq!(fetching.from, remote);

        if let Some(s) = self.sessions.get_mut(&remote) {
            // Mark this RID as fetched for this session.
            s.fetched(rid);
        }

        // Notify all fetch subscribers of the fetch result. This is used when the user requests
        // a fetch via the CLI, for example.
        for sub in &fetching.subscribers {
            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");

            // Convert the worker's result into the public `FetchResult`; cloned
            // per subscriber since each gets its own copy.
            let result = match &result {
                Ok(success) => FetchResult::Success {
                    updated: success.updated.clone(),
                    namespaces: success.namespaces.clone(),
                    clone: success.clone,
                },
                Err(e) => FetchResult::Failed {
                    reason: e.to_string(),
                },
            };
            if sub.send(result).is_err() {
                error!(target: "service", "Error sending fetch result for {rid} from {remote}..");
            } else {
                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
            }
        }

        match result {
            Ok(fetch::FetchResult {
                updated,
                namespaces,
                clone,
                doc,
            }) => {
                info!(target: "service", "Fetched {rid} from {remote} successfully");
                // Update our routing table in case this fetch was user-initiated and doesn't
                // come from an announcement.
                self.seed_discovered(rid, remote, self.clock.into());

                for update in &updated {
                    if update.is_skipped() {
                        trace!(target: "service", "Ref skipped: {update} for {rid}");
                    } else {
                        debug!(target: "service", "Ref updated: {update} for {rid}");
                    }
                }
                self.emitter.emit(Event::RefsFetched {
                    remote,
                    rid,
                    updated: updated.clone(),
                });

                // Announce our new inventory if this fetch was a full clone.
                // Only update and announce inventory for public repositories.
                if clone && doc.is_public() {
                    debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");

                    if let Err(e) = self.add_inventory(rid) {
                        error!(target: "service", "Error announcing inventory for {rid}: {e}");
                    }
                }

                // It's possible for a fetch to succeed but nothing was updated.
                if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
                    debug!(target: "service", "Nothing to announce, no refs were updated..");
                } else {
                    // Finally, announce the refs. This is useful for nodes to know what we've synced,
                    // beyond just knowing that we have added an item to our inventory.
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces) {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
            }
            Err(err) => {
                error!(target: "service", "Fetch failed for {rid} from {remote}: {err}");

                // For now, we only disconnect the remote in case of timeout. In the future,
                // there may be other reasons to disconnect.
                if err.is_timeout() {
                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
                }
            }
        }
        // We can now try to dequeue more fetches.
        self.dequeue_fetches();
    }
1207
1208    /// Attempt to dequeue fetches from all peers.
1209    /// At most one fetch is dequeued per peer. If the fetch cannot be processed,
1210    /// it is put back on the queue for that peer.
1211    ///
1212    /// Fetches are queued for two reasons:
1213    /// 1. The RID was already being fetched.
1214    /// 2. The session was already at fetch capacity.
1215    pub fn dequeue_fetches(&mut self) {
1216        let sessions = self
1217            .sessions
1218            .shuffled()
1219            .map(|(k, _)| *k)
1220            .collect::<Vec<_>>();
1221
1222        // Try to dequeue once per session.
1223        for nid in sessions {
1224            // SAFETY: All the keys we are iterating on exist.
1225            #[allow(clippy::unwrap_used)]
1226            let sess = self.sessions.get_mut(&nid).unwrap();
1227            if !sess.is_connected() || sess.is_at_capacity() {
1228                continue;
1229            }
1230
1231            if let Some(QueuedFetch {
1232                rid,
1233                from,
1234                refs_at,
1235                timeout,
1236                channel,
1237            }) = sess.dequeue_fetch()
1238            {
1239                debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");
1240
1241                if let Some(refs) = NonEmpty::from_vec(refs_at) {
1242                    let repo_entry = self.policies.seed_policy(&rid).expect(
1243                        "Service::dequeue_fetch: error accessing repo seeding configuration",
1244                    );
1245                    let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1246                        debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
1247                        continue;
1248                    };
1249                    self.fetch_refs_at(rid, from, refs, scope, timeout, channel);
1250                } else {
1251                    // If no refs are specified, always do a full fetch.
1252                    self.fetch(rid, from, timeout, channel);
1253                }
1254            }
1255        }
1256    }
1257
1258    /// Inbound connection attempt.
1259    pub fn accepted(&mut self, ip: IpAddr) -> bool {
1260        // Always accept localhost connections, even if we already reached
1261        // our inbound connection limit.
1262        if ip.is_loopback() || ip.is_unspecified() {
1263            return true;
1264        }
1265        // Check for inbound connection limit.
1266        if self.sessions.inbound().count() >= self.config.limits.connection.inbound {
1267            return false;
1268        }
1269        match self.db.addresses().is_ip_banned(ip) {
1270            Ok(banned) => {
1271                if banned {
1272                    debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
1273                    return false;
1274                }
1275            }
1276            Err(e) => error!(target: "service", "Error querying ban status for {ip}: {e}"),
1277        }
1278        let host: HostName = ip.into();
1279
1280        if self.limiter.limit(
1281            host.clone(),
1282            None,
1283            &self.config.limits.rate.inbound,
1284            self.clock,
1285        ) {
1286            trace!(target: "service", "Rate limiting inbound connection from {host}..");
1287            return false;
1288        }
1289        true
1290    }
1291
1292    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
1293        debug!(target: "service", "Attempted connection to {nid} ({addr})");
1294
1295        if let Some(sess) = self.sessions.get_mut(&nid) {
1296            sess.to_attempted();
1297        } else {
1298            #[cfg(debug_assertions)]
1299            panic!("Service::attempted: unknown session {nid}@{addr}");
1300        }
1301    }
1302
1303    pub fn listening(&mut self, local_addr: net::SocketAddr) {
1304        info!(target: "node", "Listening on {local_addr}..");
1305
1306        self.listening.push(local_addr);
1307    }
1308
    /// A connection to `remote` at `addr` was established over `link`.
    ///
    /// For outbound links, the existing (attempted) session is transitioned to
    /// connected. For inbound links, either an existing session entry is
    /// reused or a fresh inbound session is created; in both cases the initial
    /// protocol messages are written to the peer.
    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

        // Initial messages we send to a peer upon connection.
        let msgs = self.initial(link);

        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);
            }
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(mut e) => {
                    // In this scenario, it's possible that our peer is persistent, and
                    // disconnected. We get an inbound connection before we attempt a re-connection,
                    // and therefore we treat it as a regular inbound connection.
                    //
                    // It's also possible that a disconnection hasn't gone through yet and our
                    // peer is still in connected state here, while a new inbound connection from
                    // that same peer is made. This results in a new connection from a peer that is
                    // already connected from the perspective of the service. This appears to be
                    // a bug in the underlying networking library.
                    let peer = e.get_mut();
                    debug!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({peer})"
                    );
                    peer.link = link;
                    peer.to_connected(self.clock);
                    self.outbox.write_all(peer, msgs);
                }
                Entry::Vacant(e) => {
                    // Record the peer's public (non-local) IP in the address
                    // book; failure to record is logged but non-fatal.
                    if let HostName::Ip(ip) = addr.host {
                        if !address::is_local(&ip) {
                            if let Err(e) =
                                self.db
                                    .addresses_mut()
                                    .record_ip(&remote, ip, self.clock.into())
                            {
                                log::error!(target: "service", "Error recording IP address for {remote}: {e}");
                            }
                        }
                    }
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                        self.config.limits.clone(),
                    ));
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
    }
1366
    /// The connection to `remote` over `link` was closed for `reason`.
    ///
    /// Fails any in-flight fetches from that peer, then either schedules a
    /// reconnection (for peers configured as persistent) or drops the session
    /// and records the disconnect in the address store with a severity derived
    /// from the reason. Finally, tries to dequeue queued fetches.
    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
        let since = self.local_time();
        let Some(session) = self.sessions.get_mut(&remote) else {
            // Since we sometimes disconnect the service eagerly, it's not unusual to get a second
            // disconnection event once the transport is dropped.
            trace!(target: "service", "Redundant disconnection for {} ({})", remote, reason);
            return;
        };
        // In cases of connection conflicts, there may be disconnections of one of the two
        // connections. In that case we don't want the service to remove the session.
        if session.link != link {
            return;
        }

        info!(target: "service", "Disconnected from {} ({})", remote, reason);
        self.emitter.emit(Event::PeerDisconnected {
            nid: remote,
            reason: reason.to_string(),
        });

        // Copy out what we need from the session before further borrows.
        let link = session.link;
        let addr = session.addr.clone();

        self.fetching.retain(|_, fetching| {
            if fetching.from != remote {
                return true;
            }
            // Remove and fail any pending fetches from this remote node.
            for resp in &fetching.subscribers {
                resp.send(FetchResult::Failed {
                    reason: format!("disconnected: {reason}"),
                })
                .ok();
            }
            false
        });

        // Attempt to re-connect to persistent peers.
        if self.config.peer(&remote).is_some() {
            // Exponential backoff based on the number of attempts, clamped to
            // the configured min/max reconnection deltas.
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

            // Nb. We always try to reconnect to persistent peers, even when the error appears
            // to not be transient.
            session.to_disconnected(since, since + delay);

            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

            self.outbox.wakeup(delay);
        } else {
            debug!(target: "service", "Dropping peer {remote}..");
            self.sessions.remove(&remote);

            let severity = match reason {
                DisconnectReason::Dial(_)
                | DisconnectReason::Fetch(_)
                | DisconnectReason::Connection(_) => {
                    if self.is_online() {
                        // If we're "online", there's something wrong with this
                        // peer connection specifically.
                        Severity::Medium
                    } else {
                        Severity::Low
                    }
                }
                DisconnectReason::Session(e) => e.severity(),
                DisconnectReason::Command
                | DisconnectReason::Conflict
                | DisconnectReason::SelfConnection => Severity::Low,
            };

            if let Err(e) = self
                .db
                .addresses_mut()
                .disconnected(&remote, &addr, severity)
            {
                error!(target: "service", "Error updating address store: {e}");
            }
            // Only re-attempt outbound connections, since we don't care if an inbound connection
            // is dropped.
            if link.is_outbound() {
                self.maintain_connections();
            }
        }
        self.dequeue_fetches();
    }
1453
1454    pub fn received_message(&mut self, remote: NodeId, message: Message) {
1455        if let Err(err) = self.handle_message(&remote, message) {
1456            // If there's an error, stop processing messages from this peer.
1457            // However, we still relay messages returned up to this point.
1458            self.outbox
1459                .disconnect(remote, DisconnectReason::Session(err));
1460
1461            // FIXME: The peer should be set in a state such that we don't
1462            // process further messages.
1463        }
1464    }
1465
1466    /// Handle an announcement message.
1467    ///
1468    /// Returns `true` if this announcement should be stored and relayed to connected peers,
1469    /// and `false` if it should not.
1470    pub fn handle_announcement(
1471        &mut self,
1472        relayer: &NodeId,
1473        relayer_addr: &Address,
1474        announcement: &Announcement,
1475    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
1476        if !announcement.verify() {
1477            return Err(session::Error::Misbehavior);
1478        }
1479        let Announcement {
1480            node: announcer,
1481            message,
1482            ..
1483        } = announcement;
1484
1485        // Ignore our own announcements, in case the relayer sent one by mistake.
1486        if announcer == self.nid() {
1487            return Ok(None);
1488        }
1489        let now = self.clock;
1490        let timestamp = message.timestamp();
1491
1492        // Don't allow messages from too far in the future.
1493        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
1494            return Err(session::Error::InvalidTimestamp(timestamp));
1495        }
1496
1497        // We don't process announcements from nodes we don't know, since the node announcement is
1498        // what provides DoS protection.
1499        //
1500        // Note that it's possible to *not* receive the node announcement, but receive the
1501        // subsequent announcements of a node in the case of historical gossip messages requested
1502        // from the `subscribe` message. This can happen if the cut-off time is after the node
1503        // announcement timestamp, but before the other announcements. In that case, we simply
1504        // ignore all announcements of that node until we get a node announcement.
1505        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
1506            match self.db.addresses().get(announcer) {
1507                Ok(node) => {
1508                    if node.is_none() {
1509                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
1510                        return Ok(None);
1511                    }
1512                }
1513                Err(e) => {
1514                    error!(target: "service", "Error looking up node in address book: {e}");
1515                    return Ok(None);
1516                }
1517            }
1518        }
1519
1520        // Discard announcement messages we've already seen, otherwise update our last seen time.
1521        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
1522            Ok(Some(id)) => {
1523                log::debug!(
1524                    target: "service",
1525                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
1526                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
1527                );
1528                // Keep track of who relayed the message for later.
1529                self.relayed_by.entry(id).or_default().push(*relayer);
1530
1531                // Decide whether or not to relay this message, if it's fresh.
1532                // To avoid spamming peers on startup with historical gossip messages,
1533                // don't relay messages that are too old. We make an exception for node announcements,
1534                // since they are cached, and will hence often carry old timestamps.
1535                let relay = message.is_node_announcement()
1536                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
1537                relay.then_some(id)
1538            }
1539            Ok(None) => {
1540                // FIXME: Still mark as relayed by this peer.
1541                // FIXME: Refs announcements should not be delayed, since they are only sent
1542                // to subscribers.
1543                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
1544                return Ok(None);
1545            }
1546            Err(e) => {
1547                error!(target: "service", "Error updating gossip entry from {announcer}: {e}");
1548                return Ok(None);
1549            }
1550        };
1551
1552        match message {
1553            // Process a peer inventory update announcement by (maybe) fetching.
1554            AnnouncementMessage::Inventory(message) => {
1555                self.emitter.emit(Event::InventoryAnnounced {
1556                    nid: *announcer,
1557                    inventory: message.inventory.to_vec(),
1558                    timestamp: message.timestamp,
1559                });
1560                match self.sync_routing(
1561                    message.inventory.iter().cloned(),
1562                    *announcer,
1563                    message.timestamp,
1564                ) {
1565                    Ok(synced) => {
1566                        if synced.is_empty() {
1567                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
1568                            return Ok(None);
1569                        }
1570                    }
1571                    Err(e) => {
1572                        error!(target: "service", "Error processing inventory from {announcer}: {e}");
1573                        return Ok(None);
1574                    }
1575                }
1576                let mut missing = Vec::new();
1577                let nid = *self.nid();
1578
1579                // Here we handle the special case where the inventory we received is that of
1580                // a connected peer, as opposed to being relayed to us.
1581                if let Some(sess) = self.sessions.get_mut(announcer) {
1582                    for id in message.inventory.as_slice() {
1583                        // If we are connected to the announcer of this inventory, update the peer's
1584                        // subscription filter to include all inventory items. This way, we'll
1585                        // relay messages relating to the peer's inventory.
1586                        if let Some(sub) = &mut sess.subscribe {
1587                            sub.filter.insert(id);
1588                        }
1589
1590                        // If we're seeding and connected to the announcer, and we don't have
1591                        // the inventory, fetch it from the announcer.
1592                        if self.policies.is_seeding(id).expect(
1593                            "Service::handle_announcement: error accessing seeding configuration",
1594                        ) {
1595                            // Only if we do not have the repository locally do we fetch here.
1596                            // If we do have it, only fetch after receiving a ref announcement.
1597                            match self.db.routing().entry(id, &nid) {
1598                                Ok(entry) => {
1599                                    if entry.is_none() {
1600                                        missing.push(*id);
1601                                    }
1602                                }
1603                                Err(e) => error!(
1604                                    target: "service",
1605                                    "Error checking local inventory for {id}: {e}"
1606                                ),
1607                            }
1608                        }
1609                    }
1610                }
1611                // Since we have limited fetch capacity, it may be that we can't fetch an entire
1612                // inventory from a peer. Therefore we randomize the order of the RIDs to fetch
1613                // different RIDs from different peers in case multiple peers announce the same
1614                // RIDs.
1615                self.rng.shuffle(&mut missing);
1616
1617                for rid in missing {
1618                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
1619                    self.fetch(rid, *announcer, FETCH_TIMEOUT, None);
1620                }
1621                return Ok(relay);
1622            }
1623            AnnouncementMessage::Refs(message) => {
1624                self.emitter.emit(Event::RefsAnnounced {
1625                    nid: *announcer,
1626                    rid: message.rid,
1627                    refs: message.refs.to_vec(),
1628                    timestamp: message.timestamp,
1629                });
1630                // Empty announcements can be safely ignored.
1631                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
1632                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
1633                    return Ok(None);
1634                };
1635                // We update inventories when receiving ref announcements, as these could come
1636                // from a new repository being initialized.
1637                self.seed_discovered(message.rid, *announcer, message.timestamp);
1638
1639                // Update sync status of announcer for this repo.
1640                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
1641                    debug!(
1642                        target: "service",
1643                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
1644                        message.rid, refs.at, message.timestamp
1645                    );
1646                    match self.db.seeds_mut().synced(
1647                        &message.rid,
1648                        announcer,
1649                        refs.at,
1650                        message.timestamp,
1651                    ) {
1652                        Ok(updated) => {
1653                            if updated {
1654                                debug!(
1655                                    target: "service",
1656                                    "Updating sync status of {announcer} for {} to {}",
1657                                    message.rid, refs.at
1658                                );
1659                                self.emitter.emit(Event::RefsSynced {
1660                                    rid: message.rid,
1661                                    remote: *announcer,
1662                                    at: refs.at,
1663                                });
1664                            } else {
1665                                debug!(
1666                                    target: "service",
1667                                    "Sync status of {announcer} was not updated for {}",
1668                                    message.rid,
1669                                );
1670                            }
1671                        }
1672                        Err(e) => {
1673                            error!(target: "service", "Error updating sync status for {}: {e}", message.rid);
1674                        }
1675                    }
1676                }
1677                let repo_entry = self.policies.seed_policy(&message.rid).expect(
1678                    "Service::handle_announcement: error accessing repo seeding configuration",
1679                );
1680                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1681                    debug!(
1682                        target: "service",
1683                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
1684                        message.rid
1685                    );
1686                    return Ok(None);
1687                };
1688                // Refs can be relayed by peers who don't have the data in storage,
1689                // therefore we only check whether we are connected to the *announcer*,
1690                // which is required by the protocol to only announce refs it has.
1691                let Some(remote) = self.sessions.get(announcer).cloned() else {
1692                    trace!(
1693                        target: "service",
1694                        "Skipping fetch of {}, no sessions connected to {announcer}",
1695                        message.rid
1696                    );
1697                    return Ok(relay);
1698                };
1699                // Finally, start the fetch.
1700                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT, None);
1701
1702                return Ok(relay);
1703            }
1704            AnnouncementMessage::Node(
1705                ann @ NodeAnnouncement {
1706                    features,
1707                    addresses,
1708                    ..
1709                },
1710            ) => {
1711                self.emitter.emit(Event::NodeAnnounced {
1712                    nid: *announcer,
1713                    alias: ann.alias.clone(),
1714                    timestamp: ann.timestamp,
1715                    features: *features,
1716                    addresses: addresses.to_vec(),
1717                });
1718                // If this node isn't a seed, we're not interested in adding it
1719                // to our address book, but other nodes may be, so we relay the message anyway.
1720                if !features.has(Features::SEED) {
1721                    return Ok(relay);
1722                }
1723
1724                match self.db.addresses_mut().insert(
1725                    announcer,
1726                    ann.version,
1727                    ann.features,
1728                    &ann.alias,
1729                    ann.work(),
1730                    &ann.agent,
1731                    timestamp,
1732                    addresses
1733                        .iter()
1734                        // Ignore non-routable addresses unless received from a local network
1735                        // peer. This allows the node to function in a local network.
1736                        .filter(|a| a.is_routable() || relayer_addr.is_local())
1737                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
1738                ) {
1739                    Ok(updated) => {
1740                        // Only relay if we received new information.
1741                        if updated {
1742                            debug!(
1743                                target: "service",
1744                                "Address store entry for node {announcer} updated at {timestamp}"
1745                            );
1746                            return Ok(relay);
1747                        }
1748                    }
1749                    Err(err) => {
1750                        // An error here is due to a fault in our address store.
1751                        error!(target: "service", "Error processing node announcement from {announcer}: {err}");
1752                    }
1753                }
1754            }
1755        }
1756        Ok(None)
1757    }
1758
1759    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
1760        match info {
1761            // Nb. We don't currently send this message.
1762            Info::RefsAlreadySynced { rid, at } => {
1763                debug!(target: "service", "Refs already synced for {rid} by {remote}");
1764                self.emitter.emit(Event::RefsSynced {
1765                    rid: *rid,
1766                    remote,
1767                    at: *at,
1768                });
1769            }
1770        }
1771
1772        Ok(())
1773    }
1774
    /// Handle an incoming wire [`Message`] from the given `remote` peer.
    ///
    /// Updates the peer's activity clock, applies the per-link rate limit, and
    /// dispatches on the message type (announcement, subscribe, info,
    /// ping/pong). Messages from unknown, rate-limited or disconnected peers
    /// are dropped silently, returning `Ok(())`.
    pub fn handle_message(
        &mut self,
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
        let local = self.node_id();
        let relay = self.config.is_relay();
        let Some(peer) = self.sessions.get_mut(remote) else {
            warn!(target: "service", "Session not found for {remote}");
            return Ok(());
        };
        // Any message from the peer counts as activity.
        peer.last_active = self.clock;

        // Rate limits are configured separately per link direction.
        let limit = match peer.link {
            Link::Outbound => &self.config.limits.rate.outbound,
            Link::Inbound => &self.config.limits.rate.inbound,
        };
        // Drop the message silently if the peer exceeded its rate limit.
        if self
            .limiter
            .limit(peer.addr.clone().into(), Some(remote), limit, self.clock)
        {
            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
        }
        message.log(log::Level::Debug, remote, Link::Inbound);

        // Grab the ping state and latency history, which only exist for
        // fully-connected sessions; `None` otherwise.
        let connected = match &mut peer.state {
            session::State::Disconnected { .. } => {
                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
                return Ok(());
            }
            // In case of a discrepancy between the service state and the state of the underlying
            // wire protocol, we may receive a message from a peer that we consider not fully connected
            // at the service level. To remedy this, we simply transition the peer to a connected state.
            //
            // This is not ideal, but until the wire protocol and service are unified, it's the simplest
            // solution to converge towards the same state.
            session::State::Attempted { .. } | session::State::Initial => {
                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);

                peer.to_connected(self.clock);

                None
            }
            session::State::Connected {
                ping, latencies, ..
            } => Some((ping, latencies)),
        };

        trace!(target: "service", "Received message {message:?} from {remote}");

        match message {
            // Process a peer announcement.
            Message::Announcement(ann) => {
                let relayer = remote;
                let relayer_addr = peer.addr.clone();

                // `handle_announcement` returns the stored announcement id
                // when the message should be relayed further.
                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
                        // Inventory announcements are marked for relay in the
                        // gossip store; other announcements are relayed directly.
                        if let AnnouncementMessage::Inventory(_) = ann.message {
                            if let Err(e) = self
                                .database_mut()
                                .gossip_mut()
                                .set_relay(id, gossip::RelayStatus::Relay)
                            {
                                error!(target: "service", "Error setting relay flag for message: {e}");
                                return Ok(());
                            }
                        } else {
                            self.relay(id, ann);
                        }
                    }
                }
            }
            Message::Subscribe(subscribe) => {
                // Filter announcements by interest.
                match self
                    .db
                    .gossip()
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
                {
                    Ok(anns) => {
                        for ann in anns {
                            let ann = match ann {
                                Ok(a) => a,
                                Err(e) => {
                                    error!(target: "service", "Error reading gossip message from store: {e}");
                                    continue;
                                }
                            };
                            // Don't send announcements authored by the remote, back to the remote.
                            if ann.node == *remote {
                                continue;
                            }
                            // Only send messages if we're a relay, or it's our own messages.
                            if relay || ann.node == local {
                                self.outbox.write(peer, ann.into());
                            }
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error querying gossip messages from store: {e}");
                    }
                }
                // Remember the subscription filter for this session.
                peer.subscribe = Some(subscribe);
            }
            Message::Info(info) => {
                self.handle_info(*remote, &info)?;
            }
            Message::Ping(Ping { ponglen, .. }) => {
                // Ignore pings which ask for too much data.
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
                }
                self.outbox.write(
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
            Message::Pong { zeroes } => {
                // Only meaningful for connected sessions that are awaiting a pong.
                if let Some((ping, latencies)) = connected {
                    if let session::PingState::AwaitingResponse {
                        len: ponglen,
                        since,
                    } = *ping
                    {
                        // The pong must echo back exactly the requested number of zeroes.
                        if (ponglen as usize) == zeroes.len() {
                            *ping = session::PingState::Ok;
                            // Keep track of peer latency.
                            latencies.push_back(self.clock - since);
                            if latencies.len() > MAX_LATENCIES {
                                latencies.pop_front();
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
1918
1919    /// A convenient method to check if we should fetch from a `RefsAnnouncement` with `scope`.
1920    fn refs_status_of(
1921        &self,
1922        rid: RepoId,
1923        refs: NonEmpty<RefsAt>,
1924        scope: &policy::Scope,
1925    ) -> Result<RefsStatus, Error> {
1926        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
1927        // Check that there's something we want.
1928        if refs.want.is_empty() {
1929            return Ok(refs);
1930        }
1931        // Check scope.
1932        let mut refs = match scope {
1933            policy::Scope::All => refs,
1934            policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
1935                Ok(Namespaces::All) => refs,
1936                Ok(Namespaces::Followed(followed)) => {
1937                    refs.want.retain(|r| followed.contains(&r.remote));
1938                    refs
1939                }
1940                Err(e) => return Err(e.into()),
1941            },
1942        };
1943        // Remove our own remote, we don't want to fetch that.
1944        refs.want.retain(|r| r.remote != self.node_id());
1945
1946        Ok(refs)
1947    }
1948
1949    /// Add a seed to our routing table.
1950    fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
1951        if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
1952            if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
1953                self.emitter.emit(Event::SeedDiscovered { rid, nid });
1954                info!(target: "service", "Routing table updated for {} with seed {nid}", rid);
1955            }
1956        }
1957    }
1958
1959    /// Set of initial messages to send to a peer.
1960    fn initial(&mut self, _link: Link) -> Vec<Message> {
1961        let now = self.clock();
1962        let filter = self.filter();
1963
1964        // TODO: Only subscribe to outbound connections, otherwise we will consume too
1965        // much bandwidth.
1966
1967        // If we've been previously connected to the network, we'll have received gossip messages.
1968        // Instead of simply taking the last timestamp we try to ensure we don't miss any
1969        // messages due un-synchronized clocks.
1970        //
1971        // If this is our first connection to the network, we just ask for a fixed backlog
1972        // of messages to get us started.
1973        let since = if let Some(last) = self.last_online_at {
1974            Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
1975        } else {
1976            (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
1977        };
1978        debug!(target: "service", "Subscribing to messages since timestamp {since}..");
1979
1980        vec![
1981            Message::node(self.node.clone(), &self.signer),
1982            Message::inventory(self.inventory.clone(), &self.signer),
1983            Message::subscribe(filter, since, Timestamp::MAX),
1984        ]
1985    }
1986
1987    /// Try to guess whether we're online or not.
1988    fn is_online(&self) -> bool {
1989        self.sessions
1990            .connected()
1991            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
1992            .count()
1993            > 0
1994    }
1995
1996    /// Remove a local repository from our inventory.
1997    fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
1998        let node = self.node_id();
1999        let now = self.timestamp();
2000
2001        let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
2002        if removed {
2003            self.refresh_and_announce_inventory(now)?;
2004        }
2005        Ok(removed)
2006    }
2007
2008    /// Add a local repository to our inventory.
2009    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
2010        let node = self.node_id();
2011        let now = self.timestamp();
2012
2013        if !self.storage.contains(&rid)? {
2014            error!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
2015            return Ok(false);
2016        }
2017        // Add to our local inventory.
2018        let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
2019        let updated = !updates.is_empty();
2020
2021        if updated {
2022            self.refresh_and_announce_inventory(now)?;
2023        }
2024        Ok(updated)
2025    }
2026
2027    /// Update cached inventory message, and announce new inventory to peers.
2028    fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
2029        let inventory = self.inventory()?;
2030
2031        self.inventory = gossip::inventory(time, inventory);
2032        self.announce_inventory();
2033
2034        Ok(())
2035    }
2036
2037    /// Get our local inventory.
2038    ///
2039    /// A node's inventory is the advertized list of repositories offered by a node.
2040    ///
2041    /// A node's inventory consists of *public* repositories that are seeded and available locally
2042    /// in the node's storage. We use the routing table as the canonical state of all inventories,
2043    /// including the local node's.
2044    ///
2045    /// When a repository is unseeded, it is also removed from the inventory. Private repositories
2046    /// are *not* part of a node's inventory.
2047    fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
2048        self.db
2049            .routing()
2050            .get_inventory(self.nid())
2051            .map_err(Error::from)
2052    }
2053
2054    /// Process a peer inventory announcement by updating our routing table.
2055    /// This function expects the peer's full inventory, and prunes entries that are not in the
2056    /// given inventory.
2057    fn sync_routing(
2058        &mut self,
2059        inventory: impl IntoIterator<Item = RepoId>,
2060        from: NodeId,
2061        timestamp: Timestamp,
2062    ) -> Result<SyncedRouting, Error> {
2063        let mut synced = SyncedRouting::default();
2064        let included = inventory.into_iter().collect::<BTreeSet<_>>();
2065
2066        for (rid, result) in
2067            self.db
2068                .routing_mut()
2069                .add_inventory(included.iter(), from, timestamp)?
2070        {
2071            match result {
2072                InsertResult::SeedAdded => {
2073                    info!(target: "service", "Routing table updated for {rid} with seed {from}");
2074                    self.emitter.emit(Event::SeedDiscovered { rid, nid: from });
2075
2076                    if self
2077                        .policies
2078                        .is_seeding(&rid)
2079                        .expect("Service::process_inventory: error accessing seeding configuration")
2080                    {
2081                        // TODO: We should fetch here if we're already connected, case this seed has
2082                        // refs we don't have.
2083                    }
2084                    synced.added.push(rid);
2085                }
2086                InsertResult::TimeUpdated => {
2087                    synced.updated.push(rid);
2088                }
2089                InsertResult::NotUpdated => {}
2090            }
2091        }
2092        for rid in self.db.routing().get_inventory(&from)?.into_iter() {
2093            if !included.contains(&rid) {
2094                if self.db.routing_mut().remove_inventory(&rid, &from)? {
2095                    synced.removed.push(rid);
2096                    self.emitter.emit(Event::SeedDropped { rid, nid: from });
2097                }
2098            }
2099        }
2100        Ok(synced)
2101    }
2102
2103    /// Return a refs announcement including the given remotes.
2104    fn refs_announcement_for(
2105        &mut self,
2106        rid: RepoId,
2107        remotes: impl IntoIterator<Item = NodeId>,
2108    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
2109        let repo = self.storage.repository(rid)?;
2110        let timestamp = self.timestamp();
2111        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
2112
2113        for remote_id in remotes.into_iter() {
2114            let refs_at = RefsAt::new(&repo, remote_id)?;
2115
2116            if refs.push(refs_at).is_err() {
2117                warn!(
2118                    target: "service",
2119                    "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
2120                    REF_REMOTE_LIMIT,
2121                );
2122                break;
2123            }
2124        }
2125
2126        let msg = AnnouncementMessage::from(RefsAnnouncement {
2127            rid,
2128            refs: refs.clone(),
2129            timestamp,
2130        });
2131        Ok((msg.signed(&self.signer), refs.into()))
2132    }
2133
2134    /// Announce our own refs for the given repo.
2135    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc) -> Result<Vec<RefsAt>, Error> {
2136        let (refs, timestamp) = self.announce_refs(rid, doc, [self.node_id()])?;
2137
2138        // Update refs database with our signed refs branches.
2139        // This isn't strictly necessary for now, as we only use the database for fetches, and
2140        // we don't fetch our own refs that are announced, but it's for good measure.
2141        if let &[r] = refs.as_slice() {
2142            self.emitter.emit(Event::LocalRefsAnnounced {
2143                rid,
2144                refs: r,
2145                timestamp,
2146            });
2147            if let Err(e) = self.database_mut().refs_mut().set(
2148                &rid,
2149                &r.remote,
2150                &SIGREFS_BRANCH,
2151                r.at,
2152                timestamp.to_local_time(),
2153            ) {
2154                error!(
2155                    target: "service",
2156                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
2157                    r.remote
2158                );
2159            }
2160        }
2161        Ok(refs)
2162    }
2163
2164    /// Announce local refs for given repo.
2165    fn announce_refs(
2166        &mut self,
2167        rid: RepoId,
2168        doc: Doc,
2169        remotes: impl IntoIterator<Item = NodeId>,
2170    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
2171        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
2172        let timestamp = ann.timestamp();
2173        let peers = self.sessions.connected().map(|(_, p)| p);
2174
2175        // Update our sync status for our own refs. This is useful for determining if refs were
2176        // updated while the node was stopped.
2177        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
2178            info!(
2179                target: "service",
2180                "Announcing own refs for {rid} to peers ({}) (t={timestamp})..",
2181                refs.at
2182            );
2183            // Update our local node's sync status to mark the refs as announced.
2184            if let Err(e) = self
2185                .db
2186                .seeds_mut()
2187                .synced(&rid, &ann.node, refs.at, timestamp)
2188            {
2189                error!(target: "service", "Error updating sync status for local node: {e}");
2190            } else {
2191                debug!(target: "service", "Saved local sync status for {rid}..");
2192            }
2193        }
2194
2195        self.outbox.announce(
2196            ann,
2197            peers.filter(|p| {
2198                // Only announce to peers who are allowed to view this repo.
2199                doc.is_visible_to(&p.id.into())
2200            }),
2201            self.db.gossip_mut(),
2202        );
2203        Ok((refs, timestamp))
2204    }
2205
2206    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
2207        if let Some(sess) = self.sessions.get_mut(&nid) {
2208            sess.to_initial();
2209            self.outbox.connect(nid, addr);
2210
2211            return true;
2212        }
2213        false
2214    }
2215
2216    fn connect(&mut self, nid: NodeId, addr: Address) -> bool {
2217        debug!(target: "service", "Connecting to {nid} ({addr})..");
2218
2219        if self.sessions.contains_key(&nid) {
2220            warn!(target: "service", "Attempted connection to peer {nid} which already has a session");
2221            return false;
2222        }
2223        if nid == self.node_id() {
2224            error!(target: "service", "Attempted connection to self");
2225            return false;
2226        }
2227        if self.sessions.outbound().count() >= self.config.limits.connection.outbound {
2228            error!(target: "service", "Outbound connection limit reached when attempting {nid} ({addr})");
2229            return false;
2230        }
2231        let persistent = self.config.is_persistent(&nid);
2232        let timestamp: Timestamp = self.clock.into();
2233
2234        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
2235            error!(target: "service", "Error updating address book with connection attempt: {e}");
2236        }
2237        self.sessions.insert(
2238            nid,
2239            Session::outbound(
2240                nid,
2241                addr.clone(),
2242                persistent,
2243                self.rng.clone(),
2244                self.config.limits.clone(),
2245            ),
2246        );
2247        self.outbox.connect(nid, addr);
2248
2249        true
2250    }
2251
    /// Look up all known seeds for the given repository.
    ///
    /// Combines two sources:
    /// 1. peers recorded as having synced refs for `rid`, annotated with a
    ///    [`SyncStatus`] computed against our own signed refs;
    /// 2. peers known via the routing table to seed `rid`, for which no sync
    ///    status is available.
    ///
    /// # Errors
    /// Fails if any of the underlying seed, routing or address queries fail,
    /// or if the local sync status can't be computed.
    fn seeds(&self, rid: &RepoId) -> Result<Seeds, Error> {
        let mut seeds = Seeds::new(self.rng.clone());

        // First build a list from peers that have synced our own refs, if any.
        // This step is skipped if we don't have the repository yet, or don't have
        // our own refs.
        if let Ok(repo) = self.storage.repository(*rid) {
            if let Ok(local) = RefsAt::new(&repo, self.node_id()) {
                for seed in self.db.seeds().seeds_for(rid)? {
                    let seed = seed?;
                    // Current session state with this seed, if we have one.
                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
                    // The seed is in sync when its recorded head matches our own.
                    let synced = if local.at == seed.synced_at.oid {
                        SyncStatus::Synced { at: seed.synced_at }
                    } else {
                        let local = SyncedAt::new(local.at, &repo)?;

                        SyncStatus::OutOfSync {
                            local,
                            remote: seed.synced_at,
                        }
                    };
                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
                }
            }
        }

        // Then, add peers we know about but have no information about the sync status.
        // These peers have announced that they seed the repository via an inventory
        // announcement, but we haven't received any ref announcements from them.
        for nid in self.db.routing().get(rid)? {
            // Skip ourselves.
            if nid == self.node_id() {
                continue;
            }
            if seeds.contains(&nid) {
                // We already have a richer entry for this node.
                continue;
            }
            let addrs = self.db.addresses().addresses_of(&nid)?;
            let state = self.sessions.get(&nid).map(|s| s.state.clone());

            seeds.insert(Seed::new(nid, addrs, state, None));
        }
        Ok(seeds)
    }
2296
2297    /// Return a new filter object, based on our seeding policy.
2298    fn filter(&self) -> Filter {
2299        if self.config.seeding_policy.is_allow() {
2300            // TODO: Remove bits for blocked repos.
2301            Filter::default()
2302        } else {
2303            self.filter.clone()
2304        }
2305    }
2306
2307    /// Get a timestamp for using in announcements.
2308    /// Never returns the same timestamp twice.
2309    fn timestamp(&mut self) -> Timestamp {
2310        let now = Timestamp::from(self.clock);
2311        if *now > *self.last_timestamp {
2312            self.last_timestamp = now;
2313        } else {
2314            self.last_timestamp = self.last_timestamp + 1;
2315        }
2316        self.last_timestamp
2317    }
2318
2319    fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
2320        let announcer = ann.node;
2321        let relayed_by = self.relayed_by.get(&id);
2322        let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
2323            Some(rid)
2324        } else {
2325            None
2326        };
2327        // Choose peers we should relay this message to.
2328        // 1. Don't relay to a peer who sent us this message.
2329        // 2. Don't relay to the peer who signed this announcement.
2330        let relay_to = self
2331            .sessions
2332            .connected()
2333            .filter(|(id, _)| {
2334                relayed_by
2335                    .map(|relayers| !relayers.contains(id))
2336                    .unwrap_or(true) // If there are no relayers we let it through.
2337            })
2338            .filter(|(id, _)| **id != announcer)
2339            .filter(|(id, _)| {
2340                if let Some(rid) = rid {
2341                    // Only relay this message if the peer is allowed to know about the
2342                    // repository. If we don't have the repository, return `false` because
2343                    // we can't determine if it's private or public.
2344                    self.storage
2345                        .get(rid)
2346                        .ok()
2347                        .flatten()
2348                        .map(|doc| doc.is_visible_to(&(*id).into()))
2349                        .unwrap_or(false)
2350                } else {
2351                    // Announcement doesn't concern a specific repository, let it through.
2352                    true
2353                }
2354            })
2355            .map(|(_, p)| p);
2356
2357        self.outbox.relay(ann, relay_to);
2358    }
2359
2360    ////////////////////////////////////////////////////////////////////////////
2361    // Periodic tasks
2362    ////////////////////////////////////////////////////////////////////////////
2363
2364    fn relay_announcements(&mut self) -> Result<(), Error> {
2365        let now = self.clock.into();
2366        let rows = self.database_mut().gossip_mut().relays(now)?;
2367        let local = self.node_id();
2368
2369        for (id, msg) in rows {
2370            let announcer = msg.node;
2371            if announcer == local {
2372                // Don't relay our own stored gossip messages.
2373                continue;
2374            }
2375            self.relay(id, msg);
2376        }
2377        Ok(())
2378    }
2379
2380    /// Announce our inventory to all connected peers, unless it was already announced.
2381    fn announce_inventory(&mut self) {
2382        let timestamp = self.inventory.timestamp.to_local_time();
2383
2384        if self.last_inventory == timestamp {
2385            debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
2386            return;
2387        }
2388        let msg = AnnouncementMessage::from(self.inventory.clone());
2389
2390        self.outbox.announce(
2391            msg.signed(&self.signer),
2392            self.sessions.connected().map(|(_, p)| p),
2393            self.db.gossip_mut(),
2394        );
2395        self.last_inventory = timestamp;
2396    }
2397
2398    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
2399        let count = self.db.routing().len()?;
2400        if count <= self.config.limits.routing_max_size {
2401            return Ok(());
2402        }
2403
2404        let delta = count - self.config.limits.routing_max_size;
2405        let nid = self.node_id();
2406        self.db.routing_mut().prune(
2407            (*now - self.config.limits.routing_max_age).into(),
2408            Some(delta),
2409            &nid,
2410        )?;
2411        Ok(())
2412    }
2413
2414    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
2415        let stale = self
2416            .sessions
2417            .connected()
2418            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
2419
2420        for (_, session) in stale {
2421            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);
2422
2423            // TODO: Should we switch the session state to "disconnected" even before receiving
2424            // an official "disconnect"? Otherwise we keep pinging until we get the disconnection.
2425
2426            self.outbox.disconnect(
2427                session.id,
2428                DisconnectReason::Session(session::Error::Timeout),
2429            );
2430        }
2431    }
2432
2433    /// Ensure connection health by pinging connected peers.
2434    fn keep_alive(&mut self, now: &LocalTime) {
2435        let inactive_sessions = self
2436            .sessions
2437            .connected_mut()
2438            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
2439            .map(|(_, session)| session);
2440        for session in inactive_sessions {
2441            session.ping(self.clock, &mut self.outbox).ok();
2442        }
2443    }
2444
2445    /// Get a list of peers available to connect to, sorted by lowest penalty.
2446    fn available_peers(&mut self) -> Vec<Peer> {
2447        match self.db.addresses().entries() {
2448            Ok(entries) => {
2449                // Nb. we don't want to connect to any peers that already have a session with us,
2450                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
2451                let mut peers = entries
2452                    .filter(|entry| entry.version == PROTOCOL_VERSION)
2453                    .filter(|entry| !entry.address.banned)
2454                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
2455                    .filter(|entry| !self.sessions.contains_key(&entry.node))
2456                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
2457                    .filter(|entry| &entry.node != self.nid())
2458                    .fold(HashMap::new(), |mut acc, entry| {
2459                        acc.entry(entry.node)
2460                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
2461                            .or_insert_with(|| Peer {
2462                                nid: entry.node,
2463                                addresses: vec![entry.address],
2464                                penalty: entry.penalty,
2465                            });
2466                        acc
2467                    })
2468                    .into_values()
2469                    .collect::<Vec<_>>();
2470                peers.sort_by_key(|p| p.penalty);
2471                peers
2472            }
2473            Err(e) => {
2474                error!(target: "service", "Unable to lookup available peers in address book: {e}");
2475                Vec::new()
2476            }
2477        }
2478    }
2479
2480    /// Fetch all repositories that are seeded but missing from storage.
2481    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
2482        // TODO(finto): could filter the policies based on the continue checks
2483        // below, but `storage.contains` is fallible
2484        let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
2485        for policy in policies {
2486            let rid = policy.rid;
2487
2488            if !policy.is_allow() {
2489                continue;
2490            }
2491            if self.storage.contains(&rid)? {
2492                continue;
2493            }
2494            match self.seeds(&rid) {
2495                Ok(seeds) => {
2496                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
2497                        for seed in connected {
2498                            self.fetch(rid, seed.nid, FETCH_TIMEOUT, None);
2499                        }
2500                    } else {
2501                        // TODO: We should make sure that this fetch is retried later, either
2502                        // when we connect to a seed, or when we discover a new seed.
2503                        // Since new connections and routing table updates are both conditions for
2504                        // fetching, we should trigger fetches when those conditions appear.
2505                        // Another way to handle this would be to update our database, saying
2506                        // that we're trying to fetch a certain repo. We would then just
2507                        // iterate over those entries in the above circumstances. This is
2508                        // merely an optimization though, we can also iterate over all seeded
2509                        // repos and check which ones are not in our inventory.
2510                        debug!(target: "service", "No connected seeds found for {rid}..");
2511                    }
2512                }
2513                Err(e) => {
2514                    error!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
2515                }
2516            }
2517        }
2518        Ok(())
2519    }
2520
2521    /// Run idle task for all connections.
2522    fn idle_connections(&mut self) {
2523        for (_, sess) in self.sessions.iter_mut() {
2524            sess.idle(self.clock);
2525
2526            if sess.is_stable() {
2527                // Mark as connected once connection is stable.
2528                if let Err(e) =
2529                    self.db
2530                        .addresses_mut()
2531                        .connected(&sess.id, &sess.addr, self.clock.into())
2532                {
2533                    error!(target: "service", "Error updating address book with connection: {e}");
2534                }
2535            }
2536        }
2537    }
2538
    /// Try to maintain a target number of connections.
    ///
    /// Only runs when the peer configuration is dynamic; with a static peer
    /// set, connection maintenance is a no-op.
    fn maintain_connections(&mut self) {
        let PeerConfig::Dynamic = self.config.peers else {
            return;
        };
        trace!(target: "service", "Maintaining connections..");

        let target = TARGET_OUTBOUND_PEERS;
        let now = self.clock;
        // Count outbound sessions that are connected or in the process of
        // connecting; these all count against the target.
        let outbound = self
            .sessions
            .values()
            .filter(|s| s.link.is_outbound())
            .filter(|s| s.is_connected() || s.is_connecting())
            .count();
        let wanted = target.saturating_sub(outbound);

        // Don't connect to more peers than needed.
        if wanted == 0 {
            return;
        }

        // Peers available to connect to. For each candidate, pick the *first*
        // address worth trying based on its connection history.
        let available = self
            .available_peers()
            .into_iter()
            .filter_map(|peer| {
                peer.addresses
                    .into_iter()
                    .find(|ka| match (ka.last_success, ka.last_attempt) {
                        // If we succeeded the last time we tried, this is a good address.
                        // If it's been long enough that we failed to connect, we also try again.
                        (Some(success), Some(attempt)) => {
                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
                        }
                        // If we haven't succeeded yet, and we waited long enough, we can try this address.
                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
                        // If we have no failed attempts for this address, it's worth a try.
                        (_, None) => true,
                    })
                    .map(|ka| (peer.nid, ka))
            })
            .filter(|(_, ka)| match AddressType::from(&ka.addr) {
                // Only consider onion addresses if configured.
                AddressType::Onion => self.config.onion.is_some(),
                AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
            });

        // Peers we are going to attempt connections to.
        let connect = available.take(wanted).collect::<Vec<_>>();
        if connect.len() < wanted {
            log::debug!(
                target: "service",
                "Not enough available peers to connect to (available={}, wanted={wanted})",
                connect.len()
            );
        }
        for (id, ka) in connect {
            self.connect(id, ka.addr.clone());
        }
    }
2600
2601    /// Maintain persistent peer connections.
2602    fn maintain_persistent(&mut self) {
2603        trace!(target: "service", "Maintaining persistent peers..");
2604
2605        let now = self.local_time();
2606        let mut reconnect = Vec::new();
2607
2608        for (nid, session) in self.sessions.iter_mut() {
2609            if let Some(addr) = self.config.peer(nid) {
2610                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
2611                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
2612                    // even a successful attempt means that we're unlikely to be able to reconnect.
2613
2614                    if now >= *retry_at {
2615                        reconnect.push((*nid, addr.clone(), session.attempts()));
2616                    }
2617                }
2618            }
2619        }
2620
2621        for (nid, addr, attempts) in reconnect {
2622            if self.reconnect(nid, addr) {
2623                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
2624            }
2625        }
2626    }
2627}
2628
/// Gives read access to the service state.
pub trait ServiceState {
    /// Get the Node ID.
    fn nid(&self) -> &NodeId;
    /// Get the existing sessions.
    fn sessions(&self) -> &Sessions;
    /// Get fetch state: the repositories currently being fetched, keyed by id.
    fn fetching(&self) -> &HashMap<RepoId, FetchState>;
    /// Get outbox.
    fn outbox(&self) -> &Outbox;
    /// Get rate limiter.
    fn limiter(&self) -> &RateLimiter;
    /// Get event emitter.
    fn emitter(&self) -> &Emitter<Event>;
    /// Get a repository's identity document from storage, if found.
    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
    /// Get the clock.
    fn clock(&self) -> &LocalTime;
    /// Get the clock mutably.
    fn clock_mut(&mut self) -> &mut LocalTime;
    /// Get service configuration.
    fn config(&self) -> &Config;
    /// Get service metrics.
    fn metrics(&self) -> &Metrics;
}
2654
// Straightforward accessor implementations of [`ServiceState`] for the service.
impl<D, S, G> ServiceState for Service<D, S, G>
where
    D: routing::Store,
    G: crypto::signature::Signer<crypto::Signature>,
    S: ReadStorage,
{
    // Our node ID is the public key of our signer.
    fn nid(&self) -> &NodeId {
        self.signer.public_key()
    }

    fn sessions(&self) -> &Sessions {
        &self.sessions
    }

    fn fetching(&self) -> &HashMap<RepoId, FetchState> {
        &self.fetching
    }

    fn outbox(&self) -> &Outbox {
        &self.outbox
    }

    fn limiter(&self) -> &RateLimiter {
        &self.limiter
    }

    fn emitter(&self) -> &Emitter<Event> {
        &self.emitter
    }

    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
        self.storage.get(rid)
    }

    fn clock(&self) -> &LocalTime {
        &self.clock
    }

    fn clock_mut(&mut self) -> &mut LocalTime {
        &mut self.clock
    }

    fn config(&self) -> &Config {
        &self.config
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }
}
2705
/// Disconnect reason.
#[derive(Debug)]
pub enum DisconnectReason {
    /// Error while dialing the remote. This error occurs before a connection is
    /// even established. Errors of this kind are usually not transient.
    Dial(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with an underlying established connection. Sometimes, reconnecting
    /// after such an error is possible.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with a fetch.
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
    /// Session conflicts with existing session.
    Conflict,
    /// Connection to self.
    SelfConnection,
    /// User requested disconnect.
    Command,
}
2726
2727impl DisconnectReason {
2728    pub fn is_dial_err(&self) -> bool {
2729        matches!(self, Self::Dial(_))
2730    }
2731
2732    pub fn is_connection_err(&self) -> bool {
2733        matches!(self, Self::Connection(_))
2734    }
2735
2736    pub fn connection() -> Self {
2737        DisconnectReason::Connection(Arc::new(std::io::Error::from(
2738            std::io::ErrorKind::ConnectionReset,
2739        )))
2740    }
2741}
2742
2743impl fmt::Display for DisconnectReason {
2744    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2745        match self {
2746            Self::Dial(err) => write!(f, "{err}"),
2747            Self::Connection(err) => write!(f, "{err}"),
2748            Self::Command => write!(f, "command"),
2749            Self::SelfConnection => write!(f, "self-connection"),
2750            Self::Conflict => write!(f, "conflict"),
2751            Self::Session(err) => write!(f, "{err}"),
2752            Self::Fetch(err) => write!(f, "fetch: {err}"),
2753        }
2754    }
2755}
2756
/// Result of a project lookup.
#[derive(Debug)]
pub struct Lookup {
    /// The project's identity document, if the project was found locally.
    pub local: Option<Doc>,
    /// A list of remote peers on which the project is known to exist.
    pub remote: Vec<NodeId>,
}
2765
/// Errors that can occur during a project lookup.
#[derive(thiserror::Error, Debug)]
pub enum LookupError {
    /// Routing table error.
    #[error(transparent)]
    Routing(#[from] routing::Error),
    /// Repository storage error.
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}
2773
#[derive(Debug, Clone)]
/// Holds currently (or recently) connected peers.
///
/// A thin wrapper around an [`AddressBook`] keyed by [`NodeId`], with helpers
/// for querying session connection state.
pub struct Sessions(AddressBook<NodeId, Session>);
2777
2778impl Sessions {
2779    pub fn new(rng: Rng) -> Self {
2780        Self(AddressBook::new(rng))
2781    }
2782
2783    /// Iterator over fully connected peers.
2784    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2785        self.0
2786            .iter()
2787            .filter_map(move |(id, sess)| match &sess.state {
2788                session::State::Connected { .. } => Some((id, sess)),
2789                _ => None,
2790            })
2791    }
2792
2793    /// Iterator over connected inbound peers.
2794    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2795        self.connected().filter(|(_, s)| s.link.is_inbound())
2796    }
2797
2798    /// Iterator over outbound peers.
2799    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2800        self.connected().filter(|(_, s)| s.link.is_outbound())
2801    }
2802
2803    /// Iterator over mutable fully connected peers.
2804    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2805        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
2806    }
2807
2808    /// Iterator over disconnected peers.
2809    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2810        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
2811    }
2812
2813    /// Return whether this node has a fully established session.
2814    pub fn is_connected(&self, id: &NodeId) -> bool {
2815        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
2816    }
2817
2818    /// Return whether this node can be connected to.
2819    pub fn is_disconnected(&self, id: &NodeId) -> bool {
2820        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
2821    }
2822}
2823
// Allow read access to the underlying address book via deref.
impl Deref for Sessions {
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
2831
// Allow mutable access to the underlying address book via deref.
impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}