radicle_protocol/service.rs

#![allow(clippy::too_many_arguments)]
#![allow(clippy::collapsible_match)]
#![allow(clippy::collapsible_if)]
#![warn(clippy::unwrap_used)]
pub mod filter;
pub mod gossip;
pub mod io;
pub mod limiter;
pub mod message;
pub mod session;

use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::net::IpAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, net, time};

use crossbeam_channel as chan;
use fastrand::Rng;
use localtime::{LocalDuration, LocalTime};
use log::*;
use nonempty::NonEmpty;

use radicle::identity::Doc;
use radicle::node;
use radicle::node::address;
use radicle::node::address::Store as _;
use radicle::node::address::{AddressBook, AddressType, KnownAddress};
use radicle::node::config::{PeerConfig, RateLimit};
use radicle::node::device::Device;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{ConnectOptions, Penalty, Severity};
use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::RepositoryError;
use radicle_fetch::policy::SeedingPolicy;

use crate::service::gossip::Store as _;
use crate::service::message::{
    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
};
use crate::service::policy::{store::Write, Scope};
use radicle::identity::RepoId;
use radicle::node::events::Emitter;
use radicle::node::routing;
use radicle::node::routing::InsertResult;
use radicle::node::{
    Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt,
};
use radicle::prelude::*;
use radicle::storage;
use radicle::storage::{refs::RefsAt, Namespaces, ReadStorage};
// use radicle::worker::fetch;
// use crate::worker::FetchError;
use radicle::crypto;
use radicle::node::Link;
use radicle::node::PROTOCOL_VERSION;

use crate::bounded::BoundedVec;
use crate::service::filter::Filter;
pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::{QueuedFetch, Session};
use crate::worker::FetchError;
use radicle::node::events::{Event, Events};
use radicle::node::{Config, NodeId};

use radicle::node::policy::config as policy;

use self::io::Outbox;
use self::limiter::RateLimiter;
use self::message::InventoryAnnouncement;
use self::policy::NamespacesError;

/// How often to run the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often to run the "gossip" task.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often to run the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often to run the "sync" task.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often to run the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// Duration to wait on an unresponsive peer before dropping its connection.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// How much time should pass after a peer was last active for a *ping* to be sent.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency values to keep for a session.
pub const MAX_LATENCIES: usize = 16;
/// Maximum time difference between the local time, and an announcement timestamp.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum attempts to connect to a peer before we give up.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time should we request gossip messages when connecting to a peer,
/// when we come online for the first time.
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
/// When subscribing, the margin of error we give ourselves. A higher delta means we ask for
/// messages further back than strictly necessary, to account for missed messages.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum amount of time to wait before reconnecting to a peer.
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Connection retry delta used for ephemeral peers that failed to connect previously.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// How long to wait for a fetch to stall before aborting, default is 3s.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3);
/// Target number of peers to maintain connections to.
pub const TARGET_OUTBOUND_PEERS: usize = 8;

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
/// Maximum inventory limit imposed by message size limits.
pub use message::INVENTORY_LIMIT;
/// Maximum number of project git references imposed by message size limits.
pub use message::REF_REMOTE_LIMIT;

/// Metrics we track.
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Metrics for each peer.
    pub peers: HashMap<NodeId, PeerMetrics>,
    /// Tasks queued in worker queue.
    pub worker_queue_size: usize,
    /// Current open channel count.
    pub open_channels: usize,
}

impl Metrics {
    /// Get metrics for the given peer.
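    ///
    /// An illustrative sketch, not compiled as a doctest; it assumes a `NodeId` value
    /// `nid` is already in scope:
    ///
    /// ```ignore
    /// let mut metrics = Metrics::default();
    /// metrics.peer(nid).received_bytes += 512;
    /// assert_eq!(metrics.peers.len(), 1);
    /// ```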
    pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
        self.peers.entry(nid).or_default()
    }
}

/// Per-peer metrics we track.
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerMetrics {
    /// Git bytes received from this peer.
    pub received_git_bytes: usize,
    /// Fetch requests received from this peer.
    pub received_fetch_requests: usize,
    /// Total bytes received from this peer.
    pub received_bytes: usize,
    /// Gossip messages received from this peer.
    pub received_gossip_messages: usize,
    /// Total bytes sent to this peer.
    pub sent_bytes: usize,
    /// Fetch requests sent to this peer.
    pub sent_fetch_requests: usize,
    /// Git bytes sent to this peer.
    pub sent_git_bytes: usize,
    /// Gossip messages sent to this peer.
    pub sent_gossip_messages: usize,
    /// Streams opened with this peer.
    pub streams_opened: usize,
    /// Inbound connection attempts from this peer.
    pub inbound_connection_attempts: usize,
    /// Outbound connection attempts to this peer.
    pub outbound_connection_attempts: usize,
    /// Times this peer disconnected.
    pub disconnects: usize,
}

/// Result of syncing our routing table with a node's inventory.
#[derive(Default)]
struct SyncedRouting {
    /// Repo entries added.
    added: Vec<RepoId>,
    /// Repo entries removed.
    removed: Vec<RepoId>,
    /// Repo entries updated (time).
    updated: Vec<RepoId>,
}

impl SyncedRouting {
    fn is_empty(&self) -> bool {
        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
    }
}

/// A peer we can connect to.
#[derive(Debug, Clone)]
struct Peer {
    nid: NodeId,
    addresses: Vec<KnownAddress>,
    penalty: Penalty,
}

/// General service error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error(transparent)]
    Git(#[from] radicle::git::raw::Error),
    #[error(transparent)]
    GitExt(#[from] radicle::git::ext::Error),
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Gossip(#[from] gossip::Error),
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Address(#[from] address::Error),
    #[error(transparent)]
    Database(#[from] node::db::Error),
    #[error(transparent)]
    Seeds(#[from] seed::Error),
    #[error(transparent)]
    Policy(#[from] policy::Error),
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
    #[error("namespaces error: {0}")]
    Namespaces(Box<NamespacesError>),
}

impl From<NamespacesError> for Error {
    fn from(e: NamespacesError) -> Self {
        Self::Namespaces(Box::new(e))
    }
}

#[derive(thiserror::Error, Debug)]
pub enum ConnectError {
    #[error("attempted connection to peer {nid} which already has a session")]
    SessionExists { nid: NodeId },
    #[error("attempted connection to self")]
    SelfConnection,
    #[error("outbound connection limit reached when attempting {nid} ({addr})")]
    LimitReached { nid: NodeId, addr: Address },
}

/// A store for all node data.
pub trait Store:
    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
{
}

impl Store for radicle::node::Database {}

/// Function used to query internal service state.
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Send + Sync;

/// Commands sent to the service by the operator.
pub enum Command {
    /// Announce repository references for given repository to peers.
    AnnounceRefs(RepoId, chan::Sender<RefsAt>),
    /// Announce local repositories to peers.
    AnnounceInventory,
    /// Add repository to local inventory.
    AddInventory(RepoId, chan::Sender<bool>),
    /// Connect to node with the given address.
    Connect(NodeId, Address, ConnectOptions),
    /// Disconnect from node.
    Disconnect(NodeId),
    /// Get the node configuration.
    Config(chan::Sender<Config>),
    /// Get the node's listen addresses.
    ListenAddrs(chan::Sender<Vec<std::net::SocketAddr>>),
    /// Lookup seeds for the given repository in the routing table.
    Seeds(RepoId, chan::Sender<Seeds>),
    /// Fetch the given repository from the network.
    Fetch(RepoId, NodeId, time::Duration, chan::Sender<FetchResult>),
    /// Seed the given repository.
    Seed(RepoId, Scope, chan::Sender<bool>),
    /// Unseed the given repository.
    Unseed(RepoId, chan::Sender<bool>),
    /// Follow the given node.
    Follow(NodeId, Option<Alias>, chan::Sender<bool>),
    /// Unfollow the given node.
    Unfollow(NodeId, chan::Sender<bool>),
    /// Query the internal service state.
    QueryState(Arc<QueryState>, chan::Sender<Result<(), CommandError>>),
}

impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AnnounceRefs(id, _) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Config(_) => write!(f, "Config"),
            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
            Self::QueryState { .. } => write!(f, "QueryState(..)"),
        }
    }
}

/// Command-related errors.
#[derive(thiserror::Error, Debug)]
pub enum CommandError {
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Policy(#[from] policy::Error),
}

/// Error returned by [`Service::try_fetch`].
#[derive(thiserror::Error, Debug)]
enum TryFetchError<'a> {
    #[error("ongoing fetch for repository exists")]
    AlreadyFetching(&'a mut FetchState),
    #[error("peer is not connected; cannot initiate fetch")]
    SessionNotConnected,
    #[error("peer fetch capacity reached; cannot initiate fetch")]
    SessionCapacityReached,
    #[error(transparent)]
    Namespaces(Box<NamespacesError>),
}

impl From<NamespacesError> for TryFetchError<'_> {
    fn from(e: NamespacesError) -> Self {
        Self::Namespaces(Box::new(e))
    }
}

/// Fetch state for an ongoing fetch.
#[derive(Debug)]
pub struct FetchState {
    /// Node we're fetching from.
    pub from: NodeId,
    /// What refs we're fetching.
    pub refs_at: Vec<RefsAt>,
    /// Channels waiting for fetch results.
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}

impl FetchState {
    /// Add a subscriber to this fetch.
    fn subscribe(&mut self, c: chan::Sender<FetchResult>) {
        if !self.subscribers.iter().any(|s| s.same_channel(&c)) {
            self.subscribers.push(c);
        }
    }
}

/// Holds all node stores.
#[derive(Debug)]
pub struct Stores<D>(D);

impl<D> Stores<D>
where
    D: Store,
{
    /// Get the database as a routing store.
    pub fn routing(&self) -> &impl routing::Store {
        &self.0
    }

    /// Get the database as a routing store, mutably.
    pub fn routing_mut(&mut self) -> &mut impl routing::Store {
        &mut self.0
    }

    /// Get the database as an address store.
    pub fn addresses(&self) -> &impl address::Store {
        &self.0
    }

    /// Get the database as an address store, mutably.
    pub fn addresses_mut(&mut self) -> &mut impl address::Store {
        &mut self.0
    }

    /// Get the database as a gossip store.
    pub fn gossip(&self) -> &impl gossip::Store {
        &self.0
    }

    /// Get the database as a gossip store, mutably.
    pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
        &mut self.0
    }

    /// Get the database as a seed store.
    pub fn seeds(&self) -> &impl seed::Store {
        &self.0
    }

    /// Get the database as a seed store, mutably.
    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
        &mut self.0
    }

    /// Get the database as a refs db.
    pub fn refs(&self) -> &impl node::refs::Store {
        &self.0
    }

    /// Get the database as a refs db, mutably.
    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
        &mut self.0
    }
}

impl<D> AsMut<D> for Stores<D> {
    fn as_mut(&mut self) -> &mut D {
        &mut self.0
    }
}

impl<D> From<D> for Stores<D> {
    fn from(db: D) -> Self {
        Self(db)
    }
}

/// The node service.
#[derive(Debug)]
pub struct Service<D, S, G> {
    /// Service configuration.
    config: Config,
    /// Our cryptographic signer and key.
    signer: Device<G>,
    /// Project storage.
    storage: S,
    /// Node database.
    db: Stores<D>,
    /// Policy configuration.
    policies: policy::Config<Write>,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
    /// Clock. Tells the time.
    clock: LocalTime,
    /// Who relayed what announcement to us. We keep track of this to ensure that
    /// we don't relay messages to nodes that already know about these messages.
    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
    /// I/O outbox.
    outbox: Outbox,
    /// Cached local node announcement.
    node: NodeAnnouncement,
    /// Cached local inventory announcement.
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
    /// Ongoing fetches.
    fetching: HashMap<RepoId, FetchState>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
    filter: Filter,
    /// Last time the service was idle.
    last_idle: LocalTime,
    /// Last time the gossip messages were relayed.
    last_gossip: LocalTime,
    /// Last time the service synced.
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
    last_prune: LocalTime,
    /// Last time the announcement task was run.
    last_announce: LocalTime,
    /// Timestamp of last local inventory announced.
    last_inventory: LocalTime,
    /// Last timestamp used for announcements.
    last_timestamp: Timestamp,
    /// Time when the service was initialized, or `None` if it wasn't initialized.
    started_at: Option<LocalTime>,
    /// Time when the service was last online, or `None` if this is the first time.
    last_online_at: Option<LocalTime>,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
    /// Local listening addresses.
    listening: Vec<net::SocketAddr>,
    /// Latest metrics for all nodes connected to since the last start.
    metrics: Metrics,
}

impl<D, S, G> Service<D, S, G> {
    /// Get the local node id.
    pub fn node_id(&self) -> NodeId {
        *self.signer.public_key()
    }

    /// Get the local service time.
    pub fn local_time(&self) -> LocalTime {
        self.clock
    }

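    /// Get a clone of the event emitter used to publish service events.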
    pub fn emitter(&self) -> Emitter<Event> {
        self.emitter.clone()
    }
}

impl<D, S, G> Service<D, S, G>
where
    D: Store,
    S: ReadStorage + 'static,
    G: crypto::signature::Signer<crypto::Signature>,
{
    pub fn new(
        config: Config,
        db: Stores<D>,
        storage: S,
        policies: policy::Config<Write>,
        signer: Device<G>,
        rng: Rng,
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());
        let limiter = RateLimiter::new(config.peers());
        let last_timestamp = node.timestamp;
        let clock = LocalTime::default(); // Updated on initialize.
        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.

        Self {
            config,
            storage,
            policies,
            signer,
            rng,
            inventory,
            node,
            clock,
            db,
            outbox: Outbox::default(),
            limiter,
            sessions,
            fetching: HashMap::new(),
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
            last_gossip: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_timestamp,
            last_announce: LocalTime::default(),
            last_inventory: LocalTime::default(),
            started_at: None,     // Updated on initialize.
            last_online_at: None, // Updated on initialize.
            emitter,
            listening: vec![],
            metrics: Metrics::default(),
        }
    }

    /// Whether the service was started (initialized) and if so, at what time.
    pub fn started(&self) -> Option<LocalTime> {
        self.started_at
    }

    /// Return the next i/o action to execute.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<io::Io> {
        self.outbox.next()
    }

    /// Seed a repository.
    /// Returns whether or not the repo policy was updated.
    pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
        let updated = self.policies.seed(id, scope)?;
        self.filter.insert(id);

        Ok(updated)
    }

    /// Unseed a repository.
    /// Returns whether or not the repo policy was updated.
    /// Note that when unseeding, we don't announce anything to the network. This is because by
    /// simply not announcing it anymore, it will eventually be pruned by nodes.
    pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
        let updated = self.policies.unseed(id)?;

        if updated {
            // Nb. This is potentially slow if we have lots of repos. We should probably
            // only re-compute the filter when we've unseeded a certain amount of repos
            // and the filter is really out of date.
            self.filter = Filter::allowed_by(self.policies.seed_policies()?);
            // Update and announce new inventory.
            if let Err(e) = self.remove_inventory(id) {
                error!(target: "service", "Error updating inventory after unseed: {e}");
            }
        }
        Ok(updated)
    }

    /// Find the closest `n` peers by proximity in seeding graphs.
    /// Returns a sorted list from the closest peer to the furthest.
    /// Peers with more seedings in common score higher.
    #[allow(unused)]
    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
        todo!()
    }

    /// Get the database.
    pub fn database(&self) -> &Stores<D> {
        &self.db
    }

    /// Get the mutable database.
    pub fn database_mut(&mut self) -> &mut Stores<D> {
        &mut self.db
    }

    /// Get the storage instance.
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Get the mutable storage instance.
    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }

    /// Get the node policies.
    pub fn policies(&self) -> &policy::Config<Write> {
        &self.policies
    }

    /// Get the local signer.
    pub fn signer(&self) -> &Device<G> {
        &self.signer
    }

    /// Subscriber to inner `Emitter` events.
    pub fn events(&mut self) -> Events {
        Events::from(self.emitter.subscribe())
    }

    /// Get I/O outbox.
    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
    }

    /// Get configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Lookup a repository, both locally and in the routing table.
    pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
        let this = self.nid();
        let local = self.storage.get(rid)?;
        let remote = self
            .db
            .routing()
            .get(&rid)?
            .iter()
            .filter(|nid| nid != &this)
            .cloned()
            .collect();

        Ok(Lookup { local, remote })
    }

    /// Initialize service with current time. Call this once.
    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());
        assert_ne!(time, LocalTime::default());

        let nid = self.node_id();

        self.clock = time;
        self.started_at = Some(time);
        self.last_online_at = match self.db.gossip().last() {
            Ok(Some(last)) => Some(last.to_local_time()),
            Ok(None) => None,
            Err(e) => {
                error!(target: "service", "Error getting the latest gossip message from db: {e}");
                None
            }
        };

        // Populate refs database. This is only useful as part of the upgrade process for nodes
        // that have been online since before the refs database was created.
        match self.db.refs().count() {
            Ok(0) => {
                info!(target: "service", "Empty refs database, populating from storage..");
                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
                    error!(target: "service", "Failed to populate refs database: {e}");
                }
            }
            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
            Err(e) => error!(target: "service", "Error checking refs database: {e}"),
        }

        let announced = self
            .db
            .seeds()
            .seeded_by(&nid)?
            .collect::<Result<HashMap<_, _>, _>>()?;
        let mut inventory = BTreeSet::new();
        let mut private = BTreeSet::new();

        for repo in self.storage.repositories()? {
            let rid = repo.rid;

            // If we're not seeding this repo, just skip it.
            if !self.policies.is_seeding(&rid)? {
                warn!(target: "service", "Local repository {rid} is not seeded");
                continue;
            }
            // Add public repositories to inventory.
            if repo.doc.is_public() {
                inventory.insert(rid);
            } else {
                private.insert(rid);
            }
            // If we have no owned refs for this repo, then there's nothing to announce.
            let Some(updated_at) = repo.synced_at else {
                continue;
            };
            // Skip this repo if the sync status matches what we have in storage.
            if let Some(announced) = announced.get(&rid) {
                if updated_at.oid == announced.oid {
                    continue;
                }
            }
            // Make sure our local node's sync status is up to date with storage.
            if self.db.seeds_mut().synced(
                &rid,
                &nid,
                updated_at.oid,
                updated_at.timestamp.into(),
            )? {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
            // If we got here, it likely means a repo was updated while the node was stopped.
            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
            // the historical gossip messages when a node connects and subscribes to this repo.
            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
                self.db.gossip_mut().announced(&nid, &ann)?;
            }
        }

        // Ensure that our inventory is recorded in our routing table, and that we are seeding
        // all of it. Inventory can end up not properly seeded if, for example, the user
        // creates a new repository while the node is stopped.
        self.db
            .routing_mut()
            .add_inventory(inventory.iter(), nid, time.into())?;
        self.inventory = gossip::inventory(self.timestamp(), inventory);

        // Ensure that private repositories are not in our inventory. It's possible that
        // a repository was public and then it was made private.
        self.db
            .routing_mut()
            .remove_inventories(private.iter(), &nid)?;

        // Setup subscription filter for seeded repos.
        self.filter = Filter::allowed_by(self.policies.seed_policies()?);
        // Connect to configured peers.
        let addrs = self.config.connect.clone();
        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
            if let Err(e) = self.connect(id, addr) {
                error!(target: "service", "Service::initialization connection error: {e}");
            }
        }
        // Try to establish some connections.
        self.maintain_connections();
        // Start periodic tasks.
        self.outbox.wakeup(IDLE_INTERVAL);
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }

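    /// Advance the service clock to `now` and record the latest metrics snapshot.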
    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
        trace!(
            target: "service",
            "Tick +{}",
            now - self.started_at.expect("Service::tick: service must be initialized")
        );
        if now >= self.clock {
            self.clock = now;
        } else {
            // Nb. In tests, we often move the clock forwards in time to test different behaviors,
            // so this warning isn't applicable there.
            #[cfg(not(test))]
            warn!(
                target: "service",
                "System clock is not monotonic: {now} is not greater or equal to {}", self.clock
            );
        }
        self.metrics = metrics.clone();
    }

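    /// Run the periodic tasks (idle, gossip, sync, announce, prune) whose intervals
    /// have elapsed, and check for persistent peers that need reconnecting.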
    pub fn wake(&mut self) {
        let now = self.clock;

        trace!(
            target: "service",
            "Wake +{}",
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

        if now - self.last_idle >= IDLE_INTERVAL {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_gossip >= GOSSIP_INTERVAL {
            trace!(target: "service", "Running 'gossip' task...");

            if let Err(e) = self.relay_announcements() {
                error!(target: "service", "Error relaying stored announcements: {e}");
            }
            self.outbox.wakeup(GOSSIP_INTERVAL);
            self.last_gossip = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_repositories() {
                error!(target: "service", "Error fetching missing inventory: {e}");
            }
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

            self.announce_inventory();
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                error!(target: "service", "Error pruning routing entries: {err}");
            }
            if let Err(err) = self
                .db
                .gossip_mut()
                .prune((now - LocalDuration::from(self.config.limits.gossip_max_age)).into())
            {
                error!(target: "service", "Error pruning gossip entries: {err}");
            }

            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

        // Always check whether there are persistent peers that need reconnecting.
        self.maintain_persistent();
    }

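    /// Handle a [`Command`] sent to the service by the operator.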
    pub fn command(&mut self, cmd: Command) {
        info!(target: "service", "Received command {cmd:?}");

        match cmd {
            Command::Connect(nid, addr, opts) => {
                if opts.persistent {
                    self.config.connect.insert((nid, addr.clone()).into());
                }
                if let Err(e) = self.connect(nid, addr) {
                    match e {
                        ConnectError::SessionExists { nid } => {
                            self.emitter.emit(Event::PeerConnected { nid });
                        }
                        e => {
                            // N.b. using the fact that the call to connect waits for an event
                            self.emitter.emit(Event::PeerDisconnected {
                                nid,
                                reason: e.to_string(),
                            });
                        }
                    }
                }
            }
            Command::Disconnect(nid) => {
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Config(resp) => {
                resp.send(self.config.clone()).ok();
            }
            Command::ListenAddrs(resp) => {
                resp.send(self.listening.clone()).ok();
            }
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
                    debug!(
                        target: "service",
                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
                        connected.len(), disconnected.len(), rid
                    );
                    resp.send(seeds).ok();
                }
                Err(e) => {
                    error!(target: "service", "Error getting seeds for {rid}: {e}");
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
                self.fetch(rid, seed, timeout, Some(resp));
            }
            Command::Seed(rid, scope, resp) => {
                // Update our seeding policy.
                let seeded = self
                    .seed(&rid, scope)
                    .expect("Service::command: error seeding repository");
                resp.send(seeded).ok();

                // Let all our peers know that we're interested in this repo from now on.
                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
            Command::Unseed(id, resp) => {
                let updated = self
                    .unseed(&id)
                    .expect("Service::command: error unseeding repository");
                resp.send(updated).ok();
            }
            Command::Follow(id, alias, resp) => {
                let seeded = self
                    .policies
                    .follow(&id, alias.as_ref())
                    .expect("Service::command: error following node");
                resp.send(seeded).ok();
            }
            Command::Unfollow(id, resp) => {
                let updated = self
                    .policies
                    .unfollow(&id)
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
            Command::AnnounceRefs(id, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
                        error!(target: "service", "Error announcing refs: repository {id} not found");
                        return;
                    }
                    Err(e) => {
                        error!(target: "service", "Error announcing refs: doc error: {e}");
                        return;
                    }
                };

                match self.announce_own_refs(id, doc) {
                    Ok(refs) => match refs.as_slice() {
                        &[refs] => {
                            resp.send(refs).ok();
                        }
                        // SAFETY: Since we passed in one NID, we should get exactly one item back.
                        [..] => panic!("Service::command: unexpected refs returned"),
                    },
                    Err(err) => {
                        error!(target: "service", "Error announcing refs: {err}");
                    }
                }
            }
            Command::AnnounceInventory => {
                self.announce_inventory();
            }
            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
                Ok(updated) => {
                    resp.send(updated).ok();
                }
                Err(e) => {
                    error!(target: "service", "Error adding {rid} to inventory: {e}");
                }
            },
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
            }
        }
    }

    /// Initiate an outgoing fetch for some repository, based on another node's announcement.
    /// Returns `true` if the fetch was initiated and `false` if it was skipped.
    fn fetch_refs_at(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs: NonEmpty<RefsAt>,
        scope: Scope,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) -> bool {
        match self.refs_status_of(rid, refs, &scope) {
            Ok(status) => {
                if status.want.is_empty() {
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
                    return self._fetch(rid, from, status.want, timeout, channel);
                }
            }
            Err(e) => {
                error!(target: "service", "Error getting the refs status of {rid}: {e}");
            }
        }
        // We didn't try to fetch anything.
        false
    }

    /// Initiate an outgoing fetch for some repository.
    fn fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) -> bool {
        self._fetch(rid, from, vec![], timeout, channel)
    }

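    /// Internal fetch helper shared by [`Self::fetch`] and [`Self::fetch_refs_at`].
    /// Attempts the fetch immediately and falls back to queueing it on the session
    /// when the repository is already being fetched or the peer is at fetch capacity.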
    fn _fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) -> bool {
        match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
            Ok(fetching) => {
                if let Some(c) = channel {
                    fetching.subscribe(c);
                }
                return true;
            }
            Err(TryFetchError::AlreadyFetching(fetching)) => {
                // If we're already fetching the same refs from the requested peer, there's nothing
                // to do, we simply add the supplied channel to the list of subscribers so that it
                // is notified on completion. Otherwise, we queue a fetch with the requested peer.
                if fetching.from == from && fetching.refs_at == refs_at {
                    debug!(target: "service", "Ignoring redundant fetch of {rid} from {from}");

                    if let Some(c) = channel {
                        fetching.subscribe(c);
                    }
                } else {
                    let fetch = QueuedFetch {
                        rid,
                        refs_at,
                        from,
                        timeout,
                        channel,
                    };
                    debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");

                    self.queue_fetch(fetch);
                }
            }
            Err(TryFetchError::SessionCapacityReached) => {
                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
                self.queue_fetch(QueuedFetch {
                    rid,
                    refs_at,
                    from,
                    timeout,
                    channel,
                });
            }
            Err(e) => {
                if let Some(c) = channel {
                    c.send(FetchResult::Failed {
                        reason: e.to_string(),
                    })
                    .ok();
                }
            }
        }
        false
    }

    fn queue_fetch(&mut self, fetch: QueuedFetch) {
        let Some(s) = self.sessions.get_mut(&fetch.from) else {
            log::error!(target: "service", "Cannot queue fetch for unknown session {}", fetch.from);
            return;
        };
        if let Err(e) = s.queue_fetch(fetch) {
            let fetch = e.inner();
            log::debug!(target: "service", "Unable to queue fetch for {} with {}: {e}", &fetch.rid, &fetch.from);
        }
    }

    // TODO: Buffer/throttle fetches.
    fn try_fetch(
        &mut self,
        rid: RepoId,
        from: &NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) -> Result<&mut FetchState, TryFetchError> {
        let from = *from;
        let Some(session) = self.sessions.get_mut(&from) else {
            return Err(TryFetchError::SessionNotConnected);
        };
        let fetching = self.fetching.entry(rid);

        trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");

        let fetching = match fetching {
            Entry::Vacant(fetching) => fetching,
            Entry::Occupied(fetching) => {
                // We're already fetching this repo from some peer.
                return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
            }
        };
        // Sanity check: We shouldn't be fetching from this session, since we return above if we're
        // fetching from any session.
        debug_assert!(!session.is_fetching(&rid));

        if !session.is_connected() {
            // This can happen if a session disconnects in the time between asking for seeds to
            // fetch from, and initiating the fetch from one of those seeds.
            return Err(TryFetchError::SessionNotConnected);
        }
        if session.is_at_capacity() {
            // If we're already fetching multiple repos from this peer.
            return Err(TryFetchError::SessionCapacityReached);
        }

        let fetching = fetching.insert(FetchState {
            from,
            refs_at: refs_at.clone(),
            subscribers: vec![],
        });
        self.outbox.fetch(
            session,
            rid,
            refs_at,
            timeout,
            self.config.limits.fetch_pack_receive,
        );

        Ok(fetching)
    }

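    /// Handle the result of a completed fetch: notify subscribers, update our routing
    /// table and inventory, and announce updated refs on success.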
    pub fn fetched(
        &mut self,
        rid: RepoId,
        remote: NodeId,
        result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
    ) {
        let Some(fetching) = self.fetching.remove(&rid) else {
            error!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
            return;
        };
        debug_assert_eq!(fetching.from, remote);

        if let Some(s) = self.sessions.get_mut(&remote) {
            // Mark this RID as fetched for this session.
            s.fetched(rid);
        }

        // Notify all fetch subscribers of the fetch result. This is used when the user requests
        // a fetch via the CLI, for example.
        for sub in &fetching.subscribers {
            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");

            let result = match &result {
                Ok(success) => FetchResult::Success {
                    updated: success.updated.clone(),
                    namespaces: success.namespaces.clone(),
                    clone: success.clone,
                },
                Err(e) => FetchResult::Failed {
                    reason: e.to_string(),
                },
            };
            if sub.send(result).is_err() {
                error!(target: "service", "Error sending fetch result for {rid} from {remote}..");
            } else {
                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
            }
        }

        match result {
            Ok(crate::worker::fetch::FetchResult {
                updated,
                canonical,
                namespaces,
                clone,
                doc,
            }) => {
                info!(target: "service", "Fetched {rid} from {remote} successfully");
                // Update our routing table in case this fetch was user-initiated and doesn't
                // come from an announcement.
                self.seed_discovered(rid, remote, self.clock.into());

                for update in &updated {
                    if update.is_skipped() {
                        trace!(target: "service", "Ref skipped: {update} for {rid}");
                    } else {
                        debug!(target: "service", "Ref updated: {update} for {rid}");
                    }
                }
                self.emitter.emit(Event::RefsFetched {
                    remote,
                    rid,
                    updated: updated.clone(),
                });
                self.emitter.emit_all(
                    canonical
                        .into_iter()
                        .map(|(refname, target)| Event::CanonicalRefUpdated {
                            rid,
                            refname,
                            target,
                        })
                        .collect(),
                );

                // Announce our new inventory if this fetch was a full clone.
                // Only update and announce inventory for public repositories.
                if clone && doc.is_public() {
                    debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");

                    if let Err(e) = self.add_inventory(rid) {
                        error!(target: "service", "Error announcing inventory for {rid}: {e}");
                    }
                }

                // It's possible for a fetch to succeed but nothing was updated.
                if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
                    debug!(target: "service", "Nothing to announce, no refs were updated..");
                } else {
                    // Finally, announce the refs. This is useful for nodes to know what we've synced,
                    // beyond just knowing that we have added an item to our inventory.
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces) {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
            }
            Err(err) => {
                error!(target: "service", "Fetch failed for {rid} from {remote}: {err}");

                // For now, we only disconnect the remote in case of timeout. In the future,
                // there may be other reasons to disconnect.
                if err.is_timeout() {
                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
                }
            }
        }
        // We can now try to dequeue more fetches.
        self.dequeue_fetches();
    }

    /// Attempt to dequeue fetches from all peers.
    /// At most one fetch is dequeued per peer. If the fetch cannot be processed,
    /// it is put back on the queue for that peer.
    ///
    /// Fetches are queued for two reasons:
    /// 1. The RID was already being fetched.
    /// 2. The session was already at fetch capacity.
    pub fn dequeue_fetches(&mut self) {
        let sessions = self
            .sessions
            .shuffled()
            .map(|(k, _)| *k)
            .collect::<Vec<_>>();

        // Try to dequeue once per session.
        for nid in sessions {
            // SAFETY: All the keys we are iterating on exist.
            #[allow(clippy::unwrap_used)]
            let sess = self.sessions.get_mut(&nid).unwrap();
            if !sess.is_connected() || sess.is_at_capacity() {
                continue;
            }

            if let Some(QueuedFetch {
                rid,
                from,
                refs_at,
                timeout,
                channel,
            }) = sess.dequeue_fetch()
            {
                debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");

                if let Some(refs) = NonEmpty::from_vec(refs_at) {
                    let repo_entry = self.policies.seed_policy(&rid).expect(
                        "Service::dequeue_fetch: error accessing repo seeding configuration",
                    );
                    let SeedingPolicy::Allow { scope } = repo_entry.policy else {
                        debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
                        continue;
                    };
                    self.fetch_refs_at(rid, from, refs, scope, timeout, channel);
                } else {
                    // If no refs are specified, always do a full fetch.
                    self.fetch(rid, from, timeout, channel);
                }
            }
        }
    }

    /// Inbound connection attempt.
    pub fn accepted(&mut self, ip: IpAddr) -> bool {
        // Always accept localhost connections, even if we already reached
        // our inbound connection limit.
        if ip.is_loopback() || ip.is_unspecified() {
            return true;
        }
        // Check for inbound connection limit.
        if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
            return false;
        }
        match self.db.addresses().is_ip_banned(ip) {
            Ok(banned) => {
                if banned {
                    debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
                    return false;
                }
            }
            Err(e) => error!(target: "service", "Error querying ban status for {ip}: {e}"),
        }
        let host: HostName = ip.into();
        let tokens = RateLimit::from(self.config.limits.rate.inbound.clone());

        if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
            trace!(target: "service", "Rate limiting inbound connection from {host}..");
            return false;
        }
        true
    }

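    /// An outbound connection attempt to `nid` at `addr` was made; mark the session
    /// as attempted.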
    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
        debug!(target: "service", "Attempted connection to {nid} ({addr})");

        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_attempted();
        } else {
            #[cfg(debug_assertions)]
            panic!("Service::attempted: unknown session {nid}@{addr}");
        }
    }

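    /// Record a local address we are listening on.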
    pub fn listening(&mut self, local_addr: net::SocketAddr) {
        info!(target: "node", "Listening on {local_addr}..");

        self.listening.push(local_addr);
    }

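    /// A connection to `remote` was established: send our initial messages and
    /// register or update the peer session.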
    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);

        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);
            }
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(mut e) => {
                    // In this scenario, it's possible that our peer is persistent, and
                    // disconnected. We get an inbound connection before we attempt a re-connection,
                    // and therefore we treat it as a regular inbound connection.
                    //
                    // It's also possible that a disconnection hasn't gone through yet and our
                    // peer is still in connected state here, while a new inbound connection from
                    // that same peer is made. This results in a new connection from a peer that is
                    // already connected from the perspective of the service. This appears to be
                    // a bug in the underlying networking library.
                    let peer = e.get_mut();
                    debug!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({peer})"
                    );
                    peer.link = link;
                    peer.to_connected(self.clock);
                    self.outbox.write_all(peer, msgs);
                }
                Entry::Vacant(e) => {
                    if let HostName::Ip(ip) = addr.host {
                        if !address::is_local(&ip) {
                            if let Err(e) =
                                self.db
                                    .addresses_mut()
                                    .record_ip(&remote, ip, self.clock.into())
                            {
                                log::error!(target: "service", "Error recording IP address for {remote}: {e}");
                            }
                        }
                    }
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                        self.config.limits.clone(),
                    ));
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
    }

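    /// A connection to `remote` was closed: fail any pending fetches from that peer,
    /// then schedule a reconnect for persistent peers or drop the session otherwise.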
1403    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
1404        let since = self.local_time();
1405        let Some(session) = self.sessions.get_mut(&remote) else {
1406            // Since we sometimes disconnect the service eagerly, it's not unusual to get a second
1407            // disconnection event once the transport is dropped.
1408            trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
1409            return;
1410        };
1411        // In cases of connection conflicts, there may be disconnections of one of the two
1412        // connections. In that case we don't want the service to remove the session.
1413        if session.link != link {
1414            return;
1415        }
1416
1417        info!(target: "service", "Disconnected from {remote} ({reason})");
1418        self.emitter.emit(Event::PeerDisconnected {
1419            nid: remote,
1420            reason: reason.to_string(),
1421        });
1422
1423        let link = session.link;
1424        let addr = session.addr.clone();
1425
1426        self.fetching.retain(|_, fetching| {
1427            if fetching.from != remote {
1428                return true;
1429            }
1430            // Remove and fail any pending fetches from this remote node.
1431            for resp in &fetching.subscribers {
1432                resp.send(FetchResult::Failed {
1433                    reason: format!("disconnected: {reason}"),
1434                })
1435                .ok();
1436            }
1437            false
1438        });
1439
1440        // Attempt to re-connect to persistent peers.
1441        if self.config.peer(&remote).is_some() {
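            // The reconnection delay grows exponentially with the number of attempts
            // (2^attempts seconds), and is clamped to the range between
            // `MIN_RECONNECTION_DELTA` and `MAX_RECONNECTION_DELTA`.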
1442            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
1443                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
1444
1445            // Nb. We always try to reconnect to persistent peers, even when the error appears
1446            // to not be transient.
1447            session.to_disconnected(since, since + delay);
1448
1449            debug!(target: "service", "Reconnecting to {remote} in {delay}..");
1450
1451            self.outbox.wakeup(delay);
1452        } else {
1453            debug!(target: "service", "Dropping peer {remote}..");
1454            self.sessions.remove(&remote);
1455
1456            let severity = match reason {
1457                DisconnectReason::Dial(_)
1458                | DisconnectReason::Fetch(_)
1459                | DisconnectReason::Connection(_) => {
1460                    if self.is_online() {
1461                        // If we're "online", there's something wrong with this
1462                        // peer connection specifically.
1463                        Severity::Medium
1464                    } else {
1465                        Severity::Low
1466                    }
1467                }
1468                DisconnectReason::Session(e) => e.severity(),
1469                DisconnectReason::Command
1470                | DisconnectReason::Conflict
1471                | DisconnectReason::SelfConnection => Severity::Low,
1472            };
1473
1474            if let Err(e) = self
1475                .db
1476                .addresses_mut()
1477                .disconnected(&remote, &addr, severity)
1478            {
1479                error!(target: "service", "Error updating address store: {e}");
1480            }
1481            // Only re-attempt outbound connections, since we don't care if an inbound connection
1482            // is dropped.
1483            if link.is_outbound() {
1484                self.maintain_connections();
1485            }
1486        }
1487        self.dequeue_fetches();
1488    }
1489
1490    pub fn received_message(&mut self, remote: NodeId, message: Message) {
1491        if let Err(err) = self.handle_message(&remote, message) {
1492            // If there's an error, stop processing messages from this peer.
1493            // However, any messages queued up to this point are still relayed.
1494            self.outbox
1495                .disconnect(remote, DisconnectReason::Session(err));
1496
1497            // FIXME: The peer should be set in a state such that we don't
1498            // process further messages.
1499        }
1500    }
1501
1502    /// Handle an announcement message.
1503    ///
1504    /// Returns the id of the stored announcement if it should be relayed to connected peers,
1505    /// and `None` if it should not be relayed.
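    ///
    /// Announcements that fail signature verification are rejected as misbehavior; inventory
    /// and refs announcements from nodes missing from our address book are ignored.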
1506    pub fn handle_announcement(
1507        &mut self,
1508        relayer: &NodeId,
1509        relayer_addr: &Address,
1510        announcement: &Announcement,
1511    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
1512        if !announcement.verify() {
1513            return Err(session::Error::Misbehavior);
1514        }
1515        let Announcement {
1516            node: announcer,
1517            message,
1518            ..
1519        } = announcement;
1520
1521        // Ignore our own announcements, in case the relayer sent one by mistake.
1522        if announcer == self.nid() {
1523            return Ok(None);
1524        }
1525        let now = self.clock;
1526        let timestamp = message.timestamp();
1527
1528        // Don't allow messages from too far in the future.
1529        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
1530            return Err(session::Error::InvalidTimestamp(timestamp));
1531        }
1532
1533        // We don't process announcements from nodes we don't know, since the node announcement is
1534        // what provides DoS protection.
1535        //
1536        // Note that it's possible to *not* receive the node announcement, but receive the
1537        // subsequent announcements of a node in the case of historical gossip messages requested
1538        // from the `subscribe` message. This can happen if the cut-off time is after the node
1539        // announcement timestamp, but before the other announcements. In that case, we simply
1540        // ignore all announcements of that node until we get a node announcement.
1541        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
1542            match self.db.addresses().get(announcer) {
1543                Ok(node) => {
1544                    if node.is_none() {
1545                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
1546                        return Ok(None);
1547                    }
1548                }
1549                Err(e) => {
1550                    error!(target: "service", "Error looking up node in address book: {e}");
1551                    return Ok(None);
1552                }
1553            }
1554        }
1555
1556        // Discard announcement messages we've already seen, otherwise update our last seen time.
1557        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
1558            Ok(Some(id)) => {
1559                log::debug!(
1560                    target: "service",
1561                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
1562                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
1563                );
1564                // Keep track of who relayed the message for later.
1565                self.relayed_by.entry(id).or_default().push(*relayer);
1566
1567                // Decide whether or not to relay this message, if it's fresh.
1568                // To avoid spamming peers on startup with historical gossip messages,
1569                // don't relay messages that are too old. We make an exception for node announcements,
1570                // since they are cached, and will hence often carry old timestamps.
1571                let relay = message.is_node_announcement()
1572                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
1573                relay.then_some(id)
1574            }
1575            Ok(None) => {
1576                // FIXME: Still mark as relayed by this peer.
1577                // FIXME: Refs announcements should not be delayed, since they are only sent
1578                // to subscribers.
1579                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
1580                return Ok(None);
1581            }
1582            Err(e) => {
1583                error!(target: "service", "Error updating gossip entry from {announcer}: {e}");
1584                return Ok(None);
1585            }
1586        };
1587
1588        match message {
1589            // Process a peer inventory update announcement by (maybe) fetching.
1590            AnnouncementMessage::Inventory(message) => {
1591                self.emitter.emit(Event::InventoryAnnounced {
1592                    nid: *announcer,
1593                    inventory: message.inventory.to_vec(),
1594                    timestamp: message.timestamp,
1595                });
1596                match self.sync_routing(
1597                    message.inventory.iter().cloned(),
1598                    *announcer,
1599                    message.timestamp,
1600                ) {
1601                    Ok(synced) => {
1602                        if synced.is_empty() {
1603                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
1604                            return Ok(None);
1605                        }
1606                    }
1607                    Err(e) => {
1608                        error!(target: "service", "Error processing inventory from {announcer}: {e}");
1609                        return Ok(None);
1610                    }
1611                }
1612                let mut missing = Vec::new();
1613                let nid = *self.nid();
1614
1615                // Here we handle the special case where the inventory we received is that of
1616                // a connected peer, as opposed to being relayed to us.
1617                if let Some(sess) = self.sessions.get_mut(announcer) {
1618                    for id in message.inventory.as_slice() {
1619                        // If we are connected to the announcer of this inventory, update the peer's
1620                        // subscription filter to include all inventory items. This way, we'll
1621                        // relay messages relating to the peer's inventory.
1622                        if let Some(sub) = &mut sess.subscribe {
1623                            sub.filter.insert(id);
1624                        }
1625
1626                        // If we're seeding and connected to the announcer, and we don't have
1627                        // the inventory, fetch it from the announcer.
1628                        if self.policies.is_seeding(id).expect(
1629                            "Service::handle_announcement: error accessing seeding configuration",
1630                        ) {
1631                            // Only if we do not have the repository locally do we fetch here.
1632                            // If we do have it, only fetch after receiving a ref announcement.
1633                            match self.db.routing().entry(id, &nid) {
1634                                Ok(entry) => {
1635                                    if entry.is_none() {
1636                                        missing.push(*id);
1637                                    }
1638                                }
1639                                Err(e) => error!(
1640                                    target: "service",
1641                                    "Error checking local inventory for {id}: {e}"
1642                                ),
1643                            }
1644                        }
1645                    }
1646                }
1647                // Since we have limited fetch capacity, it may be that we can't fetch an entire
1648                // inventory from a peer. Therefore we randomize the order of the RIDs to fetch
1649                // different RIDs from different peers in case multiple peers announce the same
1650                // RIDs.
1651                self.rng.shuffle(&mut missing);
1652
1653                for rid in missing {
1654                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
1655                    self.fetch(rid, *announcer, FETCH_TIMEOUT, None);
1656                }
1657                return Ok(relay);
1658            }
1659            AnnouncementMessage::Refs(message) => {
1660                self.emitter.emit(Event::RefsAnnounced {
1661                    nid: *announcer,
1662                    rid: message.rid,
1663                    refs: message.refs.to_vec(),
1664                    timestamp: message.timestamp,
1665                });
1666                // Empty announcements can be safely ignored.
1667                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
1668                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
1669                    return Ok(None);
1670                };
1671                // We update inventories when receiving ref announcements, as these could come
1672                // from a new repository being initialized.
1673                self.seed_discovered(message.rid, *announcer, message.timestamp);
1674
1675                // Update sync status of announcer for this repo.
1676                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
1677                    debug!(
1678                        target: "service",
1679                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
1680                        message.rid, refs.at, message.timestamp
1681                    );
1682                    match self.db.seeds_mut().synced(
1683                        &message.rid,
1684                        announcer,
1685                        refs.at,
1686                        message.timestamp,
1687                    ) {
1688                        Ok(updated) => {
1689                            if updated {
1690                                debug!(
1691                                    target: "service",
1692                                    "Updating sync status of {announcer} for {} to {}",
1693                                    message.rid, refs.at
1694                                );
1695                                self.emitter.emit(Event::RefsSynced {
1696                                    rid: message.rid,
1697                                    remote: *announcer,
1698                                    at: refs.at,
1699                                });
1700                            } else {
1701                                debug!(
1702                                    target: "service",
1703                                    "Sync status of {announcer} was not updated for {}",
1704                                    message.rid,
1705                                );
1706                            }
1707                        }
1708                        Err(e) => {
1709                            error!(target: "service", "Error updating sync status for {}: {e}", message.rid);
1710                        }
1711                    }
1712                }
1713                let repo_entry = self.policies.seed_policy(&message.rid).expect(
1714                    "Service::handle_announcement: error accessing repo seeding configuration",
1715                );
1716                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1717                    debug!(
1718                        target: "service",
1719                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
1720                        message.rid
1721                    );
1722                    return Ok(None);
1723                };
1724                // Refs can be relayed by peers who don't have the data in storage;
1725                // therefore we only check whether we are connected to the *announcer*,
1726                // who is required by the protocol to announce only refs it has.
1727                let Some(remote) = self.sessions.get(announcer).cloned() else {
1728                    trace!(
1729                        target: "service",
1730                        "Skipping fetch of {}, no sessions connected to {announcer}",
1731                        message.rid
1732                    );
1733                    return Ok(relay);
1734                };
1735                // Finally, start the fetch.
1736                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT, None);
1737
1738                return Ok(relay);
1739            }
1740            AnnouncementMessage::Node(
1741                ann @ NodeAnnouncement {
1742                    features,
1743                    addresses,
1744                    ..
1745                },
1746            ) => {
1747                self.emitter.emit(Event::NodeAnnounced {
1748                    nid: *announcer,
1749                    alias: ann.alias.clone(),
1750                    timestamp: ann.timestamp,
1751                    features: *features,
1752                    addresses: addresses.to_vec(),
1753                });
1754                // If this node isn't a seed, we're not interested in adding it
1755                // to our address book, but other nodes may be, so we relay the message anyway.
1756                if !features.has(Features::SEED) {
1757                    return Ok(relay);
1758                }
1759
1760                match self.db.addresses_mut().insert(
1761                    announcer,
1762                    ann.version,
1763                    ann.features,
1764                    &ann.alias,
1765                    ann.work(),
1766                    &ann.agent,
1767                    timestamp,
1768                    addresses
1769                        .iter()
1770                        // Ignore non-routable addresses unless received from a local network
1771                        // peer. This allows the node to function in a local network.
1772                        .filter(|a| a.is_routable() || relayer_addr.is_local())
1773                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
1774                ) {
1775                    Ok(updated) => {
1776                        // Only relay if we received new information.
1777                        if updated {
1778                            debug!(
1779                                target: "service",
1780                                "Address store entry for node {announcer} updated at {timestamp}"
1781                            );
1782                            return Ok(relay);
1783                        }
1784                    }
1785                    Err(err) => {
1786                        // An error here is due to a fault in our address store.
1787                        error!(target: "service", "Error processing node announcement from {announcer}: {err}");
1788                    }
1789                }
1790            }
1791        }
1792        Ok(None)
1793    }
1794
1795    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
1796        match info {
1797            // Nb. We don't currently send this message.
1798            Info::RefsAlreadySynced { rid, at } => {
1799                debug!(target: "service", "Refs already synced for {rid} by {remote}");
1800                self.emitter.emit(Event::RefsSynced {
1801                    rid: *rid,
1802                    remote,
1803                    at: *at,
1804                });
1805            }
1806        }
1807
1808        Ok(())
1809    }
1810
1811    pub fn handle_message(
1812        &mut self,
1813        remote: &NodeId,
1814        message: Message,
1815    ) -> Result<(), session::Error> {
1816        let local = self.node_id();
1817        let relay = self.config.is_relay();
1818        let Some(peer) = self.sessions.get_mut(remote) else {
1819            warn!(target: "service", "Session not found for {remote}");
1820            return Ok(());
1821        };
1822        peer.last_active = self.clock;
1823
1824        let limit: RateLimit = match peer.link {
1825            Link::Outbound => self.config.limits.rate.outbound.clone().into(),
1826            Link::Inbound => self.config.limits.rate.inbound.clone().into(),
1827        };
1828        if self
1829            .limiter
1830            .limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
1831        {
1832            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
1833            return Ok(());
1834        }
1835        message.log(log::Level::Debug, remote, Link::Inbound);
1836
1837        let connected = match &mut peer.state {
1838            session::State::Disconnected { .. } => {
1839                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
1840                return Ok(());
1841            }
1842            // In case of a discrepancy between the service state and the state of the underlying
1843            // wire protocol, we may receive a message from a peer that we consider not fully connected
1844            // at the service level. To remedy this, we simply transition the peer to a connected state.
1845            //
1846            // This is not ideal, but until the wire protocol and service are unified, it's the simplest
1847            // solution to converge towards the same state.
1848            session::State::Attempted | session::State::Initial => {
1849                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
1850                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);
1851
1852                peer.to_connected(self.clock);
1853
1854                None
1855            }
1856            session::State::Connected {
1857                ping, latencies, ..
1858            } => Some((ping, latencies)),
1859        };
1860
1861        trace!(target: "service", "Received message {message:?} from {remote}");
1862
1863        match message {
1864            // Process a peer announcement.
1865            Message::Announcement(ann) => {
1866                let relayer = remote;
1867                let relayer_addr = peer.addr.clone();
1868
1869                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
1870                    if self.config.is_relay() {
1871                        if let AnnouncementMessage::Inventory(_) = ann.message {
1872                            if let Err(e) = self
1873                                .database_mut()
1874                                .gossip_mut()
1875                                .set_relay(id, gossip::RelayStatus::Relay)
1876                            {
1877                                error!(target: "service", "Error setting relay flag for message: {e}");
1878                                return Ok(());
1879                            }
1880                        } else {
1881                            self.relay(id, ann);
1882                        }
1883                    }
1884                }
1885            }
1886            Message::Subscribe(subscribe) => {
1887                // Filter announcements by interest.
1888                match self
1889                    .db
1890                    .gossip()
1891                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
1892                {
1893                    Ok(anns) => {
1894                        for ann in anns {
1895                            let ann = match ann {
1896                                Ok(a) => a,
1897                                Err(e) => {
1898                                    error!(target: "service", "Error reading gossip message from store: {e}");
1899                                    continue;
1900                                }
1901                            };
1902                            // Don't send announcements authored by the remote back to the remote.
1903                            if ann.node == *remote {
1904                                continue;
1905                            }
1906                            // Only send messages if we're a relay, or if they're our own messages.
1907                            if relay || ann.node == local {
1908                                self.outbox.write(peer, ann.into());
1909                            }
1910                        }
1911                    }
1912                    Err(e) => {
1913                        error!(target: "service", "Error querying gossip messages from store: {e}");
1914                    }
1915                }
1916                peer.subscribe = Some(subscribe);
1917            }
1918            Message::Info(info) => {
1919                self.handle_info(*remote, &info)?;
1920            }
1921            Message::Ping(Ping { ponglen, .. }) => {
1922                // Ignore pings which ask for too much data.
1923                if ponglen > Ping::MAX_PONG_ZEROES {
1924                    return Ok(());
1925                }
1926                self.outbox.write(
1927                    peer,
1928                    Message::Pong {
1929                        zeroes: ZeroBytes::new(ponglen),
1930                    },
1931                );
1932            }
1933            Message::Pong { zeroes } => {
1934                if let Some((ping, latencies)) = connected {
1935                    if let session::PingState::AwaitingResponse {
1936                        len: ponglen,
1937                        since,
1938                    } = *ping
1939                    {
1940                        if (ponglen as usize) == zeroes.len() {
1941                            *ping = session::PingState::Ok;
1942                            // Keep track of peer latency.
1943                            latencies.push_back(self.clock - since);
1944                            if latencies.len() > MAX_LATENCIES {
1945                                latencies.pop_front();
1946                            }
1947                        }
1948                    }
1949                }
1950            }
1951        }
1952        Ok(())
1953    }
1954
1955    /// A convenience method to check whether we should fetch from a `RefsAnnouncement` with the given `scope`.
1956    fn refs_status_of(
1957        &self,
1958        rid: RepoId,
1959        refs: NonEmpty<RefsAt>,
1960        scope: &policy::Scope,
1961    ) -> Result<RefsStatus, Error> {
1962        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
1963        // Check that there's something we want.
1964        if refs.want.is_empty() {
1965            return Ok(refs);
1966        }
1967        // Check scope.
1968        let mut refs = match scope {
1969            policy::Scope::All => refs,
1970            policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
1971                Ok(Namespaces::All) => refs,
1972                Ok(Namespaces::Followed(followed)) => {
1973                    refs.want.retain(|r| followed.contains(&r.remote));
1974                    refs
1975                }
1976                Err(e) => return Err(e.into()),
1977            },
1978        };
1979        // Remove our own remote; we don't want to fetch that.
1980        refs.want.retain(|r| r.remote != self.node_id());
1981
1982        Ok(refs)
1983    }
1984
1985    /// Add a seed to our routing table.
1986    fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
1987        if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
1988            if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
1989                self.emitter.emit(Event::SeedDiscovered { rid, nid });
1990                debug!(target: "service", "Routing table updated for {rid} with seed {nid}");
1991            }
1992        }
1993    }
1994
1995    /// Set of initial messages to send to a peer.
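    ///
    /// These are our node announcement, our cached inventory announcement, and a `Subscribe`
    /// message covering the gossip we're interested in.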
1996    fn initial(&mut self, _link: Link) -> Vec<Message> {
1997        let now = self.clock();
1998        let filter = self.filter();
1999
2000        // TODO: Only subscribe to outbound connections, otherwise we will consume too
2001        // much bandwidth.
2002
2003        // If we've been previously connected to the network, we'll have received gossip messages.
2004        // Instead of simply taking the last timestamp, we try to ensure we don't miss any
2005        // messages due to unsynchronized clocks.
2006        //
2007        // If this is our first connection to the network, we just ask for a fixed backlog
2008        // of messages to get us started.
2009        let since = if let Some(last) = self.last_online_at {
2010            Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
2011        } else {
2012            (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
2013        };
2014        debug!(target: "service", "Subscribing to messages since timestamp {since}..");
2015
2016        vec![
2017            Message::node(self.node.clone(), &self.signer),
2018            Message::inventory(self.inventory.clone(), &self.signer),
2019            Message::subscribe(filter, since, Timestamp::MAX),
2020        ]
2021    }
2022
2023    /// Try to guess whether we're online or not.
2024    fn is_online(&self) -> bool {
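    ///
    /// We consider ourselves online if at least one routable peer has been active within
    /// the last `IDLE_INTERVAL`.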
2025        self.sessions
2026            .connected()
2027            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
2028            .count()
2029            > 0
2030    }
2031
2032    /// Remove a local repository from our inventory.
2033    fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
2034        let node = self.node_id();
2035        let now = self.timestamp();
2036
2037        let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
2038        if removed {
2039            self.refresh_and_announce_inventory(now)?;
2040        }
2041        Ok(removed)
2042    }
2043
2044    /// Add a local repository to our inventory.
2045    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
2046        let node = self.node_id();
2047        let now = self.timestamp();
2048
2049        if !self.storage.contains(&rid)? {
2050            error!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
2051            return Ok(false);
2052        }
2053        // Add to our local inventory.
2054        let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
2055        let updated = !updates.is_empty();
2056
2057        if updated {
2058            self.refresh_and_announce_inventory(now)?;
2059        }
2060        Ok(updated)
2061    }
2062
2063    /// Update cached inventory message, and announce new inventory to peers.
2064    fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
2065        let inventory = self.inventory()?;
2066
2067        self.inventory = gossip::inventory(time, inventory);
2068        self.announce_inventory();
2069
2070        Ok(())
2071    }
2072
2073    /// Get our local inventory.
2074    ///
2075    /// A node's inventory is the advertised list of repositories offered by that node.
2076    ///
2077    /// A node's inventory consists of *public* repositories that are seeded and available locally
2078    /// in the node's storage. We use the routing table as the canonical state of all inventories,
2079    /// including the local node's.
2080    ///
2081    /// When a repository is unseeded, it is also removed from the inventory. Private repositories
2082    /// are *not* part of a node's inventory.
2083    fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
2084        self.db
2085            .routing()
2086            .get_inventory(self.nid())
2087            .map_err(Error::from)
2088    }
2089
2090    /// Process a peer inventory announcement by updating our routing table.
2091    /// This function expects the peer's full inventory, and prunes entries that are not in the
2092    /// given inventory.
2093    fn sync_routing(
2094        &mut self,
2095        inventory: impl IntoIterator<Item = RepoId>,
2096        from: NodeId,
2097        timestamp: Timestamp,
2098    ) -> Result<SyncedRouting, Error> {
2099        let mut synced = SyncedRouting::default();
2100        let included = inventory.into_iter().collect::<BTreeSet<_>>();
2101
2102        for (rid, result) in
2103            self.db
2104                .routing_mut()
2105                .add_inventory(included.iter(), from, timestamp)?
2106        {
2107            match result {
2108                InsertResult::SeedAdded => {
2109                    debug!(target: "service", "Routing table updated for {rid} with seed {from}");
2110                    self.emitter.emit(Event::SeedDiscovered { rid, nid: from });
2111
2112                    if self
2113                        .policies
2114                        .is_seeding(&rid)
2115                        .expect("Service::process_inventory: error accessing seeding configuration")
2116                    {
2117                        // TODO: We should fetch here if we're already connected, in case this
2118                        // seed has refs we don't have.
2119                    }
2120                    synced.added.push(rid);
2121                }
2122                InsertResult::TimeUpdated => {
2123                    synced.updated.push(rid);
2124                }
2125                InsertResult::NotUpdated => {}
2126            }
2127        }
2128        for rid in self.db.routing().get_inventory(&from)?.into_iter() {
2129            if !included.contains(&rid) {
2130                if self.db.routing_mut().remove_inventory(&rid, &from)? {
2131                    synced.removed.push(rid);
2132                    self.emitter.emit(Event::SeedDropped { rid, nid: from });
2133                }
2134            }
2135        }
2136        Ok(synced)
2137    }
2138
2139    /// Return a refs announcement including the given remotes.
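    ///
    /// At most `REF_REMOTE_LIMIT` remotes are included in the announcement; any remotes beyond
    /// that limit are dropped with a warning.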
2140    fn refs_announcement_for(
2141        &mut self,
2142        rid: RepoId,
2143        remotes: impl IntoIterator<Item = NodeId>,
2144    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
2145        let repo = self.storage.repository(rid)?;
2146        let timestamp = self.timestamp();
2147        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
2148
2149        for remote_id in remotes.into_iter() {
2150            let refs_at = RefsAt::new(&repo, remote_id)?;
2151
2152            if refs.push(refs_at).is_err() {
2153                warn!(
2154                    target: "service",
2155                    "refs announcement limit ({REF_REMOTE_LIMIT}) exceeded, peers will see only some of your repository references",
2156                );
2157                break;
2158            }
2159        }
2160
2161        let msg = AnnouncementMessage::from(RefsAnnouncement {
2162            rid,
2163            refs: refs.clone(),
2164            timestamp,
2165        });
2166        Ok((msg.signed(&self.signer), refs.into()))
2167    }
2168
2169    /// Announce our own refs for the given repo.
2170    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc) -> Result<Vec<RefsAt>, Error> {
2171        let (refs, timestamp) = self.announce_refs(rid, doc, [self.node_id()])?;
2172
2173        // Update refs database with our signed refs branches.
2174        // This isn't strictly necessary for now, as we only use the database for fetches, and
2175        // we don't fetch our own announced refs, but we do it for good measure.
2176        if let &[r] = refs.as_slice() {
2177            self.emitter.emit(Event::LocalRefsAnnounced {
2178                rid,
2179                refs: r,
2180                timestamp,
2181            });
2182            if let Err(e) = self.database_mut().refs_mut().set(
2183                &rid,
2184                &r.remote,
2185                &SIGREFS_BRANCH,
2186                r.at,
2187                timestamp.to_local_time(),
2188            ) {
2189                error!(
2190                    target: "service",
2191                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
2192                    r.remote
2193                );
2194            }
2195        }
2196        Ok(refs)
2197    }
2198
2199    /// Announce local refs for given repo.
2200    fn announce_refs(
2201        &mut self,
2202        rid: RepoId,
2203        doc: Doc,
2204        remotes: impl IntoIterator<Item = NodeId>,
2205    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
2206        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
2207        let timestamp = ann.timestamp();
2208        let peers = self.sessions.connected().map(|(_, p)| p);
2209
2210        // Update our sync status for our own refs. This is useful for determining if refs were
2211        // updated while the node was stopped.
2212        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
2213            info!(
2214                target: "service",
2215                "Announcing own refs for {rid} to peers ({}) (t={timestamp})..",
2216                refs.at
2217            );
2218            // Update our local node's sync status to mark the refs as announced.
2219            if let Err(e) = self
2220                .db
2221                .seeds_mut()
2222                .synced(&rid, &ann.node, refs.at, timestamp)
2223            {
2224                error!(target: "service", "Error updating sync status for local node: {e}");
2225            } else {
2226                debug!(target: "service", "Saved local sync status for {rid}..");
2227            }
2228        }
2229
2230        self.outbox.announce(
2231            ann,
2232            peers.filter(|p| {
2233                // Only announce to peers who are allowed to view this repo.
2234                doc.is_visible_to(&p.id.into())
2235            }),
2236            self.db.gossip_mut(),
2237        );
2238        Ok((refs, timestamp))
2239    }
2240
2241    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
2242        if let Some(sess) = self.sessions.get_mut(&nid) {
2243            sess.to_initial();
2244            self.outbox.connect(nid, addr);
2245
2246            return true;
2247        }
2248        false
2249    }
2250
2251    fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
2252        debug!(target: "service", "Connecting to {nid} ({addr})..");
2253
2254        if nid == self.node_id() {
2255            return Err(ConnectError::SelfConnection);
2256        }
2257        if self.sessions.contains_key(&nid) {
2258            return Err(ConnectError::SessionExists { nid });
2259        }
2260        if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
2261            return Err(ConnectError::LimitReached { nid, addr });
2262        }
2263        let persistent = self.config.is_persistent(&nid);
2264        let timestamp: Timestamp = self.clock.into();
2265
2266        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
2267            error!(target: "service", "Error updating address book with connection attempt: {e}");
2268        }
2269        self.sessions.insert(
2270            nid,
2271            Session::outbound(
2272                nid,
2273                addr.clone(),
2274                persistent,
2275                self.rng.clone(),
2276                self.config.limits.clone(),
2277            ),
2278        );
2279        self.outbox.connect(nid, addr);
2280
2281        Ok(())
2282    }
2283
2284    fn seeds(&self, rid: &RepoId) -> Result<Seeds, Error> {
2285        let mut seeds = Seeds::new(self.rng.clone());
2286
2287        // First build a list from peers that have synced our own refs, if any.
2288        // This step is skipped if we don't have the repository yet, or don't have
2289        // our own refs.
2290        if let Ok(repo) = self.storage.repository(*rid) {
2291            if let Ok(local) = RefsAt::new(&repo, self.node_id()) {
2292                for seed in self.db.seeds().seeds_for(rid)? {
2293                    let seed = seed?;
2294                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
2295                    let synced = if local.at == seed.synced_at.oid {
2296                        SyncStatus::Synced { at: seed.synced_at }
2297                    } else {
2298                        let local = SyncedAt::new(local.at, &repo)?;
2299
2300                        SyncStatus::OutOfSync {
2301                            local,
2302                            remote: seed.synced_at,
2303                        }
2304                    };
2305                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
2306                }
2307            }
2308        }
2309
2310        // Then, add peers we know about but for which we have no sync status information.
2311        // These peers have announced that they seed the repository via an inventory
2312        // announcement, but we haven't received any ref announcements from them.
2313        for nid in self.db.routing().get(rid)? {
2314            if nid == self.node_id() {
2315                continue;
2316            }
2317            if seeds.contains(&nid) {
2318                // We already have a richer entry for this node.
2319                continue;
2320            }
2321            let addrs = self.db.addresses().addresses_of(&nid)?;
2322            let state = self.sessions.get(&nid).map(|s| s.state.clone());
2323
2324            seeds.insert(Seed::new(nid, addrs, state, None));
2325        }
2326        Ok(seeds)
2327    }
2328
2329    /// Return a new filter object, based on our seeding policy.
2330    fn filter(&self) -> Filter {
2331        if self.config.seeding_policy.is_allow() {
2332            // TODO: Remove bits for blocked repos.
2333            Filter::default()
2334        } else {
2335            self.filter.clone()
2336        }
2337    }
2338
2339    /// Get a timestamp for use in announcements.
2340    /// Never returns the same timestamp twice.
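    ///
    /// If the clock hasn't advanced past the last issued timestamp, the previous timestamp is
    /// incremented by one so that announcement timestamps are strictly increasing.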
2341    fn timestamp(&mut self) -> Timestamp {
2342        let now = Timestamp::from(self.clock);
2343        if *now > *self.last_timestamp {
2344            self.last_timestamp = now;
2345        } else {
2346            self.last_timestamp = self.last_timestamp + 1;
2347        }
2348        self.last_timestamp
2349    }
2350
2351    fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
2352        let announcer = ann.node;
2353        let relayed_by = self.relayed_by.get(&id);
2354        let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
2355            Some(rid)
2356        } else {
2357            None
2358        };
2359        // Choose peers we should relay this message to.
2360        // 1. Don't relay to a peer who sent us this message.
2361        // 2. Don't relay to the peer who signed this announcement.
2362        let relay_to = self
2363            .sessions
2364            .connected()
2365            .filter(|(id, _)| {
2366                relayed_by
2367                    .map(|relayers| !relayers.contains(id))
2368                    .unwrap_or(true) // If there are no relayers we let it through.
2369            })
2370            .filter(|(id, _)| **id != announcer)
2371            .filter(|(id, _)| {
2372                if let Some(rid) = rid {
2373                    // Only relay this message if the peer is allowed to know about the
2374                    // repository. If we don't have the repository, return `false` because
2375                    // we can't determine if it's private or public.
2376                    self.storage
2377                        .get(rid)
2378                        .ok()
2379                        .flatten()
2380                        .map(|doc| doc.is_visible_to(&(*id).into()))
2381                        .unwrap_or(false)
2382                } else {
2383                    // Announcement doesn't concern a specific repository, let it through.
2384                    true
2385                }
2386            })
2387            .map(|(_, p)| p);
2388
2389        self.outbox.relay(ann, relay_to);
2390    }
2391
2392    ////////////////////////////////////////////////////////////////////////////
2393    // Periodic tasks
2394    ////////////////////////////////////////////////////////////////////////////
2395
2396    fn relay_announcements(&mut self) -> Result<(), Error> {
2397        let now = self.clock.into();
2398        let rows = self.database_mut().gossip_mut().relays(now)?;
2399        let local = self.node_id();
2400
2401        for (id, msg) in rows {
2402            let announcer = msg.node;
2403            if announcer == local {
2404                // Don't relay our own stored gossip messages.
2405                continue;
2406            }
2407            self.relay(id, msg);
2408        }
2409        Ok(())
2410    }
2411
2412    /// Announce our inventory to all connected peers, unless it was already announced.
2413    fn announce_inventory(&mut self) {
2414        let timestamp = self.inventory.timestamp.to_local_time();
2415
2416        if self.last_inventory == timestamp {
2417            debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
2418            return;
2419        }
2420        let msg = AnnouncementMessage::from(self.inventory.clone());
2421
2422        self.outbox.announce(
2423            msg.signed(&self.signer),
2424            self.sessions.connected().map(|(_, p)| p),
2425            self.db.gossip_mut(),
2426        );
2427        self.last_inventory = timestamp;
2428    }
2429
2430    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
2431        let count = self.db.routing().len()?;
2432        if count <= self.config.limits.routing_max_size.into() {
2433            return Ok(());
2434        }
2435
2436        let delta = count - usize::from(self.config.limits.routing_max_size);
2437        let nid = self.node_id();
2438        self.db.routing_mut().prune(
2439            (*now - LocalDuration::from(self.config.limits.routing_max_age)).into(),
2440            Some(delta),
2441            &nid,
2442        )?;
2443        Ok(())
2444    }
2445
2446    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
2447        let stale = self
2448            .sessions
2449            .connected()
2450            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
2451
2452        for (_, session) in stale {
2453            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);
2454
2455            // TODO: Should we switch the session state to "disconnected" even before receiving
2456            // an official "disconnect"? Otherwise we keep pinging until we get the disconnection.
2457
2458            self.outbox.disconnect(
2459                session.id,
2460                DisconnectReason::Session(session::Error::Timeout),
2461            );
2462        }
2463    }
2464
2465    /// Ensure connection health by pinging connected peers.
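    ///
    /// Peers that haven't been active for at least `KEEP_ALIVE_DELTA` are sent a ping.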
2466    fn keep_alive(&mut self, now: &LocalTime) {
2467        let inactive_sessions = self
2468            .sessions
2469            .connected_mut()
2470            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
2471            .map(|(_, session)| session);
2472        for session in inactive_sessions {
2473            session.ping(self.clock, &mut self.outbox).ok();
2474        }
2475    }
2476
2477    /// Get a list of peers available to connect to, sorted by lowest penalty.
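    ///
    /// Peers are excluded if they speak a different protocol version, are banned or have
    /// reached the connection penalty threshold, already have a session with us, match one
    /// of our configured external addresses, or are our own node.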
2478    fn available_peers(&mut self) -> Vec<Peer> {
2479        match self.db.addresses().entries() {
2480            Ok(entries) => {
2481                // Nb. we don't want to connect to any peers that already have a session with us,
2482                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
2483                let mut peers = entries
2484                    .filter(|entry| entry.version == PROTOCOL_VERSION)
2485                    .filter(|entry| !entry.address.banned)
2486                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
2487                    .filter(|entry| !self.sessions.contains_key(&entry.node))
2488                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
2489                    .filter(|entry| &entry.node != self.nid())
2490                    .fold(HashMap::new(), |mut acc, entry| {
2491                        acc.entry(entry.node)
2492                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
2493                            .or_insert_with(|| Peer {
2494                                nid: entry.node,
2495                                addresses: vec![entry.address],
2496                                penalty: entry.penalty,
2497                            });
2498                        acc
2499                    })
2500                    .into_values()
2501                    .collect::<Vec<_>>();
2502                peers.sort_by_key(|p| p.penalty);
2503                peers
2504            }
2505            Err(e) => {
2506                error!(target: "service", "Unable to lookup available peers in address book: {e}");
2507                Vec::new()
2508            }
2509        }
2510    }
2511
2512    /// Fetch all repositories that are seeded but missing from storage.
2513    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
2514        let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
2515        for policy in policies {
2516            let policy = match policy {
2517                Ok(policy) => policy,
2518                Err(err) => {
2519                    log::error!(target: "protocol::filter", "Failed to read seed policy: {err}");
2520                    continue;
2521                }
2522            };
2523
2524            let rid = policy.rid;
2525
2526            if !policy.is_allow() {
2527                continue;
2528            }
2529            if self.storage.contains(&rid)? {
2530                continue;
2531            }
2532            match self.seeds(&rid) {
2533                Ok(seeds) => {
2534                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
2535                        for seed in connected {
2536                            self.fetch(rid, seed.nid, FETCH_TIMEOUT, None);
2537                        }
2538                    } else {
2539                        // TODO: We should make sure that this fetch is retried later, either
2540                        // when we connect to a seed, or when we discover a new seed.
2541                        // Since new connections and routing table updates are both conditions for
2542                        // fetching, we should trigger fetches when those conditions appear.
2543                        // Another way to handle this would be to update our database, saying
2544                        // that we're trying to fetch a certain repo. We would then just
2545                        // iterate over those entries in the above circumstances. This is
2546                        // merely an optimization though, we can also iterate over all seeded
2547                        // repos and check which ones are not in our inventory.
2548                        debug!(target: "service", "No connected seeds found for {rid}..");
2549                    }
2550                }
2551                Err(e) => {
2552                    error!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
2553                }
2554            }
2555        }
2556        Ok(())
2557    }
2558
2559    /// Run idle task for all connections.
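    ///
    /// Sessions that have become stable are recorded as connected in the address book.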
2560    fn idle_connections(&mut self) {
2561        for (_, sess) in self.sessions.iter_mut() {
2562            sess.idle(self.clock);
2563
2564            if sess.is_stable() {
2565                // Mark as connected once connection is stable.
2566                if let Err(e) =
2567                    self.db
2568                        .addresses_mut()
2569                        .connected(&sess.id, &sess.addr, self.clock.into())
2570                {
2571                    error!(target: "service", "Error updating address book with connection: {e}");
2572                }
2573            }
2574        }
2575    }
2576
2577    /// Try to maintain a target number of connections.
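    ///
    /// This is a no-op unless the peer configuration is `PeerConfig::Dynamic`; persistent
    /// peers are handled separately by `maintain_persistent`.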
2578    fn maintain_connections(&mut self) {
2579        let PeerConfig::Dynamic = self.config.peers else {
2580            return;
2581        };
2582        trace!(target: "service", "Maintaining connections..");
2583
2584        let target = TARGET_OUTBOUND_PEERS;
2585        let now = self.clock;
2586        let outbound = self
2587            .sessions
2588            .values()
2589            .filter(|s| s.link.is_outbound())
2590            .filter(|s| s.is_connected() || s.is_connecting())
2591            .count();
2592        let wanted = target.saturating_sub(outbound);
2593
2594        // Don't connect to more peers than needed.
2595        if wanted == 0 {
2596            return;
2597        }
2598
2599        // Peers available to connect to.
2600        let available = self
2601            .available_peers()
2602            .into_iter()
2603            .filter_map(|peer| {
2604                peer.addresses
2605                    .into_iter()
2606                    .find(|ka| match (ka.last_success, ka.last_attempt) {
2607                        // If we succeeded the last time we tried, this is a good address.
2608                        // If enough time has passed since a failed attempt, we also try again.
2609                        (Some(success), Some(attempt)) => {
2610                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
2611                        }
2612                        // If we haven't succeeded yet, and we waited long enough, we can try this address.
2613                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
2614                        // If we have no recorded attempts for this address, it's worth a try.
2615                        (_, None) => true,
2616                    })
2617                    .map(|ka| (peer.nid, ka))
2618            })
2619            .filter(|(_, ka)| match AddressType::from(&ka.addr) {
2620                // Only consider onion addresses if configured.
2621                AddressType::Onion => self.config.onion.is_some(),
2622                AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
2623            });
2624
2625        // Peers we are going to attempt connections to.
2626        let connect = available.take(wanted).collect::<Vec<_>>();
2627        if connect.len() < wanted {
2628            log::debug!(
2629                target: "service",
2630                "Not enough available peers to connect to (available={}, wanted={wanted})",
2631                connect.len()
2632            );
2633        }
2634        for (id, ka) in connect {
2635            if let Err(e) = self.connect(id, ka.addr.clone()) {
2636                error!(target: "service", "Service::maintain_connections connection error: {e}");
2637            }
2638        }
2639    }
2640
2641    /// Maintain persistent peer connections.
2642    fn maintain_persistent(&mut self) {
2643        trace!(target: "service", "Maintaining persistent peers..");
2644
2645        let now = self.local_time();
2646        let mut reconnect = Vec::new();
2647
2648        for (nid, session) in self.sessions.iter_mut() {
2649            if let Some(addr) = self.config.peer(nid) {
2650                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
2651                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
2652                    // even a successful attempt means that we're unlikely to be able to reconnect.
2653
2654                    if now >= *retry_at {
2655                        reconnect.push((*nid, addr.clone(), session.attempts()));
2656                    }
2657                }
2658            }
2659        }
2660
2661        for (nid, addr, attempts) in reconnect {
2662            if self.reconnect(nid, addr) {
2663                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
2664            }
2665        }
2666    }
2667}
2668
2669/// Gives read access to the service state.
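///
/// A minimal sketch, with a hypothetical caller, of how this trait can be used to write code
/// that is generic over the concrete service type:
///
/// ```ignore
/// fn log_node_id<S: ServiceState>(service: &S) {
///     // `nid()` returns the public key identifying the local node.
///     log::debug!(target: "service", "local node id: {}", service.nid());
/// }
/// ```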
2670pub trait ServiceState {
2671    /// Get the Node ID.
2672    fn nid(&self) -> &NodeId;
2673    /// Get the existing sessions.
2674    fn sessions(&self) -> &Sessions;
2675    /// Get fetch state.
2676    fn fetching(&self) -> &HashMap<RepoId, FetchState>;
2677    /// Get outbox.
2678    fn outbox(&self) -> &Outbox;
2679    /// Get rate limiter.
2680    fn limiter(&self) -> &RateLimiter;
2681    /// Get event emitter.
2682    fn emitter(&self) -> &Emitter<Event>;
2683    /// Get a repository from storage.
2684    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
2685    /// Get the clock.
2686    fn clock(&self) -> &LocalTime;
2687    /// Get the clock mutably.
2688    fn clock_mut(&mut self) -> &mut LocalTime;
2689    /// Get service configuration.
2690    fn config(&self) -> &Config;
2691    /// Get service metrics.
2692    fn metrics(&self) -> &Metrics;
2693}
2694
2695impl<D, S, G> ServiceState for Service<D, S, G>
2696where
2697    D: routing::Store,
2698    G: crypto::signature::Signer<crypto::Signature>,
2699    S: ReadStorage,
2700{
2701    fn nid(&self) -> &NodeId {
2702        self.signer.public_key()
2703    }
2704
2705    fn sessions(&self) -> &Sessions {
2706        &self.sessions
2707    }
2708
2709    fn fetching(&self) -> &HashMap<RepoId, FetchState> {
2710        &self.fetching
2711    }
2712
2713    fn outbox(&self) -> &Outbox {
2714        &self.outbox
2715    }
2716
2717    fn limiter(&self) -> &RateLimiter {
2718        &self.limiter
2719    }
2720
2721    fn emitter(&self) -> &Emitter<Event> {
2722        &self.emitter
2723    }
2724
2725    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
2726        self.storage.get(rid)
2727    }
2728
2729    fn clock(&self) -> &LocalTime {
2730        &self.clock
2731    }
2732
2733    fn clock_mut(&mut self) -> &mut LocalTime {
2734        &mut self.clock
2735    }
2736
2737    fn config(&self) -> &Config {
2738        &self.config
2739    }
2740
2741    fn metrics(&self) -> &Metrics {
2742        &self.metrics
2743    }
2744}
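
// Illustrative sketch, not part of the service: a helper generic over any
// `ServiceState` implementation, e.g. for logging basic node information.
// The function name `log_service_state` is hypothetical.
#[allow(dead_code)]
fn log_service_state(state: &impl ServiceState) {
    debug!(
        target: "service",
        "node {} has {} connected session(s)",
        state.nid(),
        state.sessions().connected().count()
    );
}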

/// Disconnect reason.
#[derive(Debug)]
pub enum DisconnectReason {
    /// Error while dialing the remote. This error occurs before a connection is
    /// even established. Errors of this kind are usually not transient.
    Dial(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with an underlying established connection. Sometimes, reconnecting
    /// after such an error is possible.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with a fetch.
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
    /// Session conflicts with existing session.
    Conflict,
    /// Connection to self.
    SelfConnection,
    /// User requested disconnect.
    Command,
}

impl DisconnectReason {
    pub fn is_dial_err(&self) -> bool {
        matches!(self, Self::Dial(_))
    }

    pub fn is_connection_err(&self) -> bool {
        matches!(self, Self::Connection(_))
    }

    pub fn connection() -> Self {
        DisconnectReason::Connection(Arc::new(std::io::Error::from(
            std::io::ErrorKind::ConnectionReset,
        )))
    }
}

impl fmt::Display for DisconnectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Dial(err) => write!(f, "{err}"),
            Self::Connection(err) => write!(f, "{err}"),
            Self::Command => write!(f, "command"),
            Self::SelfConnection => write!(f, "self-connection"),
            Self::Conflict => write!(f, "conflict"),
            Self::Session(err) => write!(f, "{err}"),
            Self::Fetch(err) => write!(f, "fetch: {err}"),
        }
    }
}
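
// Illustrative sketch, not used by the service: how a caller might decide
// whether a reconnect is worth attempting from the disconnect reason alone,
// following the variant documentation above. The name `is_transient` is
// hypothetical.
#[allow(dead_code)]
fn is_transient(reason: &DisconnectReason) -> bool {
    match reason {
        // Dialing failed before a connection was even established; usually not transient.
        DisconnectReason::Dial(_) => false,
        // An established connection failed; reconnecting is sometimes possible.
        DisconnectReason::Connection(_) => true,
        // Other reasons (fetch or session errors, conflicts, self-connections,
        // user commands) are not treated as transient in this sketch.
        _ => false,
    }
}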

/// Result of a project lookup.
#[derive(Debug)]
pub struct Lookup {
    /// The project's identity document, if it was found in local storage.
    pub local: Option<Doc>,
    /// A list of remote peers on which the project is known to exist.
    pub remote: Vec<NodeId>,
}
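
// Illustrative sketch, not part of the crate: interpreting a `Lookup` result.
// A project is considered "found" if it exists locally or at least one remote
// peer is known to have it. The helper name `lookup_found` is hypothetical.
#[allow(dead_code)]
fn lookup_found(lookup: &Lookup) -> bool {
    lookup.local.is_some() || !lookup.remote.is_empty()
}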

#[derive(thiserror::Error, Debug)]
pub enum LookupError {
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}

#[derive(Debug, Clone)]
/// Holds currently (or recently) connected peers.
pub struct Sessions(AddressBook<NodeId, Session>);

impl Sessions {
    pub fn new(rng: Rng) -> Self {
        Self(AddressBook::new(rng))
    }

    /// Iterator over fully connected peers.
    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.0
            .iter()
            .filter_map(move |(id, sess)| match &sess.state {
                session::State::Connected { .. } => Some((id, sess)),
                _ => None,
            })
    }

    /// Iterator over connected inbound peers.
    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.connected().filter(|(_, s)| s.link.is_inbound())
    }

    /// Iterator over outbound peers.
    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.connected().filter(|(_, s)| s.link.is_outbound())
    }

    /// Iterator over mutable fully connected peers.
    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
    }

    /// Iterator over disconnected peers.
    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
    }

    /// Return whether this node has a fully established session.
    pub fn is_connected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
    }

    /// Return whether this node can be connected to. Nodes we have no session
    /// for are considered disconnected.
    pub fn is_disconnected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
    }
}
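
// Illustrative sketch, not part of the crate: counting established sessions
// by link direction using the iterators above. The helper name
// `connection_counts` is hypothetical.
#[allow(dead_code)]
fn connection_counts(sessions: &Sessions) -> (usize, usize) {
    // `inbound()` and `outbound()` are disjoint subsets of `connected()`.
    (sessions.inbound().count(), sessions.outbound().count())
}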

impl Deref for Sessions {
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}