// radicle_protocol/service.rs

1#![allow(clippy::too_many_arguments)]
2#![allow(clippy::collapsible_match)]
3#![allow(clippy::collapsible_if)]
4#![warn(clippy::unwrap_used)]
5pub mod command;
6pub use command::{Command, QueryState};
7
8pub mod filter;
9pub mod gossip;
10pub mod io;
11pub mod limiter;
12pub mod message;
13pub mod session;
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::net::IpAddr;
18use std::ops::{Deref, DerefMut};
19use std::sync::Arc;
20use std::{fmt, net, time};
21
22use crossbeam_channel as chan;
23use fastrand::Rng;
24use localtime::{LocalDuration, LocalTime};
25use log::*;
26use nonempty::NonEmpty;
27
28use radicle::identity::Doc;
29use radicle::node;
30use radicle::node::address;
31use radicle::node::address::Store as _;
32use radicle::node::address::{AddressBook, AddressType, KnownAddress};
33use radicle::node::config::{PeerConfig, RateLimit};
34use radicle::node::device::Device;
35use radicle::node::refs::Store as _;
36use radicle::node::routing::Store as _;
37use radicle::node::seed;
38use radicle::node::seed::Store as _;
39use radicle::node::{Penalty, Severity};
40use radicle::storage::refs::SIGREFS_BRANCH;
41use radicle::storage::RepositoryError;
42use radicle_fetch::policy::SeedingPolicy;
43
44use crate::fetcher;
45use crate::fetcher::service::FetcherService;
46use crate::fetcher::FetcherState;
47use crate::fetcher::RefsToFetch;
48use crate::service::gossip::Store as _;
49use crate::service::message::{
50    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
51};
52use crate::service::policy::{store::Write, Scope};
53use radicle::identity::RepoId;
54use radicle::node::events::Emitter;
55use radicle::node::routing;
56use radicle::node::routing::InsertResult;
57use radicle::node::{Address, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt};
58use radicle::prelude::*;
59use radicle::storage;
60use radicle::storage::{refs::RefsAt, Namespaces, ReadStorage};
61// use radicle::worker::fetch;
62// use crate::worker::FetchError;
63use radicle::crypto;
64use radicle::node::Link;
65use radicle::node::PROTOCOL_VERSION;
66
67use crate::bounded::BoundedVec;
68use crate::service::filter::Filter;
69pub use crate::service::message::{Message, ZeroBytes};
70pub use crate::service::session::{QueuedFetch, Session};
71use crate::worker::FetchError;
72use radicle::node::events::{Event, Events};
73use radicle::node::{Config, NodeId};
74
75use radicle::node::policy::config as policy;
76
77use self::io::Outbox;
78use self::limiter::RateLimiter;
79use self::message::InventoryAnnouncement;
80use self::policy::NamespacesError;
81
/// How often to run the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often to run the "gossip" task.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often to run the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often to run the "sync" task.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often to run the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// Duration to wait on an unresponsive peer before dropping its connection.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// How much time should pass after a peer was last active for a *ping* to be sent.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency values to keep for a session.
pub const MAX_LATENCIES: usize = 16;
/// Maximum time difference between the local time, and an announcement timestamp.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum attempts to connect to a peer before we give up.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time should we request gossip messages when connecting to a peer,
/// when we come online for the first time.
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
/// When subscribing, what margin of error do we give ourselves. A higher delta means we ask for
/// messages further back than strictly necessary, to account for missed messages.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum amount of time to wait before reconnecting to a peer.
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Connection retry delta used for ephemeral peers that failed to connect previously.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// How long to wait for a fetch to stall before aborting, default is 3s.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3);
/// Target number of peers to maintain connections to.
pub const TARGET_OUTBOUND_PEERS: usize = 8;

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
/// Maximum inventory limit imposed by message size limits.
pub use message::INVENTORY_LIMIT;
/// Maximum number of project git references imposed by message size limits.
pub use message::REF_REMOTE_LIMIT;
125
/// Metrics we track.
///
/// Serialized in `camelCase` for external consumers. A fresh snapshot is
/// cloned into the service on every [`Service::tick`].
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Metrics for each peer.
    pub peers: HashMap<NodeId, PeerMetrics>,
    /// Tasks queued in worker queue.
    pub worker_queue_size: usize,
    /// Current open channel count.
    pub open_channels: usize,
}
137
138impl Metrics {
139    /// Get metrics for the given peer.
140    pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
141        self.peers.entry(nid).or_default()
142    }
143}
144
/// Per-peer metrics we track.
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerMetrics {
    /// Git payload bytes received from this peer.
    pub received_git_bytes: usize,
    /// Fetch requests received from this peer.
    pub received_fetch_requests: usize,
    /// Total bytes received from this peer.
    pub received_bytes: usize,
    /// Gossip messages received from this peer.
    pub received_gossip_messages: usize,
    /// Total bytes sent to this peer.
    pub sent_bytes: usize,
    /// Fetch requests sent to this peer.
    pub sent_fetch_requests: usize,
    /// Git payload bytes sent to this peer.
    pub sent_git_bytes: usize,
    /// Gossip messages sent to this peer.
    pub sent_gossip_messages: usize,
    /// Streams opened with this peer.
    pub streams_opened: usize,
    /// Inbound connection attempts from this peer.
    pub inbound_connection_attempts: usize,
    /// Outbound connection attempts to this peer.
    pub outbound_connection_attempts: usize,
    /// Number of times this peer disconnected.
    pub disconnects: usize,
}
162
/// Result of syncing our routing table with a node's inventory.
///
/// Each vector holds the repository ids affected by the sync.
#[derive(Default)]
struct SyncedRouting {
    /// Repo entries added.
    added: Vec<RepoId>,
    /// Repo entries removed.
    removed: Vec<RepoId>,
    /// Repo entries updated (time).
    updated: Vec<RepoId>,
}
173
174impl SyncedRouting {
175    fn is_empty(&self) -> bool {
176        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
177    }
178}
179
/// A peer we can connect to.
#[derive(Debug, Clone)]
struct Peer {
    /// The peer's node id.
    nid: NodeId,
    /// Known network addresses for this peer.
    addresses: Vec<KnownAddress>,
    /// Accumulated penalty for this peer.
    penalty: Penalty,
}
187
/// General service error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Low-level git error.
    #[error(transparent)]
    Git(#[from] radicle::git::raw::Error),
    /// Storage error.
    #[error(transparent)]
    Storage(#[from] storage::Error),
    /// Gossip store error.
    #[error(transparent)]
    Gossip(#[from] gossip::Error),
    /// Signed refs error.
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
    /// Routing table error.
    #[error(transparent)]
    Routing(#[from] routing::Error),
    /// Address book error.
    #[error(transparent)]
    Address(#[from] address::Error),
    /// Node database error.
    #[error(transparent)]
    Database(#[from] node::db::Error),
    /// Seed store error.
    #[error(transparent)]
    Seeds(#[from] seed::Error),
    /// Policy store error.
    #[error(transparent)]
    Policy(#[from] policy::Error),
    /// Repository error.
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
    /// Namespaces error; boxed to keep the size of `Error` small.
    #[error("namespaces error: {0}")]
    Namespaces(Box<NamespacesError>),
}
214
215impl From<NamespacesError> for Error {
216    fn from(e: NamespacesError) -> Self {
217        Self::Namespaces(Box::new(e))
218    }
219}
220
/// Errors that can occur when attempting an outbound peer connection.
#[derive(thiserror::Error, Debug)]
pub enum ConnectError {
    #[error("attempted connection to peer {nid} which already has a session")]
    SessionExists { nid: NodeId },
    #[error("attempted connection to self")]
    SelfConnection,
    #[error("outbound connection limit reached when attempting {nid} ({addr})")]
    LimitReached { nid: NodeId, addr: Address },
    #[error(
        "attempted connection to {nid}, via {addr} but addresses of this kind are not supported"
    )]
    UnsupportedAddress { nid: NodeId, addr: Address },
    #[error("attempted connection with blocked peer {nid}")]
    Blocked { nid: NodeId },
}
236
/// A store for all node data.
///
/// Blanket trait over the address, gossip, routing, seed and refs stores;
/// implemented for [`radicle::node::Database`] below.
pub trait Store:
    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
{
}

impl Store for radicle::node::Database {}
244
/// Fetch state for an ongoing fetch.
#[derive(Debug)]
pub struct FetchState {
    /// Node we're fetching from.
    pub from: NodeId,
    /// What refs we're fetching.
    pub refs_at: Vec<RefsAt>,
    /// Channels waiting for fetch results; each sender receives a
    /// [`FetchResult`] when the fetch concludes.
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}
255
/// Holds all node stores.
///
/// Newtype over a single database `D`; typed views into it are exposed via
/// the accessor methods on [`Stores`].
#[derive(Debug)]
pub struct Stores<D>(D);
259
// N.b. every accessor below is a view over the same underlying database.
impl<D> Stores<D>
where
    D: Store,
{
    /// Get the database as a routing store.
    pub fn routing(&self) -> &impl routing::Store {
        &self.0
    }

    /// Get the database as a routing store, mutably.
    pub fn routing_mut(&mut self) -> &mut impl routing::Store {
        &mut self.0
    }

    /// Get the database as an address store.
    pub fn addresses(&self) -> &impl address::Store {
        &self.0
    }

    /// Get the database as an address store, mutably.
    pub fn addresses_mut(&mut self) -> &mut impl address::Store {
        &mut self.0
    }

    /// Get the database as a gossip store.
    pub fn gossip(&self) -> &impl gossip::Store {
        &self.0
    }

    /// Get the database as a gossip store, mutably.
    pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
        &mut self.0
    }

    /// Get the database as a seed store.
    pub fn seeds(&self) -> &impl seed::Store {
        &self.0
    }

    /// Get the database as a seed store, mutably.
    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
        &mut self.0
    }

    /// Get the database as a refs db.
    pub fn refs(&self) -> &impl node::refs::Store {
        &self.0
    }

    /// Get the database as a refs db, mutably.
    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
        &mut self.0
    }
}
314
impl<D> AsMut<D> for Stores<D> {
    /// Get a mutable reference to the underlying database.
    fn as_mut(&mut self) -> &mut D {
        &mut self.0
    }
}

impl<D> From<D> for Stores<D> {
    /// Wrap a database in a [`Stores`] handle.
    fn from(db: D) -> Self {
        Self(db)
    }
}
326
/// The node service.
#[derive(Debug)]
pub struct Service<D, S, G> {
    /// Service configuration.
    config: Config,
    /// Our cryptographic signer and key.
    signer: Device<G>,
    /// Project storage.
    storage: S,
    /// Node database.
    db: Stores<D>,
    /// Policy configuration.
    policies: policy::Config<Write>,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
    /// Clock. Tells the time.
    clock: LocalTime,
    /// Who relayed what announcement to us. We keep track of this to ensure that
    /// we don't relay messages to nodes that already know about these messages.
    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
    /// I/O outbox.
    outbox: Outbox,
    /// Cached local node announcement.
    node: NodeAnnouncement,
    /// Cached local inventory announcement.
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
    /// Fetch service; bounded by the configured fetch concurrency limit.
    fetcher: FetcherService<command::Responder<FetchResult>>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
    filter: Filter,
    /// Last time the service was idle.
    last_idle: LocalTime,
    /// Last time the gossip messages were relayed.
    last_gossip: LocalTime,
    /// Last time the service synced.
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
    last_prune: LocalTime,
    /// Last time the announcement task was run.
    last_announce: LocalTime,
    /// Timestamp of last local inventory announced.
    last_inventory: LocalTime,
    /// Last timestamp used for announcements.
    last_timestamp: Timestamp,
    /// Time when the service was initialized, or `None` if it wasn't initialized.
    started_at: Option<LocalTime>,
    /// Time when the service was last online, or `None` if this is the first time.
    last_online_at: Option<LocalTime>,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
    /// Local listening addresses.
    listening: Vec<net::SocketAddr>,
    /// Latest metrics for all nodes connected to since the last start.
    metrics: Metrics,
}
385
impl<D, S, G> Service<D, S, G> {
    /// Get the local node id.
    pub fn node_id(&self) -> NodeId {
        *self.signer.public_key()
    }

    /// Get the local service time.
    pub fn local_time(&self) -> LocalTime {
        self.clock
    }

    /// Get a clone of the event emitter, used to publish events to subscribers.
    pub fn emitter(&self) -> Emitter<Event> {
        self.emitter.clone()
    }
}
401
402impl<D, S, G> Service<D, S, G>
403where
404    D: Store,
405    S: ReadStorage + 'static,
406    G: crypto::signature::Signer<crypto::Signature>,
407{
408    pub fn new(
409        config: Config,
410        db: Stores<D>,
411        storage: S,
412        policies: policy::Config<Write>,
413        signer: Device<G>,
414        rng: Rng,
415        node: NodeAnnouncement,
416        emitter: Emitter<Event>,
417    ) -> Self {
418        let sessions = Sessions::new(rng.clone());
419        let limiter = RateLimiter::new(config.peers());
420        let last_timestamp = node.timestamp;
421        let clock = LocalTime::default(); // Updated on initialize.
422        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.
423        let fetcher = {
424            let config = fetcher::Config::new()
425                .with_max_concurrency(
426                    std::num::NonZeroUsize::new(config.limits.fetch_concurrency.into())
427                        .expect("fetch concurrency was zero, must be at least 1"),
428                )
429                .with_max_capacity(fetcher::MaxQueueSize::default());
430            FetcherService::new(config)
431        };
432        Self {
433            config,
434            storage,
435            policies,
436            signer,
437            rng,
438            inventory,
439            node,
440            clock,
441            db,
442            outbox: Outbox::default(),
443            limiter,
444            sessions,
445            fetcher,
446            filter: Filter::empty(),
447            relayed_by: HashMap::default(),
448            last_idle: LocalTime::default(),
449            last_gossip: LocalTime::default(),
450            last_sync: LocalTime::default(),
451            last_prune: LocalTime::default(),
452            last_timestamp,
453            last_announce: LocalTime::default(),
454            last_inventory: LocalTime::default(),
455            started_at: None,     // Updated on initialize.
456            last_online_at: None, // Updated on initialize.
457            emitter,
458            listening: vec![],
459            metrics: Metrics::default(),
460        }
461    }
462
    /// Whether the service was started (initialized) and if so, at what time.
    pub fn started(&self) -> Option<LocalTime> {
        self.started_at
    }

    /// Return the next i/o action to execute, or `None` if the outbox is empty.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<io::Io> {
        self.outbox.next()
    }
473
474    /// Seed a repository.
475    /// Returns whether or not the repo policy was updated.
476    pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
477        let updated = self.policies.seed(id, scope)?;
478        self.filter.insert(id);
479
480        Ok(updated)
481    }
482
483    /// Unseed a repository.
484    /// Returns whether or not the repo policy was updated.
485    /// Note that when unseeding, we don't announce anything to the network. This is because by
486    /// simply not announcing it anymore, it will eventually be pruned by nodes.
487    pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
488        let updated = self.policies.unseed(id)?;
489
490        if updated {
491            // Nb. This is potentially slow if we have lots of repos. We should probably
492            // only re-compute the filter when we've unseeded a certain amount of repos
493            // and the filter is really out of date.
494            self.filter = Filter::allowed_by(self.policies.seed_policies()?);
495            // Update and announce new inventory.
496            if let Err(e) = self.remove_inventory(id) {
497                warn!(target: "service", "Failed to update inventory after unseed: {e}");
498            }
499        }
500        Ok(updated)
501    }
502
    /// Find the closest `n` peers by proximity in seeding graphs.
    /// Returns a sorted list from the closest peer to the furthest.
    /// Peers with more seedings in common score higher.
    ///
    /// Not yet implemented: calling this panics via `todo!()`.
    #[allow(unused)]
    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
        todo!()
    }
510
    /// Get the database.
    pub fn database(&self) -> &Stores<D> {
        &self.db
    }

    /// Get the mutable database.
    pub fn database_mut(&mut self) -> &mut Stores<D> {
        &mut self.db
    }

    /// Get the storage instance.
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Get the mutable storage instance.
    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }

    /// Get the node policies.
    pub fn policies(&self) -> &policy::Config<Write> {
        &self.policies
    }

    /// Get the local signer.
    pub fn signer(&self) -> &Device<G> {
        &self.signer
    }

    /// Subscriber to inner `Emitter` events.
    pub fn events(&mut self) -> Events {
        Events::from(self.emitter.subscribe())
    }

    /// Get the current state of the fetcher service.
    pub fn fetcher(&self) -> &FetcherState {
        self.fetcher.state()
    }

    /// Get I/O outbox.
    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
    }

    /// Get configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }
559
    /// Lookup a repository, both locally and in the routing table.
    ///
    /// The local node is filtered out of the remote seed list.
    pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
        let this = self.nid();
        let local = self.storage.get(rid)?;
        let remote = self
            .db
            .routing()
            .get(&rid)?
            .iter()
            .filter(|nid| nid != &this)
            .cloned()
            .collect();

        Ok(Lookup { local, remote })
    }
575
    /// Initialize service with current time. Call this once.
    ///
    /// Restores the last-online time from the gossip store, reconciles local
    /// storage with our seeding policies and inventory, connects to configured
    /// peers, and schedules the periodic tasks.
    ///
    /// # Panics
    ///
    /// Panics if `time` is the default (zero) local time.
    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());
        // A default clock value means the caller didn't pass the real time.
        assert_ne!(time, LocalTime::default());

        let nid = self.node_id();

        self.clock = time;
        self.started_at = Some(time);
        // Best-effort: a gossip-store read failure only costs us the backlog marker.
        self.last_online_at = match self.db.gossip().last() {
            Ok(Some(last)) => Some(last.to_local_time()),
            Ok(None) => None,
            Err(e) => {
                warn!(target: "service", "Failed to get the latest gossip message from db: {e}");
                None
            }
        };

        // Populate refs database. This is only useful as part of the upgrade process for nodes
        // that have been online since before the refs database was created.
        match self.db.refs().count() {
            Ok(0) => {
                info!(target: "service", "Empty refs database, populating from storage..");
                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
                    warn!(target: "service", "Failed to populate refs database: {e}");
                }
            }
            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
            Err(e) => {
                warn!(target: "service", "Failed to retrieve count of refs from database: {e}")
            }
        }

        // Sync statuses previously recorded for our own node, keyed by repo.
        let announced = self
            .db
            .seeds()
            .seeded_by(&nid)?
            .collect::<Result<HashMap<_, _>, _>>()?;
        let mut inventory = BTreeSet::new();
        let mut private = BTreeSet::new();

        for repo in self.storage.repositories()? {
            let rid = repo.rid;

            // If we're not seeding this repo, just skip it.
            if !self.policies.is_seeding(&rid)? {
                debug!(target: "service", "Local repository {rid} is not seeded");
                continue;
            }
            // Add public repositories to inventory.
            if repo.doc.is_public() {
                inventory.insert(rid);
            } else {
                private.insert(rid);
            }
            // If we have no owned refs for this repo, then there's nothing to announce.
            let Some(updated_at) = repo.synced_at else {
                continue;
            };
            // Skip this repo if the sync status matches what we have in storage.
            if let Some(announced) = announced.get(&rid) {
                if updated_at.oid == announced.oid {
                    continue;
                }
            }
            // Make sure our local node's sync status is up to date with storage.
            if self.db.seeds_mut().synced(
                &rid,
                &nid,
                updated_at.oid,
                updated_at.timestamp.into(),
            )? {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
            // If we got here, it likely means a repo was updated while the node was stopped.
            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
            // the historical gossip messages when a node connects and subscribes to this repo.
            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
                self.db.gossip_mut().announced(&nid, &ann)?;
            }
        }

        // Ensure that our inventory is recorded in our routing table, and we are seeding
        // all of it. It can happen that inventory is not properly seeded if for eg. the
        // user creates a new repository while the node is stopped.
        self.db
            .routing_mut()
            .add_inventory(inventory.iter(), nid, time.into())?;
        self.inventory = gossip::inventory(self.timestamp(), inventory);

        // Ensure that private repositories are not in our inventory. It's possible that
        // a repository was public and then it was made private.
        self.db
            .routing_mut()
            .remove_inventories(private.iter(), &nid)?;

        // Setup subscription filter for seeded repos.
        self.filter = Filter::allowed_by(self.policies.seed_policies()?);
        // Connect to configured peers.
        let addrs = self.config.connect.clone();
        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
            if let Err(e) = self.connect(id, addr) {
                debug!(target: "service", "Service::initialization connection error: {e}");
            }
        }
        // Try to establish some connections.
        self.maintain_connections();
        // Start periodic tasks.
        self.outbox.wakeup(IDLE_INTERVAL);
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }
690
691    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
692        trace!(
693            target: "service",
694            "Tick +{}",
695            now - self.started_at.expect("Service::tick: service must be initialized")
696        );
697        if now >= self.clock {
698            self.clock = now;
699        } else {
700            // Nb. In tests, we often move the clock forwards in time to test different behaviors,
701            // so this warning isn't applicable there.
702            #[cfg(not(test))]
703            warn!(
704                target: "service",
705                "System clock is not monotonic: {now} is not greater or equal to {}", self.clock
706            );
707        }
708        self.metrics = metrics.clone();
709    }
710
    /// Run every periodic task whose interval has elapsed, based on the current clock,
    /// and re-arm the corresponding wakeup timers.
    ///
    /// # Panics
    ///
    /// Panics if called before [`Service::initialize`].
    pub fn wake(&mut self) {
        let now = self.clock;

        trace!(
            target: "service",
            "Wake +{}",
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

        if now - self.last_idle >= IDLE_INTERVAL {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_gossip >= GOSSIP_INTERVAL {
            trace!(target: "service", "Running 'gossip' task...");

            if let Err(e) = self.relay_announcements() {
                warn!(target: "service", "Failed to relay stored announcements: {e}");
            }
            self.outbox.wakeup(GOSSIP_INTERVAL);
            self.last_gossip = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_repositories() {
                warn!(target: "service", "Failed to fetch missing inventory: {e}");
            }
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

            self.announce_inventory();
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                warn!(target: "service", "Failed to prune routing entries: {err}");
            }
            // Drop gossip messages older than the configured maximum age.
            if let Err(err) = self
                .db
                .gossip_mut()
                .prune((now - LocalDuration::from(self.config.limits.gossip_max_age)).into())
            {
                warn!(target: "service", "Failed to prune gossip entries: {err}");
            }

            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

        // Always check whether there are persistent peers that need reconnecting.
        self.maintain_persistent();
    }
777
778    pub fn command(&mut self, cmd: Command) {
779        info!(target: "service", "Received command {cmd:?}");
780
781        match cmd {
782            Command::Connect(nid, addr, opts) => {
783                if opts.persistent {
784                    self.config.connect.insert((nid, addr.clone()).into());
785                }
786                if let Err(e) = self.connect(nid, addr) {
787                    match e {
788                        ConnectError::SessionExists { nid } => {
789                            self.emitter.emit(Event::PeerConnected { nid });
790                        }
791                        e => {
792                            // N.b. using the fact that the call to connect waits for an event
793                            self.emitter.emit(Event::PeerDisconnected {
794                                nid,
795                                reason: e.to_string(),
796                            });
797                        }
798                    }
799                }
800            }
801            Command::Disconnect(nid) => {
802                self.outbox.disconnect(nid, DisconnectReason::Command);
803            }
804            Command::Config(resp) => {
805                resp.ok(self.config.clone()).ok();
806            }
807            Command::ListenAddrs(resp) => {
808                resp.ok(self.listening.clone()).ok();
809            }
810            Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
811                Ok(seeds) => {
812                    let (connected, disconnected) = seeds.partition();
813                    debug!(
814                        target: "service",
815                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
816                        connected.len(), disconnected.len(),  rid
817                    );
818                    resp.ok(seeds).ok();
819                }
820                Err(e) => {
821                    warn!(target: "service", "Failed to get seeds for {rid}: {e}");
822                    resp.err(e).ok();
823                }
824            },
825            Command::Fetch(rid, seed, timeout, resp) => {
826                self.fetch(rid, seed, vec![], timeout, Some(resp));
827            }
828            Command::Seed(rid, scope, resp) => {
829                // Update our seeding policy.
830                let seeded = self
831                    .seed(&rid, scope)
832                    .expect("Service::command: error seeding repository");
833                resp.ok(seeded).ok();
834
835                // Let all our peers know that we're interested in this repo from now on.
836                self.outbox.broadcast(
837                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
838                    self.sessions.connected().map(|(_, s)| s),
839                );
840            }
841            Command::Unseed(id, resp) => {
842                let updated = self
843                    .unseed(&id)
844                    .expect("Service::command: error unseeding repository");
845                resp.ok(updated).ok();
846            }
847            Command::Follow(id, alias, resp) => {
848                let seeded = self
849                    .policies
850                    .follow(&id, alias.as_ref())
851                    .expect("Service::command: error following node");
852                resp.ok(seeded).ok();
853            }
854            Command::Unfollow(id, resp) => {
855                let updated = self
856                    .policies
857                    .unfollow(&id)
858                    .expect("Service::command: error unfollowing node");
859                resp.ok(updated).ok();
860            }
861            Command::Block(id, resp) => {
862                let updated = self
863                    .policies
864                    .set_follow_policy(&id, policy::Policy::Block)
865                    .expect("Service::command: error blocking node");
866                if updated {
867                    self.outbox.disconnect(id, DisconnectReason::Policy);
868                }
869                resp.send(updated).ok();
870            }
871            Command::AnnounceRefs(id, namespaces, resp) => {
872                let doc = match self.storage.get(id) {
873                    Ok(Some(doc)) => doc,
874                    Ok(None) => {
875                        warn!(target: "service", "Failed to announce refs: repository {id} not found");
876                        resp.err(command::Error::custom(format!("repository {id} not found")))
877                            .ok();
878                        return;
879                    }
880                    Err(e) => {
881                        warn!(target: "service", "Failed to announce refs: doc error: {e}");
882                        resp.err(e).ok();
883                        return;
884                    }
885                };
886
887                match self.announce_own_refs(id, doc, namespaces) {
888                    Ok((refs, _timestamp)) => {
889                        // TODO(finto): currently the command caller only
890                        // expects one `RefsAt`, this should be fixed in the
891                        // trait, eventually.
892                        if let Some(refs) = refs.first() {
893                            resp.ok(*refs).ok();
894                        } else {
895                            resp.err(command::Error::custom(format!(
896                                "no refs were announced for {id}"
897                            )))
898                            .ok();
899                        }
900                    }
901                    Err(err) => {
902                        warn!(target: "service", "Failed to announce refs: {err}");
903                        resp.err(err).ok();
904                    }
905                }
906            }
907            Command::AnnounceInventory => {
908                self.announce_inventory();
909            }
910            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
911                Ok(updated) => {
912                    resp.ok(updated).ok();
913                }
914                Err(e) => {
915                    warn!(target: "service", "Failed to add {rid} to inventory: {e}");
916                    resp.err(e).ok();
917                }
918            },
919            Command::QueryState(query, sender) => {
920                sender.send(query(self)).ok();
921            }
922        }
923    }
924
925    /// Initiate an outgoing fetch for some repository, based on another node's announcement.
926    /// Returns `true` if the fetch was initiated and `false` if it was skipped.
927    fn fetch_refs_at(
928        &mut self,
929        rid: RepoId,
930        from: NodeId,
931        refs: NonEmpty<RefsAt>,
932        scope: Scope,
933        timeout: time::Duration,
934    ) -> bool {
935        match self.refs_status_of(rid, refs, &scope) {
936            Ok(status) => {
937                if status.want.is_empty() {
938                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
939                } else {
940                    self.fetch(rid, from, status.want, timeout, None);
941                    return true;
942                }
943            }
944            Err(e) => {
945                warn!(target: "service", "Failed to get the refs status of {rid}: {e}");
946            }
947        }
948        // We didn't try to fetch anything.
949        false
950    }
951
    /// Initiate an outgoing fetch of `rid` from the `from` peer.
    ///
    /// `refs_at` narrows the fetch to specific refs; an empty vector requests a
    /// full fetch (see `dequeue_fetches` and `Command::Fetch`, which pass `vec![]`).
    /// If `channel` is given, its responder is notified of the result, including
    /// immediate failures (peer not connected, fetch queue at capacity).
    fn fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<command::Responder<FetchResult>>,
    ) {
        // A fetch can only be initiated over an established session with the peer.
        let session = {
            let reason = format!("peer {from} is not connected; cannot initiate fetch");
            let Some(session) = self.sessions.get_mut(&from) else {
                // No session at all for this peer; report the failure if a
                // responder was supplied.
                if let Some(c) = channel {
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            };
            if !session.is_connected() {
                // A session exists but it's not in the connected state.
                if let Some(c) = channel {
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            }
            session
        };

        // Hand the fetch over to the fetcher service, which decides whether to
        // start it now, queue it, or reject it.
        let cmd = fetcher::state::command::Fetch {
            from,
            rid,
            refs: refs_at.into(),
            timeout,
        };
        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);

        // If the fetcher rejected the request, the responder is handed back to
        // us so we can deliver the failure.
        if let Some(c) = rejected {
            c.ok(FetchResult::Failed {
                reason: "fetch queue at capacity".to_string(),
            })
            .ok();
        }

        match event {
            // The fetch was accepted: instruct the I/O layer to start it on
            // this peer's session.
            fetcher::state::event::Fetch::Started {
                rid,
                from,
                refs: refs_at,
                timeout,
            } => {
                debug!(target: "service", "Starting fetch for {rid} from {from}");
                self.outbox.fetch(
                    session,
                    rid,
                    refs_at.into(),
                    timeout,
                    self.config.limits.fetch_pack_receive,
                );
            }
            // The remaining outcomes require no I/O here; they are only logged.
            fetcher::state::event::Fetch::Queued { rid, from } => {
                debug!(target: "service", "Queued fetch for {rid} from {from}");
            }
            fetcher::state::event::Fetch::AlreadyFetching { rid, from } => {
                debug!(target: "service", "Already fetching {rid} from {from}");
            }
            fetcher::state::event::Fetch::QueueAtCapacity { rid, from, .. } => {
                debug!(target: "service", "Queue at capacity for {from}, rejected {rid}");
            }
        }
    }
1019
    /// Handle the completion of a fetch of `rid` from the `from` peer.
    ///
    /// Marks the fetch as completed in the fetcher state, notifies any
    /// subscribed responders, and on success updates routing state, emits
    /// events, and announces inventory/refs as appropriate.
    pub fn fetched(
        &mut self,
        rid: RepoId,
        from: NodeId,
        result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
    ) {
        // Complete the fetch in the fetcher state machine, collecting the
        // responders that subscribed to its result.
        let cmd = fetcher::state::command::Fetched { from, rid };
        let fetcher::service::FetchCompleted { event, subscribers } = self.fetcher.fetched(cmd);

        // Dequeue next fetches
        self.dequeue_fetches();

        match event {
            fetcher::state::event::Fetched::NotFound { from, rid } => {
                // The fetcher had no record of this fetch; nothing further to do.
                debug!(target: "service", "Unexpected fetch result for {rid} from {from}");
            }
            fetcher::state::event::Fetched::Completed { from, rid, refs: _ } => {
                // Notify responders
                let fetch_result = match &result {
                    Ok(success) => FetchResult::Success {
                        updated: success.updated.clone(),
                        namespaces: success.namespaces.clone(),
                        clone: success.clone,
                    },
                    Err(e) => FetchResult::Failed {
                        reason: e.to_string(),
                    },
                };
                for responder in subscribers {
                    responder.ok(fetch_result.clone()).ok();
                }
                match result {
                    Ok(crate::worker::fetch::FetchResult {
                        updated,
                        canonical,
                        namespaces,
                        clone,
                        doc,
                    }) => {
                        info!(target: "service", "Fetched {rid} from {from} successfully");
                        // Update our routing table in case this fetch was user-initiated and doesn't
                        // come from an announcement.
                        self.seed_discovered(rid, from, self.clock.into());

                        for update in &updated {
                            if update.is_skipped() {
                                trace!(target: "service", "Ref skipped: {update} for {rid}");
                            } else {
                                debug!(target: "service", "Ref updated: {update} for {rid}");
                            }
                        }
                        // Emit events for the fetched refs, and for any
                        // canonical refs that changed as a result.
                        self.emitter.emit(Event::RefsFetched {
                            remote: from,
                            rid,
                            updated: updated.clone(),
                        });
                        self.emitter
                            .emit_all(canonical.into_iter().map(|(refname, target)| {
                                Event::CanonicalRefUpdated {
                                    rid,
                                    refname,
                                    target,
                                }
                            }));

                        // Announce our new inventory if this fetch was a full clone.
                        // Only update and announce inventory for public repositories.
                        if clone && doc.is_public() {
                            debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");

                            if let Err(e) = self.add_inventory(rid) {
                                warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
                            }
                        }

                        // It's possible for a fetch to succeed but nothing was updated.
                        if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
                            debug!(target: "service", "Nothing to announce, no refs were updated..");
                        } else {
                            // Finally, announce the refs. This is useful for nodes to know what we've synced,
                            // beyond just knowing that we have added an item to our inventory.
                            if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
                                warn!(target: "service", "Failed to announce new refs: {e}");
                            }
                        }
                    }
                    Err(err) => {
                        warn!(target: "service", "Fetch failed for {rid} from {from}: {err}");

                        // For now, we only disconnect the from in case of timeout. In the future,
                        // there may be other reasons to disconnect.
                        if err.is_timeout() {
                            self.outbox.disconnect(from, DisconnectReason::Fetch(err));
                        }
                    }
                }
            }
        }
    }
1119
1120    /// Attempt to dequeue fetches from all peers.
1121    /// At most one fetch is dequeued per peer. If the fetch cannot be processed,
1122    /// it is put back on the queue for that peer.
1123    ///
1124    /// Fetches are queued for two reasons:
1125    /// 1. The RID was already being fetched.
1126    /// 2. The session was already at fetch capacity.
1127    pub fn dequeue_fetches(&mut self) {
1128        let sessions = self
1129            .sessions
1130            .shuffled()
1131            .map(|(k, _)| *k)
1132            .collect::<Vec<_>>();
1133
1134        for nid in sessions {
1135            #[allow(clippy::unwrap_used)]
1136            let sess = self.sessions.get_mut(&nid).unwrap();
1137            if !sess.is_connected() {
1138                continue;
1139            }
1140
1141            let Some(fetcher::QueuedFetch {
1142                rid,
1143                refs: refs_at,
1144                timeout,
1145            }) = self.fetcher.dequeue(&nid)
1146            else {
1147                continue;
1148            };
1149
1150            // Check seeding policy
1151            let repo_entry = self
1152                .policies
1153                .seed_policy(&rid)
1154                .expect("error accessing repo seeding configuration");
1155
1156            let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1157                debug!(target: "service", "Repository {} no longer seeded, skipping", rid);
1158                continue;
1159            };
1160
1161            debug!(target: "service", "Dequeued fetch for {} from {}", rid, nid);
1162
1163            match refs_at {
1164                RefsToFetch::Refs(refs) => {
1165                    self.fetch_refs_at(rid, nid, refs, scope, timeout);
1166                }
1167                RefsToFetch::All => {
1168                    // Channel is `None` since they will already be
1169                    // registered with the fetcher service.
1170                    self.fetch(rid, nid, vec![], timeout, None);
1171                }
1172            }
1173        }
1174    }
1175
1176    /// Inbound connection attempt.
1177    pub fn accepted(&mut self, ip: IpAddr) -> bool {
1178        // Always accept localhost connections, even if we already reached
1179        // our inbound connection limit.
1180        if ip.is_loopback() || ip.is_unspecified() {
1181            return true;
1182        }
1183        // Check for inbound connection limit.
1184        if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
1185            return false;
1186        }
1187        match self.db.addresses().is_ip_banned(ip) {
1188            Ok(banned) => {
1189                if banned {
1190                    debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
1191                    return false;
1192                }
1193            }
1194            Err(e) => warn!(target: "service", "Failed to query ban status for {ip}: {e}"),
1195        }
1196        let host: HostName = ip.into();
1197        let tokens = self.config.limits.rate.inbound;
1198
1199        if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
1200            trace!(target: "service", "Rate limiting inbound connection from {host}..");
1201            return false;
1202        }
1203        true
1204    }
1205
1206    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
1207        debug!(target: "service", "Attempted connection to {nid} ({addr})");
1208
1209        if let Some(sess) = self.sessions.get_mut(&nid) {
1210            sess.to_attempted();
1211        } else {
1212            #[cfg(debug_assertions)]
1213            panic!("Service::attempted: unknown session {nid}@{addr}");
1214        }
1215    }
1216
    /// Called when the node has bound a listening socket.
    ///
    /// The address is recorded so it can be reported back via
    /// `Command::ListenAddrs`.
    pub fn listening(&mut self, local_addr: net::SocketAddr) {
        info!(target: "node", "Listening on {local_addr}..");

        self.listening.push(local_addr);
    }
1222
    /// Called when a connection with `remote` has been established.
    ///
    /// Blocked peers are disconnected immediately. Otherwise, the session is
    /// transitioned to the connected state (creating it for new inbound
    /// connections) and our initial messages are sent to the peer.
    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        // Enforce the block policy before doing anything else.
        if let Ok(true) = self.policies.is_blocked(&remote) {
            self.emitter.emit(Event::PeerDisconnected {
                nid: remote,
                reason: format!("{remote} is blocked"),
            });
            info!(target: "service", "Disconnecting blocked inbound peer {remote}");
            self.outbox.disconnect(remote, DisconnectReason::Policy);
            return;
        }

        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

        // Messages we send to every newly connected peer.
        let msgs = self.initial(link);

        if link.is_outbound() {
            // Outbound connections always have a pre-existing session (created
            // when the connection was attempted).
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);
            }
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(mut e) => {
                    // In this scenario, it's possible that our peer is persistent, and
                    // disconnected. We get an inbound connection before we attempt a re-connection,
                    // and therefore we treat it as a regular inbound connection.
                    //
                    // It's also possible that a disconnection hasn't gone through yet and our
                    // peer is still in connected state here, while a new inbound connection from
                    // that same peer is made. This results in a new connection from a peer that is
                    // already connected from the perspective of the service. This appears to be
                    // a bug in the underlying networking library.
                    let peer = e.get_mut();
                    debug!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({peer})"
                    );
                    peer.link = link;
                    peer.to_connected(self.clock);
                    self.outbox.write_all(peer, msgs);
                }
                Entry::Vacant(e) => {
                    // Record the peer's public IP in the address book, unless
                    // it's a local address.
                    if let HostName::Ip(ip) = addr.host {
                        if !address::is_local(&ip) {
                            if let Err(e) =
                                self.db
                                    .addresses_mut()
                                    .record_ip(&remote, ip, self.clock.into())
                            {
                                log::debug!(target: "service", "Failed to record IP address for {remote}: {e}");
                            }
                        }
                    }
                    // Brand new inbound peer: create its session.
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                    ));
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
    }
1289
    /// Called when the connection with `remote` over `link` was dropped.
    ///
    /// Cancels in-flight fetches from the peer (notifying their responders),
    /// then either schedules a re-connection (persistent peers) or removes the
    /// session and records the disconnect in the address store.
    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
        let since = self.local_time();
        let Some(session) = self.sessions.get_mut(&remote) else {
            // Since we sometimes disconnect the service eagerly, it's not unusual to get a second
            // disconnection event once the transport is dropped.
            trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
            return;
        };
        // In cases of connection conflicts, there may be disconnections of one of the two
        // connections. In that case we don't want the service to remove the session.
        if session.link != link {
            return;
        }

        info!(target: "service", "Disconnected from {remote} ({reason})");
        self.emitter.emit(Event::PeerDisconnected {
            nid: remote,
            reason: reason.to_string(),
        });

        // Capture connection details before the session is possibly removed below.
        let link = session.link;
        let addr = session.addr.clone();

        // Cancel any fetches that were in progress or queued for this peer.
        let cmd = fetcher::state::command::Cancel { from: remote };
        let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);

        match event {
            fetcher::state::event::Cancel::Unexpected { from } => {
                debug!(target: "service", "No fetches to cancel for {from}");
            }
            fetcher::state::event::Cancel::Canceled {
                from,
                active,
                queued,
            } => {
                debug!(target: "service", "Cancelled {} ongoing, {} queued for {from}", active.len(), queued.len());
            }
        }

        // Notify orphaned responders
        for (rid, responder) in orphaned {
            responder
                .ok(FetchResult::Failed {
                    reason: format!("failed fetch to {rid}, peer disconnected: {reason}"),
                })
                .ok();
        }

        // Attempt to re-connect to persistent peers.
        if self.config.is_persistent(&remote) {
            // Exponential back-off based on the number of attempts, clamped to
            // the configured bounds.
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

            // Nb. We always try to reconnect to persistent peers, even when the error appears
            // to not be transient.
            session.to_disconnected(since, since + delay);

            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

            self.outbox.wakeup(delay);
        } else {
            debug!(target: "service", "Dropping peer {remote}..");
            self.sessions.remove(&remote);

            // Classify the disconnect reason by severity, for the address store.
            let severity = match reason {
                DisconnectReason::Dial(_)
                | DisconnectReason::Fetch(_)
                | DisconnectReason::Connection(_) => {
                    if self.is_online() {
                        // If we're "online", there's something wrong with this
                        // peer connection specifically.
                        Severity::Medium
                    } else {
                        Severity::Low
                    }
                }
                DisconnectReason::Session(e) => e.severity(),
                DisconnectReason::Command
                | DisconnectReason::Conflict
                | DisconnectReason::Policy
                | DisconnectReason::SelfConnection => Severity::Low,
            };

            if let Err(e) = self
                .db
                .addresses_mut()
                .disconnected(&remote, &addr, severity)
            {
                debug!(target: "service", "Failed to update address store: {e}");
            }
            // Only re-attempt outbound connections, since we don't care if an inbound connection
            // is dropped.
            if link.is_outbound() {
                self.maintain_connections();
            }
        }
        self.dequeue_fetches();
    }
1388
1389    pub fn received_message(&mut self, remote: NodeId, message: Message) {
1390        if let Err(err) = self.handle_message(&remote, message) {
1391            // If there's an error, stop processing messages from this peer.
1392            // However, we still relay messages returned up to this point.
1393            self.outbox
1394                .disconnect(remote, DisconnectReason::Session(err));
1395
1396            // FIXME: The peer should be set in a state such that we don't
1397            // process further messages.
1398        }
1399    }
1400
1401    /// Handle an announcement message.
1402    ///
1403    /// Returns `true` if this announcement should be stored and relayed to connected peers,
1404    /// and `false` if it should not.
1405    pub fn handle_announcement(
1406        &mut self,
1407        relayer: &NodeId,
1408        relayer_addr: &Address,
1409        announcement: &Announcement,
1410    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
1411        if !announcement.verify() {
1412            return Err(session::Error::Misbehavior);
1413        }
1414        let Announcement {
1415            node: announcer,
1416            message,
1417            ..
1418        } = announcement;
1419
1420        // Ignore our own announcements, in case the relayer sent one by mistake.
1421        if announcer == self.nid() {
1422            return Ok(None);
1423        }
1424        let now = self.clock;
1425        let timestamp = message.timestamp();
1426
1427        // Don't allow messages from too far in the future.
1428        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
1429            return Err(session::Error::InvalidTimestamp(timestamp));
1430        }
1431
1432        // We don't process announcements from nodes we don't know, since the node announcement is
1433        // what provides DoS protection.
1434        //
1435        // Note that it's possible to *not* receive the node announcement, but receive the
1436        // subsequent announcements of a node in the case of historical gossip messages requested
1437        // from the `subscribe` message. This can happen if the cut-off time is after the node
1438        // announcement timestamp, but before the other announcements. In that case, we simply
1439        // ignore all announcements of that node until we get a node announcement.
1440        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
1441            match self.db.addresses().get(announcer) {
1442                Ok(node) => {
1443                    if node.is_none() {
1444                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
1445                        return Ok(None);
1446                    }
1447                }
1448                Err(e) => {
1449                    debug!(target: "service", "Failed to look up node in address book: {e}");
1450                    return Ok(None);
1451                }
1452            }
1453        }
1454
1455        // Discard announcement messages we've already seen, otherwise update our last seen time.
1456        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
1457            Ok(Some(id)) => {
1458                log::debug!(
1459                    target: "service",
1460                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
1461                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
1462                );
1463                // Keep track of who relayed the message for later.
1464                self.relayed_by.entry(id).or_default().push(*relayer);
1465
1466                // Decide whether or not to relay this message, if it's fresh.
1467                // To avoid spamming peers on startup with historical gossip messages,
1468                // don't relay messages that are too old. We make an exception for node announcements,
1469                // since they are cached, and will hence often carry old timestamps.
1470                let relay = message.is_node_announcement()
1471                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
1472                relay.then_some(id)
1473            }
1474            Ok(None) => {
1475                // FIXME: Still mark as relayed by this peer.
1476                // FIXME: Refs announcements should not be delayed, since they are only sent
1477                // to subscribers.
1478                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
1479                return Ok(None);
1480            }
1481            Err(e) => {
1482                debug!(target: "service", "Failed to update gossip entry from {announcer}: {e}");
1483                return Ok(None);
1484            }
1485        };
1486
1487        match message {
1488            // Process a peer inventory update announcement by (maybe) fetching.
1489            AnnouncementMessage::Inventory(message) => {
1490                self.emitter.emit(Event::InventoryAnnounced {
1491                    nid: *announcer,
1492                    inventory: message.inventory.to_vec(),
1493                    timestamp: message.timestamp,
1494                });
1495                match self.sync_routing(
1496                    message.inventory.iter().cloned(),
1497                    *announcer,
1498                    message.timestamp,
1499                ) {
1500                    Ok(synced) => {
1501                        if synced.is_empty() {
1502                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
1503                            return Ok(None);
1504                        }
1505                    }
1506                    Err(e) => {
1507                        debug!(target: "service", "Failed to process inventory from {announcer}: {e}");
1508                        return Ok(None);
1509                    }
1510                }
1511                let mut missing = Vec::new();
1512                let nid = *self.nid();
1513
1514                // Here we handle the special case where the inventory we received is that of
1515                // a connected peer, as opposed to being relayed to us.
1516                if let Some(sess) = self.sessions.get_mut(announcer) {
1517                    for id in message.inventory.as_slice() {
1518                        // If we are connected to the announcer of this inventory, update the peer's
1519                        // subscription filter to include all inventory items. This way, we'll
1520                        // relay messages relating to the peer's inventory.
1521                        if let Some(sub) = &mut sess.subscribe {
1522                            sub.filter.insert(id);
1523                        }
1524
1525                        // If we're seeding and connected to the announcer, and we don't have
1526                        // the inventory, fetch it from the announcer.
1527                        if self.policies.is_seeding(id).expect(
1528                            "Service::handle_announcement: error accessing seeding configuration",
1529                        ) {
1530                            // Only if we do not have the repository locally do we fetch here.
1531                            // If we do have it, only fetch after receiving a ref announcement.
1532                            match self.db.routing().entry(id, &nid) {
1533                                Ok(entry) => {
1534                                    if entry.is_none() {
1535                                        missing.push(*id);
1536                                    }
1537                                }
1538                                Err(e) => debug!(
1539                                    target: "service",
1540                                    "Error checking local inventory for {id}: {e}"
1541                                ),
1542                            }
1543                        }
1544                    }
1545                }
1546                // Since we have limited fetch capacity, it may be that we can't fetch an entire
1547                // inventory from a peer. Therefore we randomize the order of the RIDs to fetch
1548                // different RIDs from different peers in case multiple peers announce the same
1549                // RIDs.
1550                self.rng.shuffle(&mut missing);
1551
1552                for rid in missing {
1553                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
1554                    self.fetch(rid, *announcer, vec![], FETCH_TIMEOUT, None);
1555                }
1556                return Ok(relay);
1557            }
1558            AnnouncementMessage::Refs(message) => {
1559                self.emitter.emit(Event::RefsAnnounced {
1560                    nid: *announcer,
1561                    rid: message.rid,
1562                    refs: message.refs.to_vec(),
1563                    timestamp: message.timestamp,
1564                });
1565                // Empty announcements can be safely ignored.
1566                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
1567                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
1568                    return Ok(None);
1569                };
1570                // We update inventories when receiving ref announcements, as these could come
1571                // from a new repository being initialized.
1572                self.seed_discovered(message.rid, *announcer, message.timestamp);
1573
1574                // Update sync status of announcer for this repo.
1575                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
1576                    debug!(
1577                        target: "service",
1578                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
1579                        message.rid, refs.at, message.timestamp
1580                    );
1581                    match self.db.seeds_mut().synced(
1582                        &message.rid,
1583                        announcer,
1584                        refs.at,
1585                        message.timestamp,
1586                    ) {
1587                        Ok(updated) => {
1588                            if updated {
1589                                debug!(
1590                                    target: "service",
1591                                    "Updating sync status of {announcer} for {} to {}",
1592                                    message.rid, refs.at
1593                                );
1594                                self.emitter.emit(Event::RefsSynced {
1595                                    rid: message.rid,
1596                                    remote: *announcer,
1597                                    at: refs.at,
1598                                });
1599                            } else {
1600                                debug!(
1601                                    target: "service",
1602                                    "Sync status of {announcer} was not updated for {}",
1603                                    message.rid,
1604                                );
1605                            }
1606                        }
1607                        Err(e) => {
1608                            debug!(target: "service", "Failed to update sync status for {}: {e}", message.rid);
1609                        }
1610                    }
1611                }
1612                let repo_entry = self.policies.seed_policy(&message.rid).expect(
1613                    "Service::handle_announcement: error accessing repo seeding configuration",
1614                );
1615                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1616                    debug!(
1617                        target: "service",
1618                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
1619                        message.rid
1620                    );
1621                    return Ok(None);
1622                };
1623                // Ref announcements may be relayed by peers who don't have the
1624                // actual refs in storage, therefore we only check whether we
1625                // are connected to the *announcer*, which is required by the
1626                // protocol to only announce refs it has.
1627                //
1628                // TODO(Ade): Perhaps it makes sense to establish connections to
1629                // followed but unconnected peers. Consider:
1630                //   Connections: Alice ←→ Bob ←→ Eve
1631                //   Follows:     Alice ←→ Eve
1632                // Eve announces refs, and Bob relays these announcements to Alice.
1633                // Then, Alice might determine that Bob does not have Eve's refs,
1634                // and therefore connect directly to Eve in order to fetch.
1635                let Some(remote) = self.sessions.get(announcer).cloned() else {
1636                    trace!(
1637                        target: "service",
1638                        "Skipping fetch of {}, no sessions connected to {announcer}",
1639                        message.rid
1640                    );
1641                    return Ok(relay);
1642                };
1643                // Finally, start the fetch.
1644                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT);
1645
1646                return Ok(relay);
1647            }
1648            AnnouncementMessage::Node(
1649                ann @ NodeAnnouncement {
1650                    features,
1651                    addresses,
1652                    ..
1653                },
1654            ) => {
1655                self.emitter.emit(Event::NodeAnnounced {
1656                    nid: *announcer,
1657                    alias: ann.alias.clone(),
1658                    timestamp: ann.timestamp,
1659                    features: *features,
1660                    addresses: addresses.to_vec(),
1661                });
1662                // If this node isn't a seed, we're not interested in adding it
1663                // to our address book, but other nodes may be, so we relay the message anyway.
1664                if !features.has(Features::SEED) {
1665                    return Ok(relay);
1666                }
1667
1668                match self.db.addresses_mut().insert(
1669                    announcer,
1670                    ann.version,
1671                    ann.features,
1672                    &ann.alias,
1673                    ann.work(),
1674                    &ann.agent,
1675                    timestamp,
1676                    addresses
1677                        .iter()
1678                        // Ignore non-routable addresses unless received from a local network
1679                        // peer. This allows the node to function in a local network.
1680                        .filter(|a| a.is_routable() || relayer_addr.is_local())
1681                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
1682                ) {
1683                    Ok(updated) => {
1684                        // Only relay if we received new information.
1685                        if updated {
1686                            debug!(
1687                                target: "service",
1688                                "Address store entry for node {announcer} updated at {timestamp}"
1689                            );
1690                            return Ok(relay);
1691                        }
1692                    }
1693                    Err(err) => {
1694                        // An error here is due to a fault in our address store.
1695                        warn!(target: "service", "Failed to process node announcement from {announcer}: {err}");
1696                    }
1697                }
1698            }
1699        }
1700        Ok(None)
1701    }
1702
1703    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
1704        match info {
1705            // Nb. We don't currently send this message.
1706            Info::RefsAlreadySynced { rid, at } => {
1707                debug!(target: "service", "Refs already synced for {rid} by {remote}");
1708                self.emitter.emit(Event::RefsSynced {
1709                    rid: *rid,
1710                    remote,
1711                    at: *at,
1712                });
1713            }
1714        }
1715
1716        Ok(())
1717    }
1718
    /// Handle a single protocol message received from `remote`.
    ///
    /// Applies per-address rate limiting, reconciles the session state with the
    /// wire state, then dispatches on the message type (announcement,
    /// subscribe, info, ping, pong). Messages from unknown or disconnected
    /// sessions, and rate-limited messages, are silently dropped (`Ok(())`).
    pub fn handle_message(
        &mut self,
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
        let local = self.node_id();
        let relay = self.config.is_relay();
        // No session for this peer: nothing to do.
        let Some(peer) = self.sessions.get_mut(remote) else {
            debug!(target: "service", "Session not found for {remote}");
            return Ok(());
        };
        // Any message counts as activity for idle-detection purposes.
        peer.last_active = self.clock;

        // Inbound and outbound links get separate rate limits.
        let limit: RateLimit = match peer.link {
            Link::Outbound => self.config.limits.rate.outbound.into(),
            Link::Inbound => self.config.limits.rate.inbound.into(),
        };
        if self
            .limiter
            .limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
        {
            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
        }
        message.log(log::Level::Debug, remote, Link::Inbound);

        // `connected` holds the ping state and latency history, but only when
        // the session was already in the connected state; it is `None` when we
        // had to force the transition below.
        let connected = match &mut peer.state {
            session::State::Disconnected { .. } => {
                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
                return Ok(());
            }
            // In case of a discrepancy between the service state and the state of the underlying
            // wire protocol, we may receive a message from a peer that we consider not fully connected
            // at the service level. To remedy this, we simply transition the peer to a connected state.
            //
            // This is not ideal, but until the wire protocol and service are unified, it's the simplest
            // solution to converge towards the same state.
            session::State::Attempted | session::State::Initial => {
                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);

                peer.to_connected(self.clock);

                None
            }
            session::State::Connected {
                ping, latencies, ..
            } => Some((ping, latencies)),
        };

        trace!(target: "service", "Received message {message:?} from {remote}");

        match message {
            // Process a peer announcement.
            Message::Announcement(ann) => {
                let relayer = remote;
                let relayer_addr = peer.addr.clone();

                // `handle_announcement` returns the stored announcement id when
                // the message should be relayed further.
                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
                        // Inventory announcements are relayed lazily via the
                        // gossip store's relay flag; other kinds are relayed
                        // immediately.
                        if let AnnouncementMessage::Inventory(_) = ann.message {
                            if let Err(e) = self
                                .database_mut()
                                .gossip_mut()
                                .set_relay(id, gossip::RelayStatus::Relay)
                            {
                                warn!(target: "service", "Failed to set relay flag for message: {e}");
                                return Ok(());
                            }
                        } else {
                            self.relay(id, ann);
                        }
                    }
                }
            }
            Message::Subscribe(subscribe) => {
                // Filter announcements by interest.
                match self
                    .db
                    .gossip()
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
                {
                    Ok(anns) => {
                        for ann in anns {
                            let ann = match ann {
                                Ok(a) => a,
                                Err(e) => {
                                    debug!(target: "service", "Failed to read gossip message from store: {e}");
                                    continue;
                                }
                            };
                            // Don't send announcements authored by the remote, back to the remote.
                            if ann.node == *remote {
                                continue;
                            }
                            // Only send messages if we're a relay, or it's our own messages.
                            if relay || ann.node == local {
                                self.outbox.write(peer, ann.into());
                            }
                        }
                    }
                    Err(e) => {
                        warn!(target: "service", "Failed to query gossip messages from store: {e}");
                    }
                }
                // Remember the subscription so future announcements are
                // forwarded according to this filter.
                peer.subscribe = Some(subscribe);
            }
            Message::Info(info) => {
                self.handle_info(*remote, &info)?;
            }
            Message::Ping(Ping { ponglen, .. }) => {
                // Ignore pings which ask for too much data.
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
                }
                self.outbox.write(
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
            Message::Pong { zeroes } => {
                // Only count pongs that match an outstanding ping of the same
                // length; stale or unsolicited pongs are ignored.
                if let Some((ping, latencies)) = connected {
                    if let session::PingState::AwaitingResponse {
                        len: ponglen,
                        since,
                    } = *ping
                    {
                        if (ponglen as usize) == zeroes.len() {
                            *ping = session::PingState::Ok;
                            // Keep track of peer latency.
                            latencies.push_back(self.clock - since);
                            if latencies.len() > MAX_LATENCIES {
                                latencies.pop_front();
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
1862
1863    /// A convenient method to check if we should fetch from a `RefsAnnouncement` with `scope`.
1864    fn refs_status_of(
1865        &self,
1866        rid: RepoId,
1867        refs: NonEmpty<RefsAt>,
1868        scope: &policy::Scope,
1869    ) -> Result<RefsStatus, Error> {
1870        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
1871        // Check that there's something we want.
1872        if refs.want.is_empty() {
1873            return Ok(refs);
1874        }
1875        // Check scope.
1876        let mut refs = match scope {
1877            policy::Scope::All => refs,
1878            policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
1879                Ok(Namespaces::All) => refs,
1880                Ok(Namespaces::Followed(followed)) => {
1881                    refs.want.retain(|r| followed.contains(&r.remote));
1882                    refs
1883                }
1884                Err(e) => return Err(e.into()),
1885            },
1886        };
1887        // Remove our own remote, we don't want to fetch that.
1888        refs.want.retain(|r| r.remote != self.node_id());
1889
1890        Ok(refs)
1891    }
1892
1893    /// Add a seed to our routing table.
1894    fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
1895        if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
1896            if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
1897                self.emitter.emit(Event::SeedDiscovered { rid, nid });
1898                debug!(target: "service", "Routing table updated for {rid} with seed {nid}");
1899            }
1900        }
1901    }
1902
1903    /// Set of initial messages to send to a peer.
1904    fn initial(&mut self, _link: Link) -> Vec<Message> {
1905        let now = self.clock();
1906        let filter = self.filter();
1907
1908        // TODO: Only subscribe to outbound connections, otherwise we will consume too
1909        // much bandwidth.
1910
1911        // If we've been previously connected to the network, we'll have received gossip messages.
1912        // Instead of simply taking the last timestamp we try to ensure we don't miss any
1913        // messages due un-synchronized clocks.
1914        //
1915        // If this is our first connection to the network, we just ask for a fixed backlog
1916        // of messages to get us started.
1917        let since = if let Some(last) = self.last_online_at {
1918            Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
1919        } else {
1920            (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
1921        };
1922        debug!(target: "service", "Subscribing to messages since timestamp {since}..");
1923
1924        vec![
1925            Message::node(self.node.clone(), &self.signer),
1926            Message::inventory(self.inventory.clone(), &self.signer),
1927            Message::subscribe(filter, since, Timestamp::MAX),
1928        ]
1929    }
1930
1931    /// Try to guess whether we're online or not.
1932    fn is_online(&self) -> bool {
1933        self.sessions
1934            .connected()
1935            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
1936            .count()
1937            > 0
1938    }
1939
1940    /// Remove a local repository from our inventory.
1941    fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
1942        let node = self.node_id();
1943        let now = self.timestamp();
1944
1945        let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
1946        if removed {
1947            self.refresh_and_announce_inventory(now)?;
1948        }
1949        Ok(removed)
1950    }
1951
1952    /// Add a local repository to our inventory.
1953    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
1954        let node = self.node_id();
1955        let now = self.timestamp();
1956
1957        if !self.storage.contains(&rid)? {
1958            debug!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
1959            return Ok(false);
1960        }
1961        // Add to our local inventory.
1962        let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
1963        let updated = !updates.is_empty();
1964
1965        if updated {
1966            self.refresh_and_announce_inventory(now)?;
1967        }
1968        Ok(updated)
1969    }
1970
1971    /// Update cached inventory message, and announce new inventory to peers.
1972    fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
1973        let inventory = self.inventory()?;
1974
1975        self.inventory = gossip::inventory(time, inventory);
1976        self.announce_inventory();
1977
1978        Ok(())
1979    }
1980
1981    /// Get our local inventory.
1982    ///
1983    /// A node's inventory is the advertised list of repositories offered by a node.
1984    ///
1985    /// A node's inventory consists of *public* repositories that are seeded and available locally
1986    /// in the node's storage. We use the routing table as the canonical state of all inventories,
1987    /// including the local node's.
1988    ///
1989    /// When a repository is unseeded, it is also removed from the inventory. Private repositories
1990    /// are *not* part of a node's inventory.
1991    fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
1992        self.db
1993            .routing()
1994            .get_inventory(self.nid())
1995            .map_err(Error::from)
1996    }
1997
    /// Process a peer inventory announcement by updating our routing table.
    /// This function expects the peer's full inventory, and prunes entries that are not in the
    /// given inventory.
    ///
    /// Returns a [`SyncedRouting`] summary of what was added, updated and
    /// removed. Emits [`Event::SeedDiscovered`] / [`Event::SeedDropped`] for
    /// the respective changes, all at once at the end.
    fn sync_routing(
        &mut self,
        inventory: impl IntoIterator<Item = RepoId>,
        from: NodeId,
        timestamp: Timestamp,
    ) -> Result<SyncedRouting, Error> {
        let mut synced = SyncedRouting::default();
        // The set of repos the peer claims to seed; everything else they were
        // previously known to seed gets pruned below.
        let included = inventory.into_iter().collect::<BTreeSet<_>>();
        let mut events = Vec::new();

        for (rid, result) in
            self.db
                .routing_mut()
                .add_inventory(included.iter(), from, timestamp)?
        {
            match result {
                InsertResult::SeedAdded => {
                    debug!(target: "service", "Routing table updated for {rid} with seed {from}");
                    events.push(Event::SeedDiscovered { rid, nid: from });

                    // Nb. currently a no-op apart from the `expect` on policy
                    // access; see TODO below.
                    if self
                        .policies
                        .is_seeding(&rid)
                        .expect("Service::process_inventory: error accessing seeding configuration")
                    {
                        // TODO: We should fetch here if we're already connected, case this seed has
                        // refs we don't have.
                    }
                    synced.added.push(rid);
                }
                InsertResult::TimeUpdated => {
                    synced.updated.push(rid);
                }
                InsertResult::NotUpdated => {}
            }
        }

        // Prune: anything we previously recorded for this peer that is not in
        // the announced inventory is no longer seeded by them.
        synced.removed.extend(
            self.db
                .routing()
                .get_inventory(&from)?
                .into_iter()
                .filter(|rid| !included.contains(rid)),
        );
        self.db
            .routing_mut()
            .remove_inventories(&synced.removed, &from)?;
        events.extend(
            synced
                .removed
                .iter()
                .map(|&rid| Event::SeedDropped { rid, nid: from }),
        );

        self.emitter.emit_all(events);

        Ok(synced)
    }
2059
2060    /// Return a refs announcement including the given remotes.
2061    fn refs_announcement_for(
2062        &mut self,
2063        rid: RepoId,
2064        remotes: impl IntoIterator<Item = NodeId>,
2065    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
2066        let repo = self.storage.repository(rid)?;
2067        let timestamp = self.timestamp();
2068        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
2069
2070        for remote_id in remotes.into_iter() {
2071            let refs_at = RefsAt::new(&repo, remote_id)?;
2072
2073            if refs.push(refs_at).is_err() {
2074                warn!(
2075                    target: "service",
2076                    "refs announcement limit ({REF_REMOTE_LIMIT}) exceeded, peers will see only some of your repository references",
2077                );
2078                break;
2079            }
2080        }
2081
2082        let msg = AnnouncementMessage::from(RefsAnnouncement {
2083            rid,
2084            refs: refs.clone(),
2085            timestamp,
2086        });
2087        Ok((msg.signed(&self.signer), refs.into()))
2088    }
2089
2090    /// Announce our own refs for the given repo.
2091    fn announce_own_refs(
2092        &mut self,
2093        rid: RepoId,
2094        doc: Doc,
2095        namespaces: impl IntoIterator<Item = NodeId>,
2096    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
2097        let (refs, timestamp) = self.announce_refs(rid, doc, namespaces, true)?;
2098
2099        // Update refs database with our signed refs branches.
2100        // This isn't strictly necessary for now, as we only use the database for fetches, and
2101        // we don't fetch our own refs that are announced, but it's for good measure.
2102        for r in refs.iter() {
2103            self.emitter.emit(Event::LocalRefsAnnounced {
2104                rid,
2105                refs: *r,
2106                timestamp,
2107            });
2108            if let Err(e) = self.database_mut().refs_mut().set(
2109                &rid,
2110                &r.remote,
2111                &SIGREFS_BRANCH,
2112                r.at,
2113                timestamp.to_local_time(),
2114            ) {
2115                warn!(
2116                    target: "service",
2117                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
2118                    r.remote
2119                );
2120            }
2121        }
2122        Ok((refs, timestamp))
2123    }
2124
    /// Announce local refs for given repo.
    ///
    /// Builds a signed refs announcement for `remotes`, records our own sync
    /// status for the announced refs, and broadcasts the announcement to
    /// connected peers permitted to view the repo. When `own` is `true`, the
    /// sync-status update covers every announced ref, not just our own remote.
    /// Returns the announced refs and the announcement timestamp.
    fn announce_refs(
        &mut self,
        rid: RepoId,
        doc: Doc,
        remotes: impl IntoIterator<Item = NodeId>,
        own: bool,
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
        let timestamp = ann.timestamp();
        // Nb. lazy iterator: borrows `self.sessions` until the `announce` call below.
        let peers = self.sessions.connected().map(|(_, p)| p);

        // Update our sync status for our own refs. This is useful for determining if refs were
        // updated while the node was stopped.
        for r in refs.iter().filter(|r| own || r.remote == ann.node) {
            info!(
                target: "service",
                "Announcing refs {rid}/{r} to peers (t={timestamp})..",
            );
            // Update our local node's sync status to mark the refs as announced.
            if let Err(e) = self.db.seeds_mut().synced(&rid, &ann.node, r.at, timestamp) {
                warn!(target: "service", "Failed to update sync status for local node: {e}");
            } else {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
        }

        self.outbox.announce(
            ann,
            peers.filter(|p| {
                // Only announce to peers who are allowed to view this repo.
                doc.is_visible_to(&p.id.into())
            }),
            self.db.gossip_mut(),
        );
        Ok((refs, timestamp))
    }
2162
2163    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
2164        if let Some(sess) = self.sessions.get_mut(&nid) {
2165            sess.to_initial();
2166            self.outbox.connect(nid, addr);
2167
2168            return true;
2169        }
2170        false
2171    }
2172
2173    fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
2174        debug!(target: "service", "Connecting to {nid} ({addr})..");
2175
2176        if nid == self.node_id() {
2177            return Err(ConnectError::SelfConnection);
2178        }
2179        if let Ok(true) = self.policies.is_blocked(&nid) {
2180            return Err(ConnectError::Blocked { nid });
2181        }
2182        if !self.is_supported_address(&addr) {
2183            return Err(ConnectError::UnsupportedAddress { nid, addr });
2184        }
2185        if self.sessions.contains_key(&nid) {
2186            return Err(ConnectError::SessionExists { nid });
2187        }
2188        if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
2189            return Err(ConnectError::LimitReached { nid, addr });
2190        }
2191        let persistent = self.config.is_persistent(&nid);
2192        let timestamp: Timestamp = self.clock.into();
2193
2194        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
2195            warn!(target: "service", "Failed to update address book with connection attempt: {e}");
2196        }
2197        self.sessions.insert(
2198            nid,
2199            Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
2200        );
2201        self.outbox.connect(nid, addr);
2202
2203        Ok(())
2204    }
2205
    /// Return the known seeds for `rid`, with sync status where available.
    ///
    /// Seeds are drawn from two sources: peers with recorded sync status for
    /// the given `namespaces` (requires the repo and namespace in local
    /// storage), and peers from the routing table for which no sync status is
    /// known.
    fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
        let mut seeds = Seeds::new(self.rng.clone());

        // First, build a list of peers that have synced refs for `namespaces`, if any.
        // This step is skipped:
        //  1. For the repository (and thus all `namespaces`), if it doesn't exist in storage.
        //  2. For each `namespace` in `namespaces`, which does not exist in storage.
        if let Ok(repo) = self.storage.repository(*rid) {
            for namespace in namespaces.iter() {
                let Ok(local) = RefsAt::new(&repo, *namespace) else {
                    continue;
                };

                for seed in self.db.seeds().seeds_for(rid)? {
                    let seed = seed?;
                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
                    // A seed is in sync when its last-synced commit matches our local head.
                    let synced = if local.at == seed.synced_at.oid {
                        SyncStatus::Synced { at: seed.synced_at }
                    } else {
                        let local = SyncedAt::new(local.at, &repo)?;

                        SyncStatus::OutOfSync {
                            local,
                            remote: seed.synced_at,
                        }
                    };
                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
                }
            }
        }

        // Then, add peers we know about but have no information about the sync status.
        // These peers have announced that they seed the repository via an inventory
        // announcement, but we haven't received any ref announcements from them.
        for nid in self.db.routing().get(rid)? {
            if namespaces.contains(&nid) {
                continue;
            }
            if seeds.contains(&nid) {
                // We already have a richer entry for this node.
                continue;
            }
            let addrs = self.db.addresses().addresses_of(&nid)?;
            let state = self.sessions.get(&nid).map(|s| s.state.clone());

            seeds.insert(Seed::new(nid, addrs, state, None));
        }
        Ok(seeds)
    }
2255
2256    /// Return a new filter object, based on our seeding policy.
2257    fn filter(&self) -> Filter {
2258        if self.config.seeding_policy.is_allow() {
2259            // TODO: Remove bits for blocked repos.
2260            Filter::default()
2261        } else {
2262            self.filter.clone()
2263        }
2264    }
2265
2266    /// Get a timestamp for using in announcements.
2267    /// Never returns the same timestamp twice.
2268    fn timestamp(&mut self) -> Timestamp {
2269        let now = Timestamp::from(self.clock);
2270        if *now > *self.last_timestamp {
2271            self.last_timestamp = now;
2272        } else {
2273            self.last_timestamp = self.last_timestamp + 1;
2274        }
2275        self.last_timestamp
2276    }
2277
    /// Relay an announcement to eligible connected peers.
    ///
    /// Excludes peers that already relayed this announcement to us, and the
    /// peer that signed it. For refs announcements, peers must additionally
    /// be allowed to see the repository in question.
    fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
        let announcer = ann.node;
        let relayed_by = self.relayed_by.get(&id);
        // For refs announcements, remember the repository ID so we can check
        // its visibility below; `None` for all other announcement kinds.
        let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
            Some(rid)
        } else {
            None
        };
        // Choose peers we should relay this message to.
        // 1. Don't relay to a peer who sent us this message.
        // 2. Don't relay to the peer who signed this announcement.
        let relay_to = self
            .sessions
            .connected()
            .filter(|(id, _)| {
                relayed_by
                    .map(|relayers| !relayers.contains(id))
                    .unwrap_or(true) // If there are no relayers we let it through.
            })
            .filter(|(id, _)| **id != announcer)
            .filter(|(id, _)| {
                if let Some(rid) = rid {
                    // Only relay this message if the peer is allowed to know about the
                    // repository. If we don't have the repository, return `false` because
                    // we can't determine if it's private or public.
                    self.storage
                        .get(rid)
                        .ok()
                        .flatten()
                        .map(|doc| doc.is_visible_to(&(*id).into()))
                        .unwrap_or(false)
                } else {
                    // Announcement doesn't concern a specific repository, let it through.
                    true
                }
            })
            .map(|(_, p)| p);

        self.outbox.relay(ann, relay_to);
    }
2318
2319    ////////////////////////////////////////////////////////////////////////////
2320    // Periodic tasks
2321    ////////////////////////////////////////////////////////////////////////////
2322
2323    fn relay_announcements(&mut self) -> Result<(), Error> {
2324        let now = self.clock.into();
2325        let rows = self.database_mut().gossip_mut().relays(now)?;
2326        let local = self.node_id();
2327
2328        for (id, msg) in rows {
2329            let announcer = msg.node;
2330            if announcer == local {
2331                // Don't relay our own stored gossip messages.
2332                continue;
2333            }
2334            self.relay(id, msg);
2335        }
2336        Ok(())
2337    }
2338
2339    /// Announce our inventory to all connected peers, unless it was already announced.
2340    fn announce_inventory(&mut self) {
2341        let timestamp = self.inventory.timestamp.to_local_time();
2342
2343        if self.last_inventory == timestamp {
2344            debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
2345            return;
2346        }
2347        let msg = AnnouncementMessage::from(self.inventory.clone());
2348
2349        self.outbox.announce(
2350            msg.signed(&self.signer),
2351            self.sessions.connected().map(|(_, p)| p),
2352            self.db.gossip_mut(),
2353        );
2354        self.last_inventory = timestamp;
2355    }
2356
2357    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
2358        let count = self.db.routing().len()?;
2359        if count <= self.config.limits.routing_max_size.into() {
2360            return Ok(());
2361        }
2362
2363        let delta = count - usize::from(self.config.limits.routing_max_size);
2364        let nid = self.node_id();
2365        self.db.routing_mut().prune(
2366            (*now - LocalDuration::from(self.config.limits.routing_max_age)).into(),
2367            Some(delta),
2368            &nid,
2369        )?;
2370        Ok(())
2371    }
2372
2373    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
2374        let stale = self
2375            .sessions
2376            .connected()
2377            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
2378
2379        for (_, session) in stale {
2380            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);
2381
2382            // TODO: Should we switch the session state to "disconnected" even before receiving
2383            // an official "disconnect"? Otherwise we keep pinging until we get the disconnection.
2384
2385            self.outbox.disconnect(
2386                session.id,
2387                DisconnectReason::Session(session::Error::Timeout),
2388            );
2389        }
2390    }
2391
2392    /// Ensure connection health by pinging connected peers.
2393    fn keep_alive(&mut self, now: &LocalTime) {
2394        let inactive_sessions = self
2395            .sessions
2396            .connected_mut()
2397            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
2398            .map(|(_, session)| session);
2399        for session in inactive_sessions {
2400            session.ping(self.clock, &mut self.outbox).ok();
2401        }
2402    }
2403
2404    /// Get a list of peers available to connect to, sorted by lowest penalty.
2405    fn available_peers(&mut self) -> Vec<Peer> {
2406        match self.db.addresses().entries() {
2407            Ok(entries) => {
2408                // Nb. we don't want to connect to any peers that already have a session with us,
2409                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
2410                let mut peers = entries
2411                    .filter(|entry| entry.version == PROTOCOL_VERSION)
2412                    .filter(|entry| !entry.address.banned)
2413                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
2414                    .filter(|entry| !self.sessions.contains_key(&entry.node))
2415                    .filter(|entry| !self.policies.is_blocked(&entry.node).unwrap_or(false))
2416                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
2417                    .filter(|entry| &entry.node != self.nid())
2418                    .filter(|entry| self.is_supported_address(&entry.address.addr))
2419                    .fold(HashMap::new(), |mut acc, entry| {
2420                        acc.entry(entry.node)
2421                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
2422                            .or_insert_with(|| Peer {
2423                                nid: entry.node,
2424                                addresses: vec![entry.address],
2425                                penalty: entry.penalty,
2426                            });
2427                        acc
2428                    })
2429                    .into_values()
2430                    .collect::<Vec<_>>();
2431                peers.sort_by_key(|p| p.penalty);
2432                peers
2433            }
2434            Err(e) => {
2435                warn!(target: "service", "Unable to lookup available peers in address book: {e}");
2436                Vec::new()
2437            }
2438        }
2439    }
2440
    /// Fetch all repositories that are seeded but missing from storage.
    ///
    /// For each seeding policy set to "allow": if the repository is absent
    /// from local storage, initiate a fetch from every connected seed.
    /// Policy-read and storage errors are logged and skipped.
    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
        let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
        for policy in policies {
            // Skip policies that failed to load.
            let policy = match policy {
                Ok(policy) => policy,
                Err(err) => {
                    debug!(target: "protocol::filter", "Failed to read seed policy: {err}");
                    continue;
                }
            };

            let rid = policy.rid;

            // Only repositories we actually seed are of interest.
            if !policy.is_allow() {
                continue;
            }
            // Skip repositories already in storage; on a storage error we also
            // skip, since we can't tell whether the repo is missing.
            match self.storage.contains(&rid) {
                Ok(exists) => {
                    if exists {
                        continue;
                    }
                }
                Err(err) => {
                    log::debug!(target: "protocol::filter", "Failed to check if {rid} exists: {err}");
                    continue;
                }
            }
            // Look up seeds for this repo, excluding our own node ID.
            match self.seeds(&rid, [self.node_id()].into()) {
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
                            self.fetch(rid, seed.nid, vec![], FETCH_TIMEOUT, None);
                        }
                    } else {
                        // TODO: We should make sure that this fetch is retried later, either
                        // when we connect to a seed, or when we discover a new seed.
                        // Since new connections and routing table updates are both conditions for
                        // fetching, we should trigger fetches when those conditions appear.
                        // Another way to handle this would be to update our database, saying
                        // that we're trying to fetch a certain repo. We would then just
                        // iterate over those entries in the above circumstances. This is
                        // merely an optimization though, we can also iterate over all seeded
                        // repos and check which ones are not in our inventory.
                        debug!(target: "service", "No connected seeds found for {rid}..");
                    }
                }
                Err(e) => {
                    debug!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
                }
            }
        }
        Ok(())
    }
2495
2496    /// Run idle task for all connections.
2497    fn idle_connections(&mut self) {
2498        for (_, sess) in self.sessions.iter_mut() {
2499            sess.idle(self.clock);
2500
2501            if sess.is_stable() {
2502                // Mark as connected once connection is stable.
2503                if let Err(e) =
2504                    self.db
2505                        .addresses_mut()
2506                        .connected(&sess.id, &sess.addr, self.clock.into())
2507                {
2508                    warn!(target: "service", "Failed to update address book with connection: {e}");
2509                }
2510            }
2511        }
2512    }
2513
    /// Try to maintain a target number of connections.
    ///
    /// Only applies under [`PeerConfig::Dynamic`]; otherwise this is a no-op.
    /// Counts current outbound sessions (connected or connecting), and dials
    /// enough available peers to reach [`TARGET_OUTBOUND_PEERS`].
    fn maintain_connections(&mut self) {
        let PeerConfig::Dynamic = self.config.peers else {
            return;
        };
        trace!(target: "service", "Maintaining connections..");

        let target = TARGET_OUTBOUND_PEERS;
        let now = self.clock;
        // In-progress connection attempts count towards the target, so we
        // don't over-dial while attempts are pending.
        let outbound = self
            .sessions
            .values()
            .filter(|s| s.link.is_outbound())
            .filter(|s| s.is_connected() || s.is_connecting())
            .count();
        let wanted = target.saturating_sub(outbound);

        // Don't connect to more peers than needed.
        if wanted == 0 {
            return;
        }

        // Peers available to connect to.
        // For each candidate peer, pick the first address worth trying.
        let available = self
            .available_peers()
            .into_iter()
            .filter_map(|peer| {
                peer.addresses
                    .into_iter()
                    .find(|ka| match (ka.last_success, ka.last_attempt) {
                        // If we succeeded the last time we tried, this is a good address.
                        // If it's been long enough that we failed to connect, we also try again.
                        (Some(success), Some(attempt)) => {
                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
                        }
                        // If we haven't succeeded yet, and we waited long enough, we can try this address.
                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
                        // If we have no failed attempts for this address, it's worth a try.
                        (_, None) => true,
                    })
                    .map(|ka| (peer.nid, ka))
            })
            .filter(|(_, ka)| self.is_supported_address(&ka.addr));

        // Peers we are going to attempt connections to.
        let connect = available.take(wanted).collect::<Vec<_>>();
        if connect.len() < wanted {
            log::debug!(
                target: "service",
                "Not enough available peers to connect to (available={}, wanted={wanted})",
                connect.len()
            );
        }
        for (id, ka) in connect {
            if let Err(e) = self.connect(id, ka.addr.clone()) {
                warn!(target: "service", "Service::maintain_connections connection error: {e}");
            }
        }
    }
2573
2574    /// Maintain persistent peer connections.
2575    fn maintain_persistent(&mut self) {
2576        trace!(target: "service", "Maintaining persistent peers..");
2577
2578        let now = self.local_time();
2579        let mut reconnect = Vec::new();
2580
2581        for (nid, session) in self.sessions.iter_mut() {
2582            if self.config.is_persistent(nid) {
2583                if self.policies.is_blocked(nid).unwrap_or(false) {
2584                    continue;
2585                }
2586                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
2587                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
2588                    // even a successful attempt means that we're unlikely to be able to reconnect.
2589
2590                    if now >= *retry_at {
2591                        reconnect.push((*nid, session.addr.clone(), session.attempts()));
2592                    }
2593                }
2594            }
2595        }
2596
2597        for (nid, addr, attempts) in reconnect {
2598            if self.reconnect(nid, addr) {
2599                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
2600            }
2601        }
2602    }
2603
2604    /// Checks if the given [`Address`] is supported for connecting to.
2605    ///
2606    /// # IPv4/IPv6/DNS
2607    ///
2608    /// Always returns `true`.
2609    ///
2610    /// # Tor
2611    ///
2612    /// If the [`Address`] is an `.onion` address and the service supports onion
2613    /// routing then this will return `true`.
2614    fn is_supported_address(&self, address: &Address) -> bool {
2615        match AddressType::from(address) {
2616            // Only consider onion addresses if configured.
2617            AddressType::Onion => self.config.onion.is_some(),
2618            AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
2619        }
2620    }
2621}
2622
/// Gives read access to the service state.
///
/// Implemented by [`Service`] below; useful where callers only need to
/// inspect state rather than drive the service.
pub trait ServiceState {
    /// Get the Node ID.
    fn nid(&self) -> &NodeId;
    /// Get the existing sessions.
    fn sessions(&self) -> &Sessions;
    /// Get fetch state.
    fn fetching(&self) -> &FetcherState;
    /// Get outbox.
    fn outbox(&self) -> &Outbox;
    /// Get rate limiter.
    fn limiter(&self) -> &RateLimiter;
    /// Get event emitter.
    fn emitter(&self) -> &Emitter<Event>;
    /// Get a repository from storage.
    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
    /// Get the clock.
    fn clock(&self) -> &LocalTime;
    /// Get the clock mutably.
    fn clock_mut(&mut self) -> &mut LocalTime;
    /// Get service configuration.
    fn config(&self) -> &Config;
    /// Get service metrics.
    fn metrics(&self) -> &Metrics;
}
2648
impl<D, S, G> ServiceState for Service<D, S, G>
where
    D: routing::Store,
    G: crypto::signature::Signer<crypto::Signature>,
    S: ReadStorage,
{
    // All accessors below delegate to the service's fields (or its signer,
    // in the case of `nid`).
    fn nid(&self) -> &NodeId {
        self.signer.public_key()
    }

    fn sessions(&self) -> &Sessions {
        &self.sessions
    }

    fn fetching(&self) -> &FetcherState {
        self.fetcher.state()
    }

    fn outbox(&self) -> &Outbox {
        &self.outbox
    }

    fn limiter(&self) -> &RateLimiter {
        &self.limiter
    }

    fn emitter(&self) -> &Emitter<Event> {
        &self.emitter
    }

    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
        self.storage.get(rid)
    }

    fn clock(&self) -> &LocalTime {
        &self.clock
    }

    fn clock_mut(&mut self) -> &mut LocalTime {
        &mut self.clock
    }

    fn config(&self) -> &Config {
        &self.config
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }
}
2699
/// Disconnect reason.
#[derive(Debug)]
pub enum DisconnectReason {
    /// Error while dialing the remote. This error occurs before a connection is
    /// even established. Errors of this kind are usually not transient.
    Dial(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with an underlying established connection. Sometimes, reconnecting
    /// after such an error is possible.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with a fetch.
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
    /// Session conflicts with existing session.
    Conflict,
    /// Connection to self.
    SelfConnection,
    /// Peer is blocked by policy.
    Policy,
    /// User requested disconnect.
    Command,
}
2722
2723impl DisconnectReason {
2724    pub fn is_dial_err(&self) -> bool {
2725        matches!(self, Self::Dial(_))
2726    }
2727
2728    pub fn is_connection_err(&self) -> bool {
2729        matches!(self, Self::Connection(_))
2730    }
2731
2732    pub fn connection() -> Self {
2733        DisconnectReason::Connection(Arc::new(std::io::Error::from(
2734            std::io::ErrorKind::ConnectionReset,
2735        )))
2736    }
2737}
2738
2739impl fmt::Display for DisconnectReason {
2740    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2741        match self {
2742            Self::Dial(err) => write!(f, "{err}"),
2743            Self::Connection(err) => write!(f, "{err}"),
2744            Self::Command => write!(f, "command"),
2745            Self::SelfConnection => write!(f, "self-connection"),
2746            Self::Conflict => write!(f, "conflict"),
2747            Self::Policy => write!(f, "policy"),
2748            Self::Session(err) => write!(f, "{err}"),
2749            Self::Fetch(err) => write!(f, "fetch: {err}"),
2750        }
2751    }
2752}
2753
/// Result of a project lookup.
///
/// Failure cases are covered by [`LookupError`].
#[derive(Debug)]
pub struct Lookup {
    /// Whether the project was found locally or not.
    pub local: Option<Doc>,
    /// A list of remote peers on which the project is known to exist.
    pub remote: Vec<NodeId>,
}
2762
/// Error performing a project lookup.
#[derive(thiserror::Error, Debug)]
pub enum LookupError {
    /// Error querying the routing table.
    #[error(transparent)]
    Routing(#[from] routing::Error),
    /// Error accessing the repository.
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}
2770
#[derive(Debug, Clone)]
/// Holds currently (or recently) connected peers.
///
/// A thin wrapper around an [`AddressBook`] keyed by [`NodeId`].
pub struct Sessions(AddressBook<NodeId, Session>);
2774
2775impl Sessions {
2776    pub fn new(rng: Rng) -> Self {
2777        Self(AddressBook::new(rng))
2778    }
2779
2780    /// Iterator over fully connected peers.
2781    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2782        self.0
2783            .iter()
2784            .filter_map(move |(id, sess)| match &sess.state {
2785                session::State::Connected { .. } => Some((id, sess)),
2786                _ => None,
2787            })
2788    }
2789
2790    /// Iterator over connected inbound peers.
2791    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2792        self.connected().filter(|(_, s)| s.link.is_inbound())
2793    }
2794
2795    /// Iterator over outbound peers.
2796    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2797        self.connected().filter(|(_, s)| s.link.is_outbound())
2798    }
2799
2800    /// Iterator over mutable fully connected peers.
2801    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2802        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
2803    }
2804
2805    /// Iterator over disconnected peers.
2806    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2807        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
2808    }
2809
2810    /// Return whether this node has a fully established session.
2811    pub fn is_connected(&self, id: &NodeId) -> bool {
2812        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
2813    }
2814
2815    /// Return whether this node can be connected to.
2816    pub fn is_disconnected(&self, id: &NodeId) -> bool {
2817        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
2818    }
2819}
2820
// Expose the underlying address book's read API directly on `Sessions`.
impl Deref for Sessions {
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
2828
// Expose the underlying address book's mutable API directly on `Sessions`.
impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}