1#![allow(clippy::too_many_arguments)]
2#![allow(clippy::collapsible_match)]
3#![allow(clippy::collapsible_if)]
4#![warn(clippy::unwrap_used)]
5pub mod command;
6pub use command::{Command, QueryState};
7
8pub mod filter;
9pub mod gossip;
10pub mod io;
11pub mod limiter;
12pub mod message;
13pub mod session;
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::net::IpAddr;
18use std::ops::{Deref, DerefMut};
19use std::sync::Arc;
20use std::{fmt, net, time};
21
22use crossbeam_channel as chan;
23use fastrand::Rng;
24use localtime::{LocalDuration, LocalTime};
25use log::*;
26use nonempty::NonEmpty;
27
28use radicle::identity::Doc;
29use radicle::node;
30use radicle::node::address;
31use radicle::node::address::Store as _;
32use radicle::node::address::{AddressBook, AddressType, KnownAddress};
33use radicle::node::config::{PeerConfig, RateLimit};
34use radicle::node::device::Device;
35use radicle::node::refs::Store as _;
36use radicle::node::routing::Store as _;
37use radicle::node::seed;
38use radicle::node::seed::Store as _;
39use radicle::node::{Penalty, Severity};
40use radicle::storage::refs::{FeatureLevel, SIGREFS_BRANCH};
41use radicle::storage::{RepositoryError, RepositoryInfo, SignedRefsInfo};
42use radicle_fetch::policy::SeedingPolicy;
43
44use crate::fetcher;
45use crate::fetcher::service::FetcherService;
46use crate::fetcher::FetcherState;
47use crate::fetcher::RefsToFetch;
48use crate::service::gossip::Store as _;
49use crate::service::message::{
50 Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
51};
52use crate::service::policy::{store::Write, Scope};
53use radicle::identity::RepoId;
54use radicle::node::events::Emitter;
55use radicle::node::routing;
56use radicle::node::routing::InsertResult;
57use radicle::node::{Address, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt};
58use radicle::prelude::*;
59use radicle::storage;
60use radicle::storage::{refs::RefsAt, Namespaces, ReadStorage};
61use radicle::crypto;
64use radicle::node::Link;
65use radicle::node::PROTOCOL_VERSION;
66
67use crate::bounded::BoundedVec;
68use crate::service::filter::Filter;
69pub use crate::service::message::{Message, ZeroBytes};
70pub use crate::service::session::{QueuedFetch, Session};
71use crate::worker::FetchError;
72use radicle::node::events::{Event, Events};
73use radicle::node::{Config, NodeId};
74
75use radicle::node::policy::config as policy;
76
77use self::io::Outbox;
78use self::limiter::RateLimiter;
79use self::message::InventoryAnnouncement;
80use self::policy::NamespacesError;
81
/// How often the periodic 'idle' task runs (keep-alives, disconnecting
/// unresponsive peers, connection maintenance); see [`Service::wake`].
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often the 'gossip' task runs, relaying stored announcements.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often we announce our inventory to the network.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often the 'sync' task runs, fetching missing repositories.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often routing entries and old gossip messages are pruned.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// Time without activity after which a connection is considered stale.
/// NOTE(review): consumer is outside this chunk — confirm against
/// `disconnect_unresponsive_peers`.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// Margin used by the keep-alive logic (see `keep_alive`, outside this chunk).
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency samples retained per peer.
pub const MAX_LATENCIES: usize = 16;
/// Maximum tolerated clock delta — presumably for validating remote
/// timestamps; confirm at the usage site.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum number of connection attempts before giving up on a peer.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// Gossip backlog window requested on the very first subscription (24 hours).
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
/// Gossip backlog window requested on subsequent subscriptions.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Lower bound for the exponential reconnection backoff (see `disconnected`).
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Upper bound for the exponential reconnection backoff (see `disconnected`).
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Delay before retrying a failed connection — confirm at the usage site.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// Default timeout for fetch operations.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(30);
/// Target number of outbound peer connections — presumably maintained by
/// `maintain_connections`; confirm.
pub const TARGET_OUTBOUND_PEERS: usize = 8;

/// Maximum number of addresses carried in a single gossip message.
pub use message::ADDRESS_LIMIT;
/// Maximum number of inventory entries carried in a single gossip message.
pub use message::INVENTORY_LIMIT;
/// Maximum number of remotes carried in a single refs announcement.
pub use message::REF_REMOTE_LIMIT;
125
/// Node-wide runtime metrics, periodically snapshotted into the service via
/// [`Service::tick`]. Serialized as camelCase JSON for external consumers.
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Per-peer metrics, keyed by node ID.
    pub peers: HashMap<NodeId, PeerMetrics>,
    /// Current size of the worker queue.
    pub worker_queue_size: usize,
    /// Number of currently open channels.
    pub open_channels: usize,
}
137
138impl Metrics {
139 pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
141 self.peers.entry(nid).or_default()
142 }
143}
144
/// Per-peer counters, serialized as camelCase JSON for external consumers.
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerMetrics {
    /// Bytes of git data received from this peer.
    pub received_git_bytes: usize,
    /// Fetch requests received from this peer.
    pub received_fetch_requests: usize,
    /// Total bytes received from this peer.
    pub received_bytes: usize,
    /// Gossip messages received from this peer.
    pub received_gossip_messages: usize,
    /// Total bytes sent to this peer.
    pub sent_bytes: usize,
    /// Fetch requests sent to this peer.
    pub sent_fetch_requests: usize,
    /// Bytes of git data sent to this peer.
    pub sent_git_bytes: usize,
    /// Gossip messages sent to this peer.
    pub sent_gossip_messages: usize,
    /// Streams opened with this peer.
    pub streams_opened: usize,
    /// Inbound connection attempts from this peer.
    pub inbound_connection_attempts: usize,
    /// Outbound connection attempts to this peer.
    pub outbound_connection_attempts: usize,
    /// Times this peer has disconnected.
    pub disconnects: usize,
}
162
/// Summary of changes made while synchronizing the routing table.
#[derive(Default)]
struct SyncedRouting {
    /// Repository IDs newly added to the routing table.
    added: Vec<RepoId>,
    /// Repository IDs removed from the routing table.
    removed: Vec<RepoId>,
    /// Repository IDs whose routing entries were updated.
    updated: Vec<RepoId>,
}
173
174impl SyncedRouting {
175 fn is_empty(&self) -> bool {
176 self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
177 }
178}
179
/// A known peer: its node ID, known network addresses, and penalty score.
#[derive(Debug, Clone)]
struct Peer {
    // The peer's node ID.
    nid: NodeId,
    // Addresses this peer is known to be reachable at.
    addresses: Vec<KnownAddress>,
    // Penalty score for this peer (see `Penalty`).
    penalty: Penalty,
}
187
/// Top-level error type for service operations, aggregating errors from the
/// storage, gossip, refs, routing, address, database, seed and policy
/// subsystems.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Low-level git error.
    #[error(transparent)]
    Git(#[from] radicle::git::raw::Error),
    /// Repository storage error.
    #[error(transparent)]
    Storage(#[from] storage::Error),
    /// Gossip store error.
    #[error(transparent)]
    Gossip(#[from] gossip::Error),
    /// Signed-refs error.
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
    /// Routing table error.
    #[error(transparent)]
    Routing(#[from] routing::Error),
    /// Address book error.
    #[error(transparent)]
    Address(#[from] address::Error),
    /// Node database error.
    #[error(transparent)]
    Database(#[from] node::db::Error),
    /// Seed store error.
    #[error(transparent)]
    Seeds(#[from] seed::Error),
    /// Policy store error.
    #[error(transparent)]
    Policy(#[from] policy::Error),
    /// Repository-level error.
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
    /// Namespace resolution error; boxed to keep `Error` itself small.
    #[error("namespaces error: {0}")]
    Namespaces(Box<NamespacesError>),
}
214
215impl From<NamespacesError> for Error {
216 fn from(e: NamespacesError) -> Self {
217 Self::Namespaces(Box::new(e))
218 }
219}
220
/// Reasons an outbound connection attempt can be rejected before a network
/// connection is even made.
#[derive(thiserror::Error, Debug)]
pub enum ConnectError {
    // A session already exists for the target peer.
    #[error("attempted connection to peer {nid} which already has a session")]
    SessionExists { nid: NodeId },
    // The target node ID is our own.
    #[error("attempted connection to self")]
    SelfConnection,
    // The outbound connection limit has been reached.
    #[error("outbound connection limit reached when attempting {nid} ({addr})")]
    LimitReached { nid: NodeId, addr: Address },
    // The address type isn't supported by this node's configuration.
    #[error(
        "attempted connection to {nid}, via {addr} but addresses of this kind are not supported"
    )]
    UnsupportedAddress { nid: NodeId, addr: Address },
    // The target peer is blocked by local policy.
    #[error("attempted connection with blocked peer {nid}")]
    Blocked { nid: NodeId },
}
236
/// Aggregation of all persistent store interfaces the service relies on:
/// address book, gossip messages, routing table, seeds, and cached refs.
pub trait Store:
    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
{
}

/// The production node database implements every store interface.
impl Store for radicle::node::Database {}
244
/// State of an in-flight fetch.
#[derive(Debug)]
pub struct FetchState {
    /// Peer the repository is being fetched from.
    pub from: NodeId,
    /// Specific refs being fetched, if any.
    pub refs_at: Vec<RefsAt>,
    /// Channels to be notified with the [`FetchResult`] on completion.
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}
255
/// Wrapper around a database handle `D`, exposing each store interface
/// (routing, addresses, gossip, seeds, refs) through dedicated accessors.
#[derive(Debug)]
pub struct Stores<D>(D);
259
impl<D> Stores<D>
where
    D: Store,
{
    /// Read access to the routing table.
    pub fn routing(&self) -> &impl routing::Store {
        &self.0
    }

    /// Write access to the routing table.
    pub fn routing_mut(&mut self) -> &mut impl routing::Store {
        &mut self.0
    }

    /// Read access to the address book.
    pub fn addresses(&self) -> &impl address::Store {
        &self.0
    }

    /// Write access to the address book.
    pub fn addresses_mut(&mut self) -> &mut impl address::Store {
        &mut self.0
    }

    /// Read access to the gossip message store.
    pub fn gossip(&self) -> &impl gossip::Store {
        &self.0
    }

    /// Write access to the gossip message store.
    pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
        &mut self.0
    }

    /// Read access to the seed store.
    pub fn seeds(&self) -> &impl seed::Store {
        &self.0
    }

    /// Write access to the seed store.
    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
        &mut self.0
    }

    /// Read access to the cached refs store.
    pub fn refs(&self) -> &impl node::refs::Store {
        &self.0
    }

    /// Write access to the cached refs store.
    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
        &mut self.0
    }
}
314
315impl<D> AsMut<D> for Stores<D> {
316 fn as_mut(&mut self) -> &mut D {
317 &mut self.0
318 }
319}
320
321impl<D> From<D> for Stores<D> {
322 fn from(db: D) -> Self {
323 Self(db)
324 }
325}
326
/// The gossip/fetch service at the heart of the node. Generic over the
/// database (`D`), repository storage (`S`), and signer (`G`).
#[derive(Debug)]
pub struct Service<D, S, G> {
    // Node configuration.
    config: Config,
    // Signing device used for our node identity and for signing refs.
    signer: Device<G>,
    // Repository storage.
    storage: S,
    // Persistent stores (routing, addresses, gossip, seeds, refs).
    db: Stores<D>,
    // Seeding and follow policies.
    policies: policy::Config<Write>,
    // Active peer sessions.
    sessions: Sessions,
    // The service's notion of current time; advanced via `tick`.
    clock: LocalTime,
    // Nodes associated with each announcement id — presumably the peers an
    // announcement was relayed by; confirm at the usage site.
    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
    // Outgoing I/O actions, drained via `next`.
    outbox: Outbox,
    // Our node announcement message.
    node: NodeAnnouncement,
    // Our inventory announcement; rebuilt during `initialize`.
    inventory: InventoryAnnouncement,
    // Source of randomness.
    rng: Rng,
    // Fetch scheduling: active and queued fetches.
    fetcher: FetcherService<command::Responder<FetchResult>>,
    // Rate limiter for inbound traffic (consulted in `accepted`).
    limiter: RateLimiter,
    // Subscription filter derived from our seed policies.
    filter: Filter,
    // Last time the periodic 'idle' task ran.
    last_idle: LocalTime,
    // Last time the periodic 'gossip' task ran.
    last_gossip: LocalTime,
    // Last time the periodic 'sync' task ran.
    last_sync: LocalTime,
    // Last time the periodic 'prune' task ran.
    last_prune: LocalTime,
    // Last time inventory was announced.
    last_announce: LocalTime,
    // Last time the inventory was refreshed — confirm at the usage site.
    last_inventory: LocalTime,
    // Timestamp of our latest announcement (seeded from `node.timestamp`).
    last_timestamp: Timestamp,
    // When `initialize` was called; `None` until then.
    started_at: Option<LocalTime>,
    // Time of the last known gossip message at startup, if any.
    last_online_at: Option<LocalTime>,
    // Event emitter for service events.
    emitter: Emitter<Event>,
    // Addresses we are listening on (see `listening`).
    listening: Vec<net::SocketAddr>,
    // Latest metrics snapshot, replaced on every `tick`.
    metrics: Metrics,
}
385
386impl<D, S, G> Service<D, S, G> {
387 pub fn node_id(&self) -> NodeId {
389 *self.signer.public_key()
390 }
391
392 pub fn local_time(&self) -> LocalTime {
394 self.clock
395 }
396
397 pub fn emitter(&self) -> Emitter<Event> {
398 self.emitter.clone()
399 }
400}
401
impl<D, S, G> Service<D, S, G>
where
    D: Store,
    S: WriteStorage + 'static,
    G: crypto::signature::Signer<crypto::Signature>,
{
    /// One-time service start-up: sets the clock, repairs/refreshes local
    /// databases from storage, rebuilds our inventory and subscription
    /// filter, connects to configured peers, and schedules the periodic
    /// 'idle' and 'gossip' wakeups.
    ///
    /// Must be called with a non-default `time` before `tick`/`wake`.
    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());
        assert_ne!(time, LocalTime::default());

        let nid = self.node_id();

        self.clock = time;
        self.started_at = Some(time);
        // Our "last online" time is derived from the newest stored gossip
        // message, if any; a db failure here is non-fatal.
        self.last_online_at = match self.db.gossip().last() {
            Ok(Some(last)) => Some(last.to_local_time()),
            Ok(None) => None,
            Err(e) => {
                warn!(target: "service", "Failed to get the latest gossip message from db: {e}");
                None
            }
        };

        // If the refs cache is empty (e.g. first run or wiped db), rebuild
        // it from repository storage.
        match self.db.refs().count() {
            Ok(0) => {
                info!(target: "service", "Empty refs database, populating from storage..");
                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
                    warn!(target: "service", "Failed to populate refs database: {e}");
                }
            }
            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
            Err(e) => {
                warn!(target: "service", "Failed to retrieve count of refs from database: {e}")
            }
        }

        // What we've previously announced as synced, keyed by repo.
        let announced = self
            .db
            .seeds()
            .seeded_by(&nid)?
            .collect::<Result<HashMap<_, _>, _>>()?;
        let mut inventory = BTreeSet::new();
        let mut private = BTreeSet::new();

        for repo in self.storage.repositories()? {
            // Migrate `rad/sigrefs` to the latest feature level if needed.
            let repo = self.upgrade_sigrefs(repo)?;
            let rid = repo.rid;

            if !self.policies.is_seeding(&rid)? {
                debug!(target: "service", "Local repository {rid} is not seeded");
                continue;
            }
            // Only public repositories go into the shared inventory; private
            // ones are tracked separately so their routing entries can be
            // removed below.
            if repo.doc.is_public() {
                inventory.insert(rid);
            } else {
                private.insert(rid);
            }
            let Some(updated_at) = repo.synced_at else {
                continue;
            };
            // Skip repos whose announced sync state is already current.
            if let Some(announced) = announced.get(&rid) {
                if updated_at.oid == announced.oid {
                    continue;
                }
            }
            if self.db.seeds_mut().synced(
                &rid,
                &nid,
                updated_at.oid,
                updated_at.timestamp.into(),
            )? {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
            // Record our own refs announcement so it is part of the gossip
            // history served to subscribers.
            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
                self.db.gossip_mut().announced(&nid, &ann)?;
            }
        }

        // Publish our public inventory into the routing table and rebuild
        // the inventory announcement.
        self.db
            .routing_mut()
            .add_inventory(inventory.iter(), nid, time.into())?;
        self.inventory = gossip::inventory(self.timestamp(), inventory);

        // Private repositories must not appear in our routing entries.
        self.db
            .routing_mut()
            .remove_inventories(private.iter(), &nid)?;

        self.filter = Filter::allowed_by(self.policies.seed_policies()?);
        // Kick off connections to all configured peers.
        let addrs = self.config.connect.clone();
        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
            if let Err(e) = self.connect(id, addr) {
                debug!(target: "service", "Service::initialization connection error: {e}");
            }
        }
        self.maintain_connections();
        // Schedule the first periodic wakeups.
        self.outbox.wakeup(IDLE_INTERVAL);
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }

    /// If `rad/sigrefs` of the given repository needs migration, force-signs
    /// the refs at the latest feature level and refreshes the repository
    /// info's `refs` and `synced_at` fields accordingly; otherwise returns
    /// the info unchanged.
    fn upgrade_sigrefs(&mut self, mut info: RepositoryInfo) -> Result<RepositoryInfo, Error> {
        if !matches!(info.refs, SignedRefsInfo::NeedsMigration) {
            return Ok(info);
        }

        let rid = info.rid;

        log::info!(
            "Migrating `rad/sigrefs` of {rid} to force feature level {}.",
            FeatureLevel::LATEST
        );

        let repo = self.storage.repository_mut(rid)?;
        let refs = repo.force_sign_refs(&self.signer)?;

        // Re-open read-only to compute the new synced-at state.
        let repo = self.storage.repository(rid)?;
        let synced_at = SyncedAt::new(refs.at, &repo)?;

        info.synced_at = Some(synced_at);
        info.refs = SignedRefsInfo::Some(refs);

        Ok(info)
    }
}
549
550impl<D, S, G> Service<D, S, G>
551where
552 D: Store,
553 S: ReadStorage + 'static,
554 G: crypto::signature::Signer<crypto::Signature>,
555{
556 pub fn new(
557 config: Config,
558 db: Stores<D>,
559 storage: S,
560 policies: policy::Config<Write>,
561 signer: Device<G>,
562 rng: Rng,
563 node: NodeAnnouncement,
564 emitter: Emitter<Event>,
565 ) -> Self {
566 let sessions = Sessions::new(rng.clone());
567 let limiter = RateLimiter::new(config.peers());
568 let last_timestamp = node.timestamp;
569 let clock = LocalTime::default(); let inventory = gossip::inventory(clock.into(), []); let fetcher = {
572 let config = fetcher::Config::new()
573 .with_max_concurrency(
574 std::num::NonZeroUsize::new(config.limits.fetch_concurrency.into())
575 .expect("fetch concurrency was zero, must be at least 1"),
576 )
577 .with_max_capacity(fetcher::MaxQueueSize::default());
578 FetcherService::new(config)
579 };
580 Self {
581 config,
582 storage,
583 policies,
584 signer,
585 rng,
586 inventory,
587 node,
588 clock,
589 db,
590 outbox: Outbox::default(),
591 limiter,
592 sessions,
593 fetcher,
594 filter: Filter::empty(),
595 relayed_by: HashMap::default(),
596 last_idle: LocalTime::default(),
597 last_gossip: LocalTime::default(),
598 last_sync: LocalTime::default(),
599 last_prune: LocalTime::default(),
600 last_timestamp,
601 last_announce: LocalTime::default(),
602 last_inventory: LocalTime::default(),
603 started_at: None, last_online_at: None, emitter,
606 listening: vec![],
607 metrics: Metrics::default(),
608 }
609 }
610
    /// When the service was initialized, or `None` if it hasn't been.
    pub fn started(&self) -> Option<LocalTime> {
        self.started_at
    }

    /// Pops the next pending I/O action from the outbox.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<io::Io> {
        self.outbox.next()
    }
621
622 pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
625 let updated = self.policies.seed(id, scope)?;
626 self.filter.insert(id);
627
628 Ok(updated)
629 }
630
631 pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
636 let updated = self.policies.unseed(id)?;
637
638 if updated {
639 self.filter = Filter::allowed_by(self.policies.seed_policies()?);
643 if let Err(e) = self.remove_inventory(id) {
645 warn!(target: "service", "Failed to update inventory after unseed: {e}");
646 }
647 }
648 Ok(updated)
649 }
650
    /// Returns the `n` closest peers. Not yet implemented — calling this
    /// panics via `todo!()`.
    #[allow(unused)]
    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
        todo!()
    }
658
    /// Read access to the service's database stores.
    pub fn database(&self) -> &Stores<D> {
        &self.db
    }

    /// Mutable access to the service's database stores.
    pub fn database_mut(&mut self) -> &mut Stores<D> {
        &mut self.db
    }

    /// Read access to repository storage.
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Mutable access to repository storage.
    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }

    /// The seeding/follow policy configuration.
    pub fn policies(&self) -> &policy::Config<Write> {
        &self.policies
    }

    /// The signing device used by this node.
    pub fn signer(&self) -> &Device<G> {
        &self.signer
    }
688
    /// Subscribes to service events.
    pub fn events(&mut self) -> Events {
        Events::from(self.emitter.subscribe())
    }

    /// The current state of the fetch scheduler.
    pub fn fetcher(&self) -> &FetcherState {
        self.fetcher.state()
    }

    /// Mutable access to the I/O outbox.
    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
    }

    /// The node configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }
707
    /// Looks up a repository both locally and in the routing table.
    ///
    /// Returns the local document (if present in storage) together with the
    /// remote nodes known to provide the repository, excluding ourselves.
    pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
        let this = self.nid();
        let local = self.storage.get(rid)?;
        let remote = self
            .db
            .routing()
            .get(&rid)?
            .iter()
            // Don't report ourselves as a remote provider.
            .filter(|nid| nid != &this)
            .cloned()
            .collect();

        Ok(Lookup { local, remote })
    }
723
    /// Advances the service clock to `now` and records the latest metrics
    /// snapshot. Panics if called before `initialize`.
    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
        trace!(
            target: "service",
            "Tick +{}",
            now - self.started_at.expect("Service::tick: service must be initialized")
        );
        if now >= self.clock {
            self.clock = now;
        } else {
            // Never move the clock backwards; only warn about the anomaly
            // in non-test builds.
            #[cfg(not(test))]
            warn!(
                target: "service",
                "System clock is not monotonic: {now} is not greater or equal to {}", self.clock
            );
        }
        self.metrics = metrics.clone();
    }
743
    /// Runs whichever periodic tasks ('idle', 'gossip', 'sync', 'announce',
    /// 'prune') are due according to their intervals, re-arming a wakeup for
    /// each task that ran. Panics if called before `initialize`.
    pub fn wake(&mut self) {
        let now = self.clock;

        trace!(
            target: "service",
            "Wake +{}",
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

        if now - self.last_idle >= IDLE_INTERVAL {
            trace!(target: "service", "Running 'idle' task...");

            // Connection upkeep: ping, drop dead peers, top up connections,
            // and retry queued fetches freed up by disconnects.
            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_gossip >= GOSSIP_INTERVAL {
            trace!(target: "service", "Running 'gossip' task...");

            if let Err(e) = self.relay_announcements() {
                warn!(target: "service", "Failed to relay stored announcements: {e}");
            }
            self.outbox.wakeup(GOSSIP_INTERVAL);
            self.last_gossip = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_repositories() {
                warn!(target: "service", "Failed to fetch missing inventory: {e}");
            }
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

            self.announce_inventory();
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                warn!(target: "service", "Failed to prune routing entries: {err}");
            }
            // Drop gossip messages older than the configured maximum age.
            if let Err(err) = self
                .db
                .gossip_mut()
                .prune((now - LocalDuration::from(self.config.limits.gossip_max_age)).into())
            {
                warn!(target: "service", "Failed to prune gossip entries: {err}");
            }

            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

        // Ensure persistent peers stay connected.
        self.maintain_persistent();
    }
810
    /// Handles a client command received over the control channel.
    ///
    /// Responses go back through the responder embedded in each command
    /// variant; send failures are deliberately ignored (`.ok()`) since the
    /// requester may have gone away.
    pub fn command(&mut self, cmd: Command) {
        info!(target: "service", "Received command {cmd:?}");

        match cmd {
            Command::Connect(nid, addr, opts) => {
                if opts.persistent {
                    // Persistent peers are recorded in the config so we keep
                    // reconnecting to them.
                    self.config.connect.insert((nid, addr.clone()).into());
                }
                if let Err(e) = self.connect(nid, addr) {
                    match e {
                        // An already-open session counts as connected.
                        ConnectError::SessionExists { nid } => {
                            self.emitter.emit(Event::PeerConnected { nid });
                        }
                        e => {
                            self.emitter.emit(Event::PeerDisconnected {
                                nid,
                                reason: e.to_string(),
                            });
                        }
                    }
                }
            }
            Command::Disconnect(nid) => {
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Config(resp) => {
                resp.ok(self.config.clone()).ok();
            }
            Command::ListenAddrs(resp) => {
                resp.ok(self.listening.clone()).ok();
            }
            Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
                    debug!(
                        target: "service",
                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
                        connected.len(), disconnected.len(), rid
                    );
                    resp.ok(seeds).ok();
                }
                Err(e) => {
                    warn!(target: "service", "Failed to get seeds for {rid}: {e}");
                    resp.err(e).ok();
                }
            },
            Command::Fetch(rid, seed, timeout, signed_references_minimum_feature_level, resp) => {
                // Fall back to the configured minimum feature level if the
                // caller didn't specify one.
                let feature_level = signed_references_minimum_feature_level
                    .unwrap_or(self.config.fetch.feature_level_min());
                let config = self
                    .fetch_config()
                    .with_timeout(timeout)
                    .with_minimum_feature_level(feature_level);
                self.fetch(rid, seed, vec![], config, Some(resp));
            }
            Command::Seed(rid, scope, resp) => {
                let seeded = self
                    .seed(&rid, scope)
                    .expect("Service::command: error seeding repository");
                resp.ok(seeded).ok();

                // Re-subscribe so peers know about our updated filter.
                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
            Command::Unseed(id, resp) => {
                let updated = self
                    .unseed(&id)
                    .expect("Service::command: error unseeding repository");
                resp.ok(updated).ok();
            }
            Command::Follow(id, alias, resp) => {
                let seeded = self
                    .policies
                    .follow(&id, alias.as_ref())
                    .expect("Service::command: error following node");
                resp.ok(seeded).ok();
            }
            Command::Unfollow(id, resp) => {
                let updated = self
                    .policies
                    .unfollow(&id)
                    .expect("Service::command: error unfollowing node");
                resp.ok(updated).ok();
            }
            Command::Block(id, resp) => {
                let updated = self
                    .policies
                    .set_follow_policy(&id, policy::Policy::Block)
                    .expect("Service::command: error blocking node");
                // Blocking a peer also disconnects it.
                if updated {
                    self.outbox.disconnect(id, DisconnectReason::Policy);
                }
                resp.send(updated).ok();
            }
            Command::AnnounceRefs(id, namespaces, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
                        warn!(target: "service", "Failed to announce refs: repository {id} not found");
                        resp.err(command::Error::custom(format!("repository {id} not found")))
                            .ok();
                        return;
                    }
                    Err(e) => {
                        warn!(target: "service", "Failed to announce refs: doc error: {e}");
                        resp.err(e).ok();
                        return;
                    }
                };

                match self.announce_own_refs(id, doc, namespaces) {
                    Ok((refs, _timestamp)) => {
                        // Reply with the first announced ref, or an error if
                        // nothing was announced.
                        if let Some(refs) = refs.first() {
                            resp.ok(*refs).ok();
                        } else {
                            resp.err(command::Error::custom(format!(
                                "no refs were announced for {id}"
                            )))
                            .ok();
                        }
                    }
                    Err(err) => {
                        warn!(target: "service", "Failed to announce refs: {err}");
                        resp.err(err).ok();
                    }
                }
            }
            Command::AnnounceInventory => {
                self.announce_inventory();
            }
            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
                Ok(updated) => {
                    resp.ok(updated).ok();
                }
                Err(e) => {
                    warn!(target: "service", "Failed to add {rid} to inventory: {e}");
                    resp.err(e).ok();
                }
            },
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
            }
        }
    }
963
964 fn fetch_refs_at(
967 &mut self,
968 rid: RepoId,
969 from: NodeId,
970 refs: NonEmpty<RefsAt>,
971 scope: Scope,
972 config: fetcher::FetchConfig,
973 ) -> bool {
974 match self.refs_status_of(rid, refs, &scope) {
975 Ok(status) => {
976 if status.want.is_empty() {
977 debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
978 } else {
979 self.fetch(rid, from, status.want, config, None);
980 return true;
981 }
982 }
983 Err(e) => {
984 warn!(target: "service", "Failed to get the refs status of {rid}: {e}");
985 }
986 }
987 false
989 }
990
    /// Initiates (or queues) a fetch of `rid` from peer `from`.
    ///
    /// `refs_at` restricts the fetch to specific refs; an empty vector means
    /// fetch everything. If `channel` is given, the eventual [`FetchResult`]
    /// is sent back on it. If the peer is not connected or the fetch queue
    /// is at capacity, a `FetchResult::Failed` is sent immediately.
    fn fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs_at: Vec<RefsAt>,
        config: fetcher::FetchConfig,
        channel: Option<command::Responder<FetchResult>>,
    ) {
        // Resolve the session first; bail out early (replying on the
        // channel, if any) when the peer isn't fully connected.
        let session = {
            let reason = format!("peer {from} is not connected; cannot initiate fetch");
            let Some(session) = self.sessions.get_mut(&from) else {
                if let Some(c) = channel {
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            };
            if !session.is_connected() {
                if let Some(c) = channel {
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            }
            session
        };

        let cmd = fetcher::state::command::Fetch {
            from,
            rid,
            refs: refs_at.into(),
            config,
        };
        // The fetcher decides whether to start, queue, dedupe or reject.
        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);

        if let Some(c) = rejected {
            c.ok(FetchResult::Failed {
                reason: "fetch queue at capacity".to_string(),
            })
            .ok();
        }

        match event {
            fetcher::state::event::Fetch::Started {
                rid,
                from,
                refs: refs_at,
                config,
            } => {
                debug!(target: "service", "Starting fetch for {rid} from {from}");
                self.outbox.fetch(
                    session,
                    rid,
                    refs_at.into(),
                    self.config.limits.fetch_pack_receive,
                    config,
                );
            }
            fetcher::state::event::Fetch::Queued { rid, from } => {
                debug!(target: "service", "Queued fetch for {rid} from {from}");
            }
            fetcher::state::event::Fetch::AlreadyFetching { rid, from } => {
                debug!(target: "service", "Already fetching {rid} from {from}");
            }
            fetcher::state::event::Fetch::QueueAtCapacity { rid, from, .. } => {
                debug!(target: "service", "Queue at capacity for {from}, rejected {rid}");
            }
        }
    }
1058
    /// Handles the completion (success or failure) of a fetch of `rid` from
    /// `from`, as reported by the worker.
    ///
    /// Notifies all subscribers of the result, records the seed, emits
    /// events, updates inventory for fresh public clones, and announces any
    /// updated refs. A freed fetch slot also lets queued fetches proceed.
    pub fn fetched(
        &mut self,
        rid: RepoId,
        from: NodeId,
        result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
    ) {
        let cmd = fetcher::state::command::Fetched { from, rid };
        let fetcher::service::FetchCompleted { event, subscribers } = self.fetcher.fetched(cmd);

        // A fetch slot was freed; try to start queued fetches.
        self.dequeue_fetches();

        match event {
            fetcher::state::event::Fetched::NotFound { from, rid } => {
                debug!(target: "service", "Unexpected fetch result for {rid} from {from}");
            }
            fetcher::state::event::Fetched::Completed { from, rid, refs: _ } => {
                // Translate the worker result into the public FetchResult
                // sent to subscribers.
                let fetch_result = match &result {
                    Ok(success) => FetchResult::Success {
                        updated: success.updated.clone(),
                        namespaces: success.namespaces.clone(),
                        clone: success.clone,
                    },
                    Err(e) => FetchResult::Failed {
                        reason: e.to_string(),
                    },
                };
                for responder in subscribers {
                    responder.ok(fetch_result.clone()).ok();
                }
                match result {
                    Ok(crate::worker::fetch::FetchResult {
                        updated,
                        canonical,
                        namespaces,
                        clone,
                        doc,
                    }) => {
                        info!(target: "service", "Fetched {rid} from {from} successfully");
                        // Remember this peer as a seed for the repo.
                        self.seed_discovered(rid, from, self.clock.into());

                        for update in &updated {
                            if update.is_skipped() {
                                trace!(target: "service", "Ref skipped: {update} for {rid}");
                            } else {
                                debug!(target: "service", "Ref updated: {update} for {rid}");
                            }
                        }
                        self.emitter.emit(Event::RefsFetched {
                            remote: from,
                            rid,
                            updated: updated.clone(),
                        });
                        self.emitter
                            .emit_all(canonical.into_iter().map(|(refname, target)| {
                                Event::CanonicalRefUpdated {
                                    rid,
                                    refname,
                                    target,
                                }
                            }));

                        // A fresh clone of a public repo becomes part of our
                        // inventory and is announced.
                        if clone && doc.is_public() {
                            debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");

                            if let Err(e) = self.add_inventory(rid) {
                                warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
                            }
                        }

                        // Only announce refs when something actually changed.
                        if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
                            debug!(target: "service", "Nothing to announce, no refs were updated..");
                        } else {
                            if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
                                warn!(target: "service", "Failed to announce new refs: {e}");
                            }
                        }
                    }
                    Err(err) => {
                        warn!(target: "service", "Fetch failed for {rid} from {from}: {err}");

                        // A timed-out peer is disconnected outright.
                        if err.is_timeout() {
                            self.outbox.disconnect(from, DisconnectReason::Fetch(err));
                        }
                    }
                }
            }
        }
    }
1158
1159 pub fn dequeue_fetches(&mut self) {
1167 let sessions = self
1168 .sessions
1169 .shuffled()
1170 .map(|(k, _)| *k)
1171 .collect::<Vec<_>>();
1172
1173 for nid in sessions {
1174 #[allow(clippy::unwrap_used)]
1175 let sess = self.sessions.get_mut(&nid).unwrap();
1176 if !sess.is_connected() {
1177 continue;
1178 }
1179
1180 let Some(fetcher::QueuedFetch {
1181 rid,
1182 refs: refs_at,
1183 config,
1184 }) = self.fetcher.dequeue(&nid)
1185 else {
1186 continue;
1187 };
1188
1189 let repo_entry = self
1191 .policies
1192 .seed_policy(&rid)
1193 .expect("error accessing repo seeding configuration");
1194
1195 let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1196 debug!(target: "service", "Repository {} no longer seeded, skipping", rid);
1197 continue;
1198 };
1199
1200 debug!(target: "service", "Dequeued fetch for {} from {}", rid, nid);
1201
1202 match refs_at {
1203 RefsToFetch::Refs(refs) => {
1204 self.fetch_refs_at(rid, nid, refs, scope, config);
1205 }
1206 RefsToFetch::All => {
1207 self.fetch(rid, nid, vec![], config, None);
1210 }
1211 }
1212 }
1213 }
1214
    /// Decides whether to accept an inbound connection from `ip`.
    ///
    /// Loopback and unspecified addresses are always accepted. Otherwise the
    /// inbound connection limit, the IP ban list, and the inbound rate
    /// limiter are consulted, in that order.
    pub fn accepted(&mut self, ip: IpAddr) -> bool {
        // Local connections bypass all checks.
        if ip.is_loopback() || ip.is_unspecified() {
            return true;
        }
        if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
            return false;
        }
        match self.db.addresses().is_ip_banned(ip) {
            Ok(banned) => {
                if banned {
                    debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
                    return false;
                }
            }
            // A failed ban lookup is logged but does not reject the peer.
            Err(e) => warn!(target: "service", "Failed to query ban status for {ip}: {e}"),
        }
        let host: HostName = ip.into();
        let tokens = self.config.limits.rate.inbound;

        if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
            trace!(target: "service", "Rate limiting inbound connection from {host}..");
            return false;
        }
        true
    }
1244
    /// Records that an outbound connection attempt to `nid` at `addr` was
    /// made, transitioning its session to the "attempted" state.
    ///
    /// In debug builds, panics if no session exists for `nid`; in release
    /// builds the call is a no-op in that case.
    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
        debug!(target: "service", "Attempted connection to {nid} ({addr})");

        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_attempted();
        } else {
            #[cfg(debug_assertions)]
            panic!("Service::attempted: unknown session {nid}@{addr}");
        }
    }
1255
    /// Records that the node is now listening on `local_addr`.
    pub fn listening(&mut self, local_addr: net::SocketAddr) {
        info!(target: "node", "Listening on {local_addr}..");

        self.listening.push(local_addr);
    }
1261
    /// Handles a newly established connection with `remote` over `link`.
    ///
    /// Blocked peers are disconnected immediately. Otherwise the session is
    /// transitioned (outbound) or created/updated (inbound) and the initial
    /// protocol messages are written to it.
    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        // Enforce block policy before doing anything else.
        if let Ok(true) = self.policies.is_blocked(&remote) {
            self.emitter.emit(Event::PeerDisconnected {
                nid: remote,
                reason: format!("{remote} is blocked"),
            });
            info!(target: "service", "Disconnecting blocked inbound peer {remote}");
            self.outbox.disconnect(remote, DisconnectReason::Policy);
            return;
        }

        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);

        if link.is_outbound() {
            // Outbound: the session was created at connect time; just mark
            // it connected and send the initial messages.
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);
            }
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(mut e) => {
                    // An inbound connection from a peer we already have a
                    // session for: adopt the new link on the existing session.
                    let peer = e.get_mut();
                    debug!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({peer})"
                    );
                    peer.link = link;
                    peer.to_connected(self.clock);
                    self.outbox.write_all(peer, msgs);
                }
                Entry::Vacant(e) => {
                    // Record the peer's public IP in the address book;
                    // failures are non-fatal.
                    if let HostName::Ip(ip) = addr.host {
                        if !address::is_local(&ip) {
                            if let Err(e) =
                                self.db
                                    .addresses_mut()
                                    .record_ip(&remote, ip, self.clock.into())
                            {
                                log::debug!(target: "service", "Failed to record IP address for {remote}: {e}");
                            }
                        }
                    }
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                    ));
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
    }
1328
    /// Handle a peer disconnection on the given `link`.
    ///
    /// Cancels any fetches in flight from the peer, notifies orphaned fetch
    /// responders, and either schedules a reconnection (persistent peers) or
    /// drops the session and records the disconnect in the address book.
    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
        let since = self.local_time();
        let Some(session) = self.sessions.get_mut(&remote) else {
            trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
            return;
        };
        // A disconnect for the other link direction doesn't concern this session.
        if session.link != link {
            return;
        }

        info!(target: "service", "Disconnected from {remote} ({reason})");
        self.emitter.emit(Event::PeerDisconnected {
            nid: remote,
            reason: reason.to_string(),
        });

        let link = session.link;
        let addr = session.addr.clone();

        // Cancel all active and queued fetches from this peer.
        let cmd = fetcher::state::command::Cancel { from: remote };
        let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);

        match event {
            fetcher::state::event::Cancel::Unexpected { from } => {
                debug!(target: "service", "No fetches to cancel for {from}");
            }
            fetcher::state::event::Cancel::Canceled {
                from,
                active,
                queued,
            } => {
                debug!(target: "service", "Cancelled {} ongoing, {} queued for {from}", active.len(), queued.len());
            }
        }

        // Anyone waiting on a fetch result from this peer gets a failure.
        for (rid, responder) in orphaned {
            responder
                .ok(FetchResult::Failed {
                    reason: format!("failed fetch to {rid}, peer disconnected: {reason}"),
                })
                .ok();
        }

        if self.config.is_persistent(&remote) {
            // Exponential backoff based on attempt count, clamped to bounds.
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

            session.to_disconnected(since, since + delay);

            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

            self.outbox.wakeup(delay);
        } else {
            debug!(target: "service", "Dropping peer {remote}..");
            self.sessions.remove(&remote);

            // Penalize the address depending on why we disconnected; transport
            // errors weigh less when we appear to be offline ourselves.
            let severity = match reason {
                DisconnectReason::Dial(_)
                | DisconnectReason::Fetch(_)
                | DisconnectReason::Connection(_) => {
                    if self.is_online() {
                        Severity::Medium
                    } else {
                        Severity::Low
                    }
                }
                DisconnectReason::Session(e) => e.severity(),
                DisconnectReason::Command
                | DisconnectReason::Conflict
                | DisconnectReason::Policy
                | DisconnectReason::SelfConnection => Severity::Low,
            };

            if let Err(e) = self
                .db
                .addresses_mut()
                .disconnected(&remote, &addr, severity)
            {
                debug!(target: "service", "Failed to update address store: {e}");
            }
            // Losing an outbound slot: try to top connections back up.
            if link.is_outbound() {
                self.maintain_connections();
            }
        }
        self.dequeue_fetches();
    }
1427
1428 pub fn received_message(&mut self, remote: NodeId, message: Message) {
1429 if let Err(err) = self.handle_message(&remote, message) {
1430 self.outbox
1433 .disconnect(remote, DisconnectReason::Session(err));
1434
1435 }
1438 }
1439
    /// Process a gossip announcement received via `relayer`.
    ///
    /// Verifies the signature, filters out our own and stale/far-future
    /// announcements, stores the announcement for later broadcast, and then
    /// dispatches on the message type (inventory / refs / node). Returns
    /// `Ok(Some(id))` when the announcement should be relayed to other peers,
    /// `Ok(None)` when it was handled but should not be relayed.
    pub fn handle_announcement(
        &mut self,
        relayer: &NodeId,
        relayer_addr: &Address,
        announcement: &Announcement,
    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
        // A bad signature is a protocol violation by the sender.
        if !announcement.verify() {
            return Err(session::Error::Misbehavior);
        }
        let Announcement {
            node: announcer,
            message,
            ..
        } = announcement;

        // Our own announcements bounced back to us are ignored.
        if announcer == self.nid() {
            return Ok(None);
        }
        let now = self.clock;
        let timestamp = message.timestamp();

        // Reject announcements timestamped too far in the future.
        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
            return Err(session::Error::InvalidTimestamp(timestamp));
        }

        // Inventory and refs announcements are only accepted from nodes we
        // already know via a node announcement in the address book.
        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
            match self.db.addresses().get(announcer) {
                Ok(node) => {
                    if node.is_none() {
                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
                        return Ok(None);
                    }
                }
                Err(e) => {
                    debug!(target: "service", "Failed to look up node in address book: {e}");
                    return Ok(None);
                }
            }
        }

        // Store the announcement in the gossip store. `Ok(Some(id))` means it
        // was fresh; stale or failed stores are not relayed.
        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
            Ok(Some(id)) => {
                log::debug!(
                    target: "service",
                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
                );
                // Remember who relayed this to us, so we don't echo it back.
                self.relayed_by.entry(id).or_default().push(*relayer);

                // Node announcements are always relayed; others only if recent.
                let relay = message.is_node_announcement()
                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
                relay.then_some(id)
            }
            Ok(None) => {
                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
                return Ok(None);
            }
            Err(e) => {
                debug!(target: "service", "Failed to update gossip entry from {announcer}: {e}");
                return Ok(None);
            }
        };

        match message {
            AnnouncementMessage::Inventory(message) => {
                self.emitter.emit(Event::InventoryAnnounced {
                    nid: *announcer,
                    inventory: message.inventory.to_vec(),
                    timestamp: message.timestamp,
                });
                // Sync our routing table with the announcer's inventory.
                match self.sync_routing(
                    message.inventory.iter().cloned(),
                    *announcer,
                    message.timestamp,
                ) {
                    Ok(synced) => {
                        if synced.is_empty() {
                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
                            return Ok(None);
                        }
                    }
                    Err(e) => {
                        debug!(target: "service", "Failed to process inventory from {announcer}: {e}");
                        return Ok(None);
                    }
                }
                let mut missing = Vec::new();
                let nid = *self.nid();

                if let Some(sess) = self.sessions.get_mut(announcer) {
                    for id in message.inventory.as_slice() {
                        // Keep the peer's subscription filter up to date with
                        // the repos it claims to have.
                        if let Some(sub) = &mut sess.subscribe {
                            sub.filter.insert(id);
                        }

                        // Collect repos we seed but don't yet have locally.
                        if self.policies.is_seeding(id).expect(
                            "Service::handle_announcement: error accessing seeding configuration",
                        ) {
                            match self.db.routing().entry(id, &nid) {
                                Ok(entry) => {
                                    if entry.is_none() {
                                        missing.push(*id);
                                    }
                                }
                                Err(e) => debug!(
                                    target: "service",
                                    "Error checking local inventory for {id}: {e}"
                                ),
                            }
                        }
                    }
                }
                // Randomize fetch order across missing repos.
                self.rng.shuffle(&mut missing);

                for rid in missing {
                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
                    self.fetch(rid, *announcer, vec![], self.fetch_config(), None);
                }
                return Ok(relay);
            }
            AnnouncementMessage::Refs(message) => {
                self.emitter.emit(Event::RefsAnnounced {
                    nid: *announcer,
                    rid: message.rid,
                    refs: message.refs.to_vec(),
                    timestamp: message.timestamp,
                });
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
                    return Ok(None);
                };
                self.seed_discovered(message.rid, *announcer, message.timestamp);

                // If the announcement includes our own namespace, the announcer
                // has (some version of) our refs: update its sync status.
                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
                    debug!(
                        target: "service",
                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
                        message.rid, refs.at, message.timestamp
                    );
                    match self.db.seeds_mut().synced(
                        &message.rid,
                        announcer,
                        refs.at,
                        message.timestamp,
                    ) {
                        Ok(updated) => {
                            if updated {
                                debug!(
                                    target: "service",
                                    "Updating sync status of {announcer} for {} to {}",
                                    message.rid, refs.at
                                );
                                self.emitter.emit(Event::RefsSynced {
                                    rid: message.rid,
                                    remote: *announcer,
                                    at: refs.at,
                                });
                            } else {
                                debug!(
                                    target: "service",
                                    "Sync status of {announcer} was not updated for {}",
                                    message.rid,
                                );
                            }
                        }
                        Err(e) => {
                            debug!(target: "service", "Failed to update sync status for {}: {e}", message.rid);
                        }
                    }
                }
                // Only fetch refs of repositories we actually seed.
                let repo_entry = self.policies.seed_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo seeding configuration",
                );
                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
                    debug!(
                        target: "service",
                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
                        message.rid
                    );
                    return Ok(None);
                };
                // Fetching requires a live session with the announcer.
                let Some(remote) = self.sessions.get(announcer).cloned() else {
                    trace!(
                        target: "service",
                        "Skipping fetch of {}, no sessions connected to {announcer}",
                        message.rid
                    );
                    return Ok(relay);
                };
                self.fetch_refs_at(message.rid, remote.id, refs, scope, self.fetch_config());

                return Ok(relay);
            }
            AnnouncementMessage::Node(
                ann @ NodeAnnouncement {
                    features,
                    addresses,
                    ..
                },
            ) => {
                self.emitter.emit(Event::NodeAnnounced {
                    nid: *announcer,
                    alias: ann.alias.clone(),
                    timestamp: ann.timestamp,
                    features: *features,
                    addresses: addresses.to_vec(),
                });
                // Only seed nodes are stored in the address book.
                if !features.has(Features::SEED) {
                    return Ok(relay);
                }

                match self.db.addresses_mut().insert(
                    announcer,
                    ann.version,
                    ann.features,
                    &ann.alias,
                    ann.work(),
                    &ann.agent,
                    timestamp,
                    addresses
                        .iter()
                        // Non-routable addresses are only accepted from local relayers.
                        .filter(|a| a.is_routable() || relayer_addr.is_local())
                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
                ) {
                    Ok(updated) => {
                        if updated {
                            debug!(
                                target: "service",
                                "Address store entry for node {announcer} updated at {timestamp}"
                            );
                            return Ok(relay);
                        }
                    }
                    Err(err) => {
                        warn!(target: "service", "Failed to process node announcement from {announcer}: {err}");
                    }
                }
            }
        }
        Ok(None)
    }
1741
1742 pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
1743 match info {
1744 Info::RefsAlreadySynced { rid, at } => {
1746 debug!(target: "service", "Refs already synced for {rid} by {remote}");
1747 self.emitter.emit(Event::RefsSynced {
1748 rid: *rid,
1749 remote,
1750 at: *at,
1751 });
1752 }
1753 }
1754
1755 Ok(())
1756 }
1757
    /// Handle a single wire message from `remote`.
    ///
    /// Applies per-peer rate limiting, keeps the session's liveness and state
    /// up to date, then dispatches on the message variant. Errors returned
    /// here cause the caller to disconnect the peer.
    pub fn handle_message(
        &mut self,
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
        let local = self.node_id();
        let relay = self.config.is_relay();
        let Some(peer) = self.sessions.get_mut(remote) else {
            debug!(target: "service", "Session not found for {remote}");
            return Ok(());
        };
        peer.last_active = self.clock;

        // Rate limit per link direction; limited messages are dropped silently.
        let limit: RateLimit = match peer.link {
            Link::Outbound => self.config.limits.rate.outbound.into(),
            Link::Inbound => self.config.limits.rate.inbound.into(),
        };
        if self
            .limiter
            .limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
        {
            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
        }
        message.log(log::Level::Debug, remote, Link::Inbound);

        // `connected` holds the ping state and latency buffer when the session
        // is fully connected; messages on a connecting session promote it.
        let connected = match &mut peer.state {
            session::State::Disconnected { .. } => {
                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
                return Ok(());
            }
            session::State::Attempted | session::State::Initial => {
                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);

                peer.to_connected(self.clock);

                None
            }
            session::State::Connected {
                ping, latencies, ..
            } => Some((ping, latencies)),
        };

        trace!(target: "service", "Received message {message:?} from {remote}");

        match message {
            Message::Announcement(ann) => {
                let relayer = remote;
                let relayer_addr = peer.addr.clone();

                // `Some(id)` means the announcement is relayable.
                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
                        // Inventory announcements are relayed later in batch;
                        // everything else is relayed immediately.
                        if let AnnouncementMessage::Inventory(_) = ann.message {
                            if let Err(e) = self
                                .database_mut()
                                .gossip_mut()
                                .set_relay(id, gossip::RelayStatus::Relay)
                            {
                                warn!(target: "service", "Failed to set relay flag for message: {e}");
                                return Ok(());
                            }
                        } else {
                            self.relay(id, ann);
                        }
                    }
                }
            }
            Message::Subscribe(subscribe) => {
                // Replay matching stored gossip to the subscriber.
                match self
                    .db
                    .gossip()
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
                {
                    Ok(anns) => {
                        for ann in anns {
                            let ann = match ann {
                                Ok(a) => a,
                                Err(e) => {
                                    debug!(target: "service", "Failed to read gossip message from store: {e}");
                                    continue;
                                }
                            };
                            // Don't echo the peer's own announcements back.
                            if ann.node == *remote {
                                continue;
                            }
                            // Non-relays only forward their own announcements.
                            if relay || ann.node == local {
                                self.outbox.write(peer, ann.into());
                            }
                        }
                    }
                    Err(e) => {
                        warn!(target: "service", "Failed to query gossip messages from store: {e}");
                    }
                }
                peer.subscribe = Some(subscribe);
            }
            Message::Info(info) => {
                self.handle_info(*remote, &info)?;
            }
            Message::Ping(Ping { ponglen, .. }) => {
                // Ignore pings requesting an oversized pong payload.
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
                }
                self.outbox.write(
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
            Message::Pong { zeroes } => {
                if let Some((ping, latencies)) = connected {
                    if let session::PingState::AwaitingResponse {
                        len: ponglen,
                        since,
                    } = *ping
                    {
                        // Only a pong matching the requested length counts;
                        // record the round-trip latency, bounded in size.
                        if (ponglen as usize) == zeroes.len() {
                            *ping = session::PingState::Ok;
                            latencies.push_back(self.clock - since);
                            if latencies.len() > MAX_LATENCIES {
                                latencies.pop_front();
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
1901
1902 fn refs_status_of(
1904 &self,
1905 rid: RepoId,
1906 refs: NonEmpty<RefsAt>,
1907 scope: &policy::Scope,
1908 ) -> Result<RefsStatus, Error> {
1909 let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
1910 if refs.want.is_empty() {
1912 return Ok(refs);
1913 }
1914 let mut refs = match scope {
1916 policy::Scope::All => refs,
1917 policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
1918 Ok(Namespaces::All) => refs,
1919 Ok(Namespaces::Followed(followed)) => {
1920 refs.want.retain(|r| followed.contains(&r.remote));
1921 refs
1922 }
1923 Err(e) => return Err(e.into()),
1924 },
1925 };
1926 refs.want.retain(|r| r.remote != self.node_id());
1928
1929 Ok(refs)
1930 }
1931
1932 fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
1934 if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
1935 if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
1936 self.emitter.emit(Event::SeedDiscovered { rid, nid });
1937 debug!(target: "service", "Routing table updated for {rid} with seed {nid}");
1938 }
1939 }
1940 }
1941
1942 fn initial(&mut self, _link: Link) -> Vec<Message> {
1944 let now = self.clock();
1945 let filter = self.filter();
1946
1947 let since = if let Some(last) = self.last_online_at {
1957 Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
1958 } else {
1959 (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
1960 };
1961 debug!(target: "service", "Subscribing to messages since timestamp {since}..");
1962
1963 vec![
1964 Message::node(self.node.clone(), &self.signer),
1965 Message::inventory(self.inventory.clone(), &self.signer),
1966 Message::subscribe(filter, since, Timestamp::MAX),
1967 ]
1968 }
1969
1970 fn is_online(&self) -> bool {
1972 self.sessions
1973 .connected()
1974 .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
1975 .count()
1976 > 0
1977 }
1978
1979 fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
1981 let node = self.node_id();
1982 let now = self.timestamp();
1983
1984 let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
1985 if removed {
1986 self.refresh_and_announce_inventory(now)?;
1987 }
1988 Ok(removed)
1989 }
1990
1991 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
1993 let node = self.node_id();
1994 let now = self.timestamp();
1995
1996 if !self.storage.contains(&rid)? {
1997 debug!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
1998 return Ok(false);
1999 }
2000 let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
2002 let updated = !updates.is_empty();
2003
2004 if updated {
2005 self.refresh_and_announce_inventory(now)?;
2006 }
2007 Ok(updated)
2008 }
2009
2010 fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
2012 let inventory = self.inventory()?;
2013
2014 self.inventory = gossip::inventory(time, inventory);
2015 self.announce_inventory();
2016
2017 Ok(())
2018 }
2019
2020 fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
2031 self.db
2032 .routing()
2033 .get_inventory(self.nid())
2034 .map_err(Error::from)
2035 }
2036
    /// Synchronize our routing table with an inventory received from `from`.
    ///
    /// Adds/updates routes for every repo in `inventory`, removes routes for
    /// repos `from` no longer advertises, and emits seed discovery/drop
    /// events. Returns a summary of what changed.
    fn sync_routing(
        &mut self,
        inventory: impl IntoIterator<Item = RepoId>,
        from: NodeId,
        timestamp: Timestamp,
    ) -> Result<SyncedRouting, Error> {
        let mut synced = SyncedRouting::default();
        let included = inventory.into_iter().collect::<BTreeSet<_>>();
        let mut events = Vec::new();

        for (rid, result) in
            self.db
                .routing_mut()
                .add_inventory(included.iter(), from, timestamp)?
        {
            match result {
                InsertResult::SeedAdded => {
                    debug!(target: "service", "Routing table updated for {rid} with seed {from}");
                    events.push(Event::SeedDiscovered { rid, nid: from });

                    // NOTE(review): this `if` body is empty — the seeding
                    // check has no effect here beyond panicking if the policy
                    // store errors. Looks like leftover code; confirm intent.
                    if self
                        .policies
                        .is_seeding(&rid)
                        .expect("Service::process_inventory: error accessing seeding configuration")
                    {
                    }
                    synced.added.push(rid);
                }
                InsertResult::TimeUpdated => {
                    synced.updated.push(rid);
                }
                InsertResult::NotUpdated => {}
            }
        }

        // Routes previously advertised by `from` but absent from this
        // inventory are removed.
        synced.removed.extend(
            self.db
                .routing()
                .get_inventory(&from)?
                .into_iter()
                .filter(|rid| !included.contains(rid)),
        );
        self.db
            .routing_mut()
            .remove_inventories(&synced.removed, &from)?;
        events.extend(
            synced
                .removed
                .iter()
                .map(|&rid| Event::SeedDropped { rid, nid: from }),
        );

        self.emitter.emit_all(events);

        Ok(synced)
    }
2098
2099 fn refs_announcement_for(
2101 &mut self,
2102 rid: RepoId,
2103 remotes: impl IntoIterator<Item = NodeId>,
2104 ) -> Result<(Announcement, Vec<RefsAt>), Error> {
2105 let repo = self.storage.repository(rid)?;
2106 let timestamp = self.timestamp();
2107 let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
2108
2109 for remote_id in remotes.into_iter() {
2110 let refs_at = RefsAt::new(&repo, remote_id).map_err(|err| {
2111 radicle::storage::Error::Refs(radicle::storage::refs::Error::Read(err))
2112 })?;
2113
2114 if refs.push(refs_at).is_err() {
2115 warn!(
2116 target: "service",
2117 "refs announcement limit ({REF_REMOTE_LIMIT}) exceeded, peers will see only some of your repository references",
2118 );
2119 break;
2120 }
2121 }
2122
2123 let msg = AnnouncementMessage::from(RefsAnnouncement {
2124 rid,
2125 refs: refs.clone(),
2126 timestamp,
2127 });
2128 Ok((msg.signed(&self.signer), refs.into()))
2129 }
2130
2131 fn announce_own_refs(
2133 &mut self,
2134 rid: RepoId,
2135 doc: Doc,
2136 namespaces: impl IntoIterator<Item = NodeId>,
2137 ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
2138 let (refs, timestamp) = self.announce_refs(rid, doc, namespaces, true)?;
2139
2140 for r in refs.iter() {
2144 self.emitter.emit(Event::LocalRefsAnnounced {
2145 rid,
2146 refs: *r,
2147 timestamp,
2148 });
2149 if let Err(e) = self.database_mut().refs_mut().set(
2150 &rid,
2151 &r.remote,
2152 &SIGREFS_BRANCH,
2153 r.at,
2154 timestamp.to_local_time(),
2155 ) {
2156 warn!(
2157 target: "service",
2158 "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
2159 r.remote
2160 );
2161 }
2162 }
2163 Ok((refs, timestamp))
2164 }
2165
    /// Build and broadcast a refs announcement for `rid` to connected peers
    /// that are allowed to see the repository.
    ///
    /// When `own` is false, only refs belonging to our own node are logged and
    /// have their sync status saved. Returns the announced refs and the
    /// announcement timestamp.
    fn announce_refs(
        &mut self,
        rid: RepoId,
        doc: Doc,
        remotes: impl IntoIterator<Item = NodeId>,
        own: bool,
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
        let timestamp = ann.timestamp();
        let peers = self.sessions.connected().map(|(_, p)| p);

        for r in refs.iter().filter(|r| own || r.remote == ann.node) {
            info!(
                target: "service",
                "Announcing refs {rid}/{r} to peers (t={timestamp})..",
            );
            // Record our own sync status locally; failure is non-fatal.
            if let Err(e) = self.db.seeds_mut().synced(&rid, &ann.node, r.at, timestamp) {
                warn!(target: "service", "Failed to update sync status for local node: {e}");
            } else {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
        }

        self.outbox.announce(
            ann,
            // Respect repository visibility when choosing recipients.
            peers.filter(|p| {
                doc.is_visible_to(&p.id.into())
            }),
            self.db.gossip_mut(),
        );
        Ok((refs, timestamp))
    }
2203
2204 fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
2205 if let Some(sess) = self.sessions.get_mut(&nid) {
2206 sess.to_initial();
2207 self.outbox.connect(nid, addr);
2208
2209 return true;
2210 }
2211 false
2212 }
2213
2214 fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
2215 debug!(target: "service", "Connecting to {nid} ({addr})..");
2216
2217 if nid == self.node_id() {
2218 return Err(ConnectError::SelfConnection);
2219 }
2220 if let Ok(true) = self.policies.is_blocked(&nid) {
2221 return Err(ConnectError::Blocked { nid });
2222 }
2223 if !self.is_supported_address(&addr) {
2224 return Err(ConnectError::UnsupportedAddress { nid, addr });
2225 }
2226 if self.sessions.contains_key(&nid) {
2227 return Err(ConnectError::SessionExists { nid });
2228 }
2229 if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
2230 return Err(ConnectError::LimitReached { nid, addr });
2231 }
2232 let persistent = self.config.is_persistent(&nid);
2233 let timestamp: Timestamp = self.clock.into();
2234
2235 if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
2236 warn!(target: "service", "Failed to update address book with connection attempt: {e}");
2237 }
2238 self.sessions.insert(
2239 nid,
2240 Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
2241 );
2242 self.outbox.connect(nid, addr);
2243
2244 Ok(())
2245 }
2246
    /// Collect the known seeds of `rid`, with sync status where computable.
    ///
    /// Seeds that track one of the given `namespaces` get a [`SyncStatus`]
    /// comparing their recorded head against our local refs; all remaining
    /// seeds from the routing table are included without a status.
    fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
        let mut seeds = Seeds::new(self.rng.clone());

        // Sync status requires the repository locally; skip this pass otherwise.
        if let Ok(repo) = self.storage.repository(*rid) {
            for namespace in namespaces.iter() {
                let Ok(local) = RefsAt::new(&repo, *namespace) else {
                    continue;
                };

                for seed in self.db.seeds().seeds_for(rid)? {
                    let seed = seed?;
                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
                    // In sync when the seed's recorded head equals our local one.
                    let synced = if local.at == seed.synced_at.oid {
                        SyncStatus::Synced { at: seed.synced_at }
                    } else {
                        let local = SyncedAt::new(local.at, &repo)?;

                        SyncStatus::OutOfSync {
                            local,
                            remote: seed.synced_at,
                        }
                    };
                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
                }
            }
        }

        // Add remaining routing-table seeds, without a sync status.
        for nid in self.db.routing().get(rid)? {
            if namespaces.contains(&nid) {
                continue;
            }
            if seeds.contains(&nid) {
                continue;
            }
            let addrs = self.db.addresses().addresses_of(&nid)?;
            let state = self.sessions.get(&nid).map(|s| s.state.clone());

            seeds.insert(Seed::new(nid, addrs, state, None));
        }
        Ok(seeds)
    }
2296
2297 fn filter(&self) -> Filter {
2299 if self.config.seeding_policy.is_allow() {
2300 Filter::default()
2302 } else {
2303 self.filter.clone()
2304 }
2305 }
2306
2307 fn timestamp(&mut self) -> Timestamp {
2310 let now = Timestamp::from(self.clock);
2311 if *now > *self.last_timestamp {
2312 self.last_timestamp = now;
2313 } else {
2314 self.last_timestamp = self.last_timestamp + 1;
2315 }
2316 self.last_timestamp
2317 }
2318
    /// Relay a stored announcement to connected peers, excluding the
    /// announcer, anyone who already relayed it to us, and — for refs
    /// announcements — peers the repository is not visible to.
    fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
        let announcer = ann.node;
        let relayed_by = self.relayed_by.get(&id);
        // Refs announcements carry a repo id used for the visibility check.
        let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
            Some(rid)
        } else {
            None
        };
        let relay_to = self
            .sessions
            .connected()
            // Skip peers that already relayed this announcement to us.
            .filter(|(id, _)| {
                relayed_by
                    .map(|relayers| !relayers.contains(id))
                    .unwrap_or(true)
            })
            // Never relay back to the announcer.
            .filter(|(id, _)| **id != announcer)
            // For refs: only peers the repo doc is visible to; a repo we can't
            // read is relayed to nobody.
            .filter(|(id, _)| {
                if let Some(rid) = rid {
                    self.storage
                        .get(rid)
                        .ok()
                        .flatten()
                        .map(|doc| doc.is_visible_to(&(*id).into()))
                        .unwrap_or(false)
                } else {
                    true
                }
            })
            .map(|(_, p)| p);

        self.outbox.relay(ann, relay_to);
    }
2359
2360 fn relay_announcements(&mut self) -> Result<(), Error> {
2365 let now = self.clock.into();
2366 let rows = self.database_mut().gossip_mut().relays(now)?;
2367 let local = self.node_id();
2368
2369 for (id, msg) in rows {
2370 let announcer = msg.node;
2371 if announcer == local {
2372 continue;
2374 }
2375 self.relay(id, msg);
2376 }
2377 Ok(())
2378 }
2379
2380 fn announce_inventory(&mut self) {
2382 let timestamp = self.inventory.timestamp.to_local_time();
2383
2384 if self.last_inventory == timestamp {
2385 debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
2386 return;
2387 }
2388 let msg = AnnouncementMessage::from(self.inventory.clone());
2389
2390 self.outbox.announce(
2391 msg.signed(&self.signer),
2392 self.sessions.connected().map(|(_, p)| p),
2393 self.db.gossip_mut(),
2394 );
2395 self.last_inventory = timestamp;
2396 }
2397
2398 fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
2399 let count = self.db.routing().len()?;
2400 if count <= self.config.limits.routing_max_size.into() {
2401 return Ok(());
2402 }
2403
2404 let delta = count - usize::from(self.config.limits.routing_max_size);
2405 let nid = self.node_id();
2406 self.db.routing_mut().prune(
2407 (*now - LocalDuration::from(self.config.limits.routing_max_age)).into(),
2408 Some(delta),
2409 &nid,
2410 )?;
2411 Ok(())
2412 }
2413
2414 fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
2415 let stale = self
2416 .sessions
2417 .connected()
2418 .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
2419
2420 for (_, session) in stale {
2421 debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);
2422
2423 self.outbox.disconnect(
2427 session.id,
2428 DisconnectReason::Session(session::Error::Timeout),
2429 );
2430 }
2431 }
2432
2433 fn keep_alive(&mut self, now: &LocalTime) {
2435 let inactive_sessions = self
2436 .sessions
2437 .connected_mut()
2438 .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
2439 .map(|(_, session)| session);
2440 for session in inactive_sessions {
2441 session.ping(self.clock, &mut self.outbox).ok();
2442 }
2443 }
2444
2445 fn available_peers(&mut self) -> Vec<Peer> {
2447 match self.db.addresses().entries() {
2448 Ok(entries) => {
2449 let mut peers = entries
2452 .filter(|entry| entry.version == PROTOCOL_VERSION)
2453 .filter(|entry| !entry.address.banned)
2454 .filter(|entry| !entry.penalty.is_connect_threshold_reached())
2455 .filter(|entry| !self.sessions.contains_key(&entry.node))
2456 .filter(|entry| !self.policies.is_blocked(&entry.node).unwrap_or(false))
2457 .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
2458 .filter(|entry| &entry.node != self.nid())
2459 .filter(|entry| self.is_supported_address(&entry.address.addr))
2460 .fold(HashMap::new(), |mut acc, entry| {
2461 acc.entry(entry.node)
2462 .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
2463 .or_insert_with(|| Peer {
2464 nid: entry.node,
2465 addresses: vec![entry.address],
2466 penalty: entry.penalty,
2467 });
2468 acc
2469 })
2470 .into_values()
2471 .collect::<Vec<_>>();
2472 peers.sort_by_key(|p| p.penalty);
2473 peers
2474 }
2475 Err(e) => {
2476 warn!(target: "service", "Unable to lookup available peers in address book: {e}");
2477 Vec::new()
2478 }
2479 }
2480 }
2481
    /// Fetch repositories we seed (policy: allow) but don't yet have in
    /// storage, from every currently-connected seed of each repository.
    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
        let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
        for policy in policies {
            let policy = match policy {
                Ok(policy) => policy,
                Err(err) => {
                    debug!(target: "protocol::filter", "Failed to read seed policy: {err}");
                    continue;
                }
            };

            let rid = policy.rid;

            // Only repos our policy allows seeding.
            if !policy.is_allow() {
                continue;
            }
            // Only repos that are actually missing from storage.
            match self.storage.contains(&rid) {
                Ok(exists) => {
                    if exists {
                        continue;
                    }
                }
                Err(err) => {
                    log::debug!(target: "protocol::filter", "Failed to check if {rid} exists: {err}");
                    continue;
                }
            }
            match self.seeds(&rid, [self.node_id()].into()) {
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
                            self.fetch(rid, seed.nid, vec![], self.fetch_config(), None);
                        }
                    } else {
                        // Nothing we can do until a seed connects.
                        debug!(target: "service", "No connected seeds found for {rid}..");
                    }
                }
                Err(e) => {
                    debug!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
                }
            }
        }
        Ok(())
    }
2536
2537 fn idle_connections(&mut self) {
2539 for (_, sess) in self.sessions.iter_mut() {
2540 sess.idle(self.clock);
2541
2542 if sess.is_stable() {
2543 if let Err(e) =
2545 self.db
2546 .addresses_mut()
2547 .connected(&sess.id, &sess.addr, self.clock.into())
2548 {
2549 warn!(target: "service", "Failed to update address book with connection: {e}");
2550 }
2551 }
2552 }
2553 }
2554
    /// Top outbound connections back up to [`TARGET_OUTBOUND_PEERS`].
    ///
    /// Only runs under the dynamic peer configuration. Candidate addresses
    /// are filtered by retry backoff and transport support before dialing.
    fn maintain_connections(&mut self) {
        let PeerConfig::Dynamic = self.config.peers else {
            return;
        };
        trace!(target: "service", "Maintaining connections..");

        let target = TARGET_OUTBOUND_PEERS;
        let now = self.clock;
        // Count live or in-progress outbound sessions against the target.
        let outbound = self
            .sessions
            .values()
            .filter(|s| s.link.is_outbound())
            .filter(|s| s.is_connected() || s.is_connecting())
            .count();
        let wanted = target.saturating_sub(outbound);

        if wanted == 0 {
            return;
        }

        // For each available peer, pick its first address that is either
        // never-attempted, last attempt succeeded, or past the retry delta.
        let available = self
            .available_peers()
            .into_iter()
            .filter_map(|peer| {
                peer.addresses
                    .into_iter()
                    .find(|ka| match (ka.last_success, ka.last_attempt) {
                        (Some(success), Some(attempt)) => {
                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
                        }
                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
                        (_, None) => true,
                    })
                    .map(|ka| (peer.nid, ka))
            })
            .filter(|(_, ka)| self.is_supported_address(&ka.addr));

        let connect = available.take(wanted).collect::<Vec<_>>();
        if connect.len() < wanted {
            log::debug!(
                target: "service",
                "Not enough available peers to connect to (available={}, wanted={wanted})",
                connect.len()
            );
        }
        for (id, ka) in connect {
            if let Err(e) = self.connect(id, ka.addr.clone()) {
                warn!(target: "service", "Service::maintain_connections connection error: {e}");
            }
        }
    }
2614
    /// Reconnect persistent peers whose retry deadline has passed, skipping
    /// any that are blocked by policy.
    fn maintain_persistent(&mut self) {
        trace!(target: "service", "Maintaining persistent peers..");

        let now = self.local_time();
        // Collect first, then reconnect, to avoid mutating sessions mid-scan.
        let mut reconnect = Vec::new();

        for (nid, session) in self.sessions.iter_mut() {
            if self.config.is_persistent(nid) {
                if self.policies.is_blocked(nid).unwrap_or(false) {
                    continue;
                }
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
                    if now >= *retry_at {
                        reconnect.push((*nid, session.addr.clone(), session.attempts()));
                    }
                }
            }
        }

        for (nid, addr, attempts) in reconnect {
            if self.reconnect(nid, addr) {
                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
            }
        }
    }
2644
2645 fn is_supported_address(&self, address: &Address) -> bool {
2656 match AddressType::from(address) {
2657 AddressType::Onion => self.config.onion.is_some(),
2659 AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
2660 }
2661 }
2662
2663 fn fetch_config(&self) -> fetcher::FetchConfig {
2664 fetcher::FetchConfig::default()
2665 .with_minimum_feature_level(self.config.fetch.feature_level_min())
2666 }
2667}
2668
/// Read-only (plus clock) access to a service's internal state.
pub trait ServiceState {
    /// The local node ID.
    fn nid(&self) -> &NodeId;
    /// The current peer sessions.
    fn sessions(&self) -> &Sessions;
    /// State of in-progress fetches.
    fn fetching(&self) -> &FetcherState;
    /// Outbox of pending outgoing messages/actions.
    fn outbox(&self) -> &Outbox;
    /// The service's rate limiter.
    fn limiter(&self) -> &RateLimiter;
    /// The service's event emitter.
    fn emitter(&self) -> &Emitter<Event>;
    /// Look up a repository's identity document in storage, if present.
    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
    /// The service's local clock.
    fn clock(&self) -> &LocalTime;
    /// Mutable access to the service's local clock.
    fn clock_mut(&mut self) -> &mut LocalTime;
    /// The service configuration.
    fn config(&self) -> &Config;
    /// The service metrics.
    fn metrics(&self) -> &Metrics;
}
2694
// Straightforward delegation: each accessor exposes the corresponding
// `Service` field (the node ID comes from the signer's public key, and
// `get` delegates to storage).
impl<D, S, G> ServiceState for Service<D, S, G>
where
    D: routing::Store,
    G: crypto::signature::Signer<crypto::Signature>,
    S: ReadStorage,
{
    fn nid(&self) -> &NodeId {
        self.signer.public_key()
    }

    fn sessions(&self) -> &Sessions {
        &self.sessions
    }

    fn fetching(&self) -> &FetcherState {
        self.fetcher.state()
    }

    fn outbox(&self) -> &Outbox {
        &self.outbox
    }

    fn limiter(&self) -> &RateLimiter {
        &self.limiter
    }

    fn emitter(&self) -> &Emitter<Event> {
        &self.emitter
    }

    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
        self.storage.get(rid)
    }

    fn clock(&self) -> &LocalTime {
        &self.clock
    }

    fn clock_mut(&mut self) -> &mut LocalTime {
        &mut self.clock
    }

    fn config(&self) -> &Config {
        &self.config
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }
}
2745
/// Why a peer connection was terminated.
#[derive(Debug)]
pub enum DisconnectReason {
    /// Error while dialing the remote address.
    Dial(Arc<dyn std::error::Error + Sync + Send>),
    /// Error on an established connection.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
    /// Error while fetching from the peer.
    Fetch(FetchError),
    /// Session-level protocol error.
    Session(session::Error),
    /// Connection conflict (presumably a duplicate connection to the same
    /// peer — confirm against callers).
    Conflict,
    /// We connected to ourselves.
    SelfConnection,
    /// Disconnected due to policy (eg. the peer being blocked — confirm
    /// against callers).
    Policy,
    /// Disconnected as the result of a user command.
    Command,
}
2768
2769impl DisconnectReason {
2770 pub fn is_dial_err(&self) -> bool {
2771 matches!(self, Self::Dial(_))
2772 }
2773
2774 pub fn is_connection_err(&self) -> bool {
2775 matches!(self, Self::Connection(_))
2776 }
2777
2778 pub fn connection() -> Self {
2779 DisconnectReason::Connection(Arc::new(std::io::Error::from(
2780 std::io::ErrorKind::ConnectionReset,
2781 )))
2782 }
2783}
2784
2785impl fmt::Display for DisconnectReason {
2786 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2787 match self {
2788 Self::Dial(err) => write!(f, "{err}"),
2789 Self::Connection(err) => write!(f, "{err}"),
2790 Self::Command => write!(f, "command"),
2791 Self::SelfConnection => write!(f, "self-connection"),
2792 Self::Conflict => write!(f, "conflict"),
2793 Self::Policy => write!(f, "policy"),
2794 Self::Session(err) => write!(f, "{err}"),
2795 Self::Fetch(err) => write!(f, "fetch: {err}"),
2796 }
2797 }
2798}
2799
/// Result of a repository lookup.
#[derive(Debug)]
pub struct Lookup {
    /// The repository's identity document, if found locally
    /// (presumably from local storage — confirm against the lookup caller).
    pub local: Option<Doc>,
    /// Remote nodes known to have the repository.
    pub remote: Vec<NodeId>,
}
2808
/// Errors that can occur during a repository lookup.
#[derive(thiserror::Error, Debug)]
pub enum LookupError {
    /// Error querying the routing table.
    #[error(transparent)]
    Routing(#[from] routing::Error),
    /// Error accessing the repository/storage.
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}
2816
/// Peer sessions, keyed by node ID.
#[derive(Debug, Clone)]
pub struct Sessions(AddressBook<NodeId, Session>);
2820
2821impl Sessions {
2822 pub fn new(rng: Rng) -> Self {
2823 Self(AddressBook::new(rng))
2824 }
2825
2826 pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2828 self.0
2829 .iter()
2830 .filter_map(move |(id, sess)| match &sess.state {
2831 session::State::Connected { .. } => Some((id, sess)),
2832 _ => None,
2833 })
2834 }
2835
2836 pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2838 self.connected().filter(|(_, s)| s.link.is_inbound())
2839 }
2840
2841 pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
2843 self.connected().filter(|(_, s)| s.link.is_outbound())
2844 }
2845
2846 pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2848 self.0.iter_mut().filter(move |(_, s)| s.is_connected())
2849 }
2850
2851 pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
2853 self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
2854 }
2855
2856 pub fn is_connected(&self, id: &NodeId) -> bool {
2858 self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
2859 }
2860
2861 pub fn is_disconnected(&self, id: &NodeId) -> bool {
2863 self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
2864 }
2865}
2866
// Expose the underlying address book's read API directly on `Sessions`.
impl Deref for Sessions {
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
2874
// Expose the underlying address book's mutable API directly on `Sessions`.
impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}