1#![allow(clippy::too_many_arguments)]
2#![deny(clippy::unwrap_used)]
3pub mod command;
4pub use command::{Command, QueryState};
5
6pub mod filter;
7pub mod gossip;
8pub mod io;
9pub mod limiter;
10pub mod message;
11pub mod session;
12
13use std::collections::hash_map::Entry;
14use std::collections::{BTreeSet, HashMap, HashSet};
15use std::net::IpAddr;
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18use std::{fmt, net, time};
19
20use crossbeam_channel as chan;
21use fastrand::Rng;
22use localtime::{LocalDuration, LocalTime};
23use log::*;
24use nonempty::NonEmpty;
25
26use radicle::identity::Doc;
27use radicle::node;
28use radicle::node::address;
29use radicle::node::address::Store as _;
30use radicle::node::address::{AddressBook, AddressType, KnownAddress};
31use radicle::node::config::{PeerConfig, RateLimit};
32use radicle::node::device::Device;
33use radicle::node::refs::Store as _;
34use radicle::node::routing::Store as _;
35use radicle::node::seed;
36use radicle::node::seed::Store as _;
37use radicle::node::{Penalty, Severity};
38use radicle::storage::refs::{FeatureLevel, SIGREFS_BRANCH};
39use radicle::storage::{RepositoryError, RepositoryInfo, SignedRefsInfo};
40use radicle_fetch::policy::SeedingPolicy;
41
42use crate::fetcher;
43use crate::fetcher::FetcherState;
44use crate::fetcher::RefsToFetch;
45use crate::fetcher::service::FetcherService;
46use crate::service::gossip::Store as _;
47use crate::service::message::{
48 Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
49};
50use crate::service::policy::{Scope, store::Write};
51use radicle::identity::RepoId;
52use radicle::node::events::Emitter;
53use radicle::node::routing;
54use radicle::node::routing::InsertResult;
55use radicle::node::{Address, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt};
56use radicle::prelude::*;
57use radicle::storage;
58use radicle::storage::{Namespaces, ReadStorage, refs::RefsAt};
59use radicle::crypto;
62use radicle::node::Link;
63use radicle::node::PROTOCOL_VERSION;
64
65use crate::bounded::BoundedVec;
66use crate::service::filter::Filter;
67pub use crate::service::message::{Message, ZeroBytes};
68pub use crate::service::session::{QueuedFetch, Session};
69use crate::worker::FetchError;
70use radicle::node::events::{Event, Events};
71use radicle::node::{Config, NodeId};
72
73use radicle::node::policy::config as policy;
74
75use self::io::Outbox;
76use self::limiter::RateLimiter;
77use self::message::InventoryAnnouncement;
78use self::policy::NamespacesError;
79
80pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
82pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
84pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
86pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
88pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
90pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
92pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
94pub const MAX_LATENCIES: usize = 16;
96pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
98pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
100pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
103pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
106pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
108pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
110pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
112pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(30);
114pub const TARGET_OUTBOUND_PEERS: usize = 8;
116
117pub use message::ADDRESS_LIMIT;
119pub use message::INVENTORY_LIMIT;
121pub use message::REF_REMOTE_LIMIT;
123
124#[derive(Clone, Debug, Default, serde::Serialize)]
126#[serde(rename_all = "camelCase")]
127pub struct Metrics {
128 pub peers: HashMap<NodeId, PeerMetrics>,
130 pub worker_queue_size: usize,
132 pub open_channels: usize,
134}
135
136impl Metrics {
137 pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
139 self.peers.entry(nid).or_default()
140 }
141}
142
143#[derive(Clone, Debug, Default, serde::Serialize)]
145#[serde(rename_all = "camelCase")]
146pub struct PeerMetrics {
147 pub received_git_bytes: usize,
148 pub received_fetch_requests: usize,
149 pub received_bytes: usize,
150 pub received_gossip_messages: usize,
151 pub sent_bytes: usize,
152 pub sent_fetch_requests: usize,
153 pub sent_git_bytes: usize,
154 pub sent_gossip_messages: usize,
155 pub streams_opened: usize,
156 pub inbound_connection_attempts: usize,
157 pub outbound_connection_attempts: usize,
158 pub disconnects: usize,
159}
160
161#[derive(Default)]
163struct SyncedRouting {
164 added: Vec<RepoId>,
166 removed: Vec<RepoId>,
168 updated: Vec<RepoId>,
170}
171
172impl SyncedRouting {
173 fn is_empty(&self) -> bool {
174 self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
175 }
176}
177
178#[derive(Debug, Clone)]
180struct Peer {
181 nid: NodeId,
182 addresses: Vec<KnownAddress>,
183 penalty: Penalty,
184}
185
186#[derive(thiserror::Error, Debug)]
188pub enum Error {
189 #[error(transparent)]
190 Git(#[from] radicle::git::raw::Error),
191 #[error(transparent)]
192 Storage(#[from] storage::Error),
193 #[error(transparent)]
194 Gossip(#[from] gossip::Error),
195 #[error(transparent)]
196 Refs(#[from] storage::refs::Error),
197 #[error(transparent)]
198 Routing(#[from] routing::Error),
199 #[error(transparent)]
200 Address(#[from] address::Error),
201 #[error(transparent)]
202 Database(#[from] node::db::Error),
203 #[error(transparent)]
204 Seeds(#[from] seed::Error),
205 #[error(transparent)]
206 Policy(#[from] policy::Error),
207 #[error(transparent)]
208 Repository(#[from] radicle::storage::RepositoryError),
209 #[error("namespaces error: {0}")]
210 Namespaces(Box<NamespacesError>),
211}
212
213impl From<NamespacesError> for Error {
214 fn from(e: NamespacesError) -> Self {
215 Self::Namespaces(Box::new(e))
216 }
217}
218
219#[derive(thiserror::Error, Debug)]
220pub enum ConnectError {
221 #[error("attempted connection to peer {nid} which already has a session")]
222 SessionExists { nid: NodeId },
223 #[error("attempted connection to self")]
224 SelfConnection,
225 #[error("outbound connection limit reached when attempting {nid} ({addr})")]
226 LimitReached { nid: NodeId, addr: Address },
227 #[error(
228 "attempted connection to {nid}, via {addr} but addresses of this kind are not supported"
229 )]
230 UnsupportedAddress { nid: NodeId, addr: Address },
231 #[error("attempted connection with blocked peer {nid}")]
232 Blocked { nid: NodeId },
233}
234
235pub trait Store:
237 address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
238{
239}
240
241impl Store for radicle::node::Database {}
242
243#[derive(Debug)]
245pub struct FetchState {
246 pub from: NodeId,
248 pub refs_at: Vec<RefsAt>,
250 pub subscribers: Vec<chan::Sender<FetchResult>>,
252}
253
254#[derive(Debug)]
256pub struct Stores<D>(D);
257
258impl<D> Stores<D>
259where
260 D: Store,
261{
262 pub fn routing(&self) -> &impl routing::Store {
264 &self.0
265 }
266
267 pub fn routing_mut(&mut self) -> &mut impl routing::Store {
269 &mut self.0
270 }
271
272 pub fn addresses(&self) -> &impl address::Store {
274 &self.0
275 }
276
277 pub fn addresses_mut(&mut self) -> &mut impl address::Store {
279 &mut self.0
280 }
281
282 pub fn gossip(&self) -> &impl gossip::Store {
284 &self.0
285 }
286
287 pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
289 &mut self.0
290 }
291
292 pub fn seeds(&self) -> &impl seed::Store {
294 &self.0
295 }
296
297 pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
299 &mut self.0
300 }
301
302 pub fn refs(&self) -> &impl node::refs::Store {
304 &self.0
305 }
306
307 pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
309 &mut self.0
310 }
311}
312
313impl<D> AsMut<D> for Stores<D> {
314 fn as_mut(&mut self) -> &mut D {
315 &mut self.0
316 }
317}
318
319impl<D> From<D> for Stores<D> {
320 fn from(db: D) -> Self {
321 Self(db)
322 }
323}
324
325#[derive(Debug)]
327pub struct Service<D, S, G> {
328 config: Config,
330 signer: Device<G>,
332 storage: S,
334 db: Stores<D>,
336 policies: policy::Config<Write>,
338 sessions: Sessions,
340 clock: LocalTime,
342 relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
345 outbox: Outbox,
347 node: NodeAnnouncement,
349 inventory: InventoryAnnouncement,
351 rng: Rng,
353 fetcher: FetcherService<command::Responder<FetchResult>>,
354 limiter: RateLimiter,
356 filter: Filter,
358 last_idle: LocalTime,
360 last_gossip: LocalTime,
362 last_sync: LocalTime,
364 last_prune: LocalTime,
366 last_announce: LocalTime,
368 last_inventory: LocalTime,
370 last_timestamp: Timestamp,
372 started_at: Option<LocalTime>,
374 last_online_at: Option<LocalTime>,
376 emitter: Emitter<Event>,
378 listening: Vec<net::SocketAddr>,
380 metrics: Metrics,
382}
383
384impl<D, S, G> Service<D, S, G> {
385 pub fn node_id(&self) -> NodeId {
387 *self.signer.public_key()
388 }
389
390 pub fn local_time(&self) -> LocalTime {
392 self.clock
393 }
394
395 pub fn emitter(&self) -> Emitter<Event> {
396 self.emitter.clone()
397 }
398}
399
400impl<D, S, G> Service<D, S, G>
401where
402 D: Store,
403 S: WriteStorage + 'static,
404 G: crypto::signature::Signer<crypto::Signature>,
405{
406 pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
408 debug!(target: "service", "Init @{}", time.as_millis());
409 assert_ne!(time, LocalTime::default());
410
411 let nid = self.node_id();
412
413 self.clock = time;
414 self.started_at = Some(time);
415 self.last_online_at = match self.db.gossip().last() {
416 Ok(Some(last)) => Some(last.to_local_time()),
417 Ok(None) => None,
418 Err(e) => {
419 warn!(target: "service", "Failed to get the latest gossip message from db: {e}");
420 None
421 }
422 };
423
424 match self.db.refs().count() {
427 Ok(0) => {
428 info!(target: "service", "Empty refs database, populating from storage..");
429 if let Err(e) = self.db.refs_mut().populate(&self.storage) {
430 warn!(target: "service", "Failed to populate refs database: {e}");
431 }
432 }
433 Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
434 Err(e) => {
435 warn!(target: "service", "Failed to retrieve count of refs from database: {e}")
436 }
437 }
438
439 let announced = self
440 .db
441 .seeds()
442 .seeded_by(&nid)?
443 .collect::<Result<HashMap<_, _>, _>>()?;
444 let mut inventory = BTreeSet::new();
445 let mut private = BTreeSet::new();
446
447 for repo in self.storage.repositories()? {
448 let repo = self.upgrade_sigrefs(repo)?;
449 let rid = repo.rid;
450
451 if !self.policies.is_seeding(&rid)? {
453 debug!(target: "service", "Local repository {rid} is not seeded");
454 continue;
455 }
456 if repo.doc.is_public() {
458 inventory.insert(rid);
459 } else {
460 private.insert(rid);
461 }
462 let Some(updated_at) = repo.synced_at else {
464 continue;
465 };
466 if let Some(announced) = announced.get(&rid) {
468 if updated_at.oid == announced.oid {
469 continue;
470 }
471 }
472 if self.db.seeds_mut().synced(
474 &rid,
475 &nid,
476 updated_at.oid,
477 updated_at.timestamp.into(),
478 )? {
479 debug!(target: "service", "Saved local sync status for {rid}..");
480 }
481 if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
485 debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
486 self.db.gossip_mut().announced(&nid, &ann)?;
487 }
488 }
489
490 self.db
494 .routing_mut()
495 .add_inventory(inventory.iter(), nid, time.into())?;
496 self.inventory = gossip::inventory(self.timestamp(), inventory);
497
498 self.db
501 .routing_mut()
502 .remove_inventories(private.iter(), &nid)?;
503
504 self.filter = Filter::allowed_by(self.policies.seed_policies()?);
506 let addrs = self.config.connect.clone();
508 for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
509 if let Err(e) = self.connect(id, addr) {
510 debug!(target: "service", "Service::initialization connection error: {e}");
511 }
512 }
513 self.maintain_connections();
515 self.outbox.wakeup(IDLE_INTERVAL);
517 self.outbox.wakeup(GOSSIP_INTERVAL);
518
519 Ok(())
520 }
521
522 fn upgrade_sigrefs(&mut self, mut info: RepositoryInfo) -> Result<RepositoryInfo, Error> {
523 if !matches!(info.refs, SignedRefsInfo::NeedsMigration) {
524 return Ok(info);
525 }
526
527 let rid = info.rid;
528
529 log::info!(
530 "Migrating `rad/sigrefs` of {rid} to force feature level {}.",
531 FeatureLevel::LATEST
532 );
533
534 let repo = self.storage.repository_mut(rid)?;
535 let refs = repo.force_sign_refs(&self.signer)?;
537
538 let repo = self.storage.repository(rid)?;
539 let synced_at = SyncedAt::new(refs.at, &repo)?;
540
541 info.synced_at = Some(synced_at);
542 info.refs = SignedRefsInfo::Some(refs);
543
544 Ok(info)
545 }
546}
547
548impl<D, S, G> Service<D, S, G>
549where
550 D: Store,
551 S: ReadStorage + 'static,
552 G: crypto::signature::Signer<crypto::Signature>,
553{
554 pub fn new(
555 config: Config,
556 db: Stores<D>,
557 storage: S,
558 policies: policy::Config<Write>,
559 signer: Device<G>,
560 rng: Rng,
561 node: NodeAnnouncement,
562 emitter: Emitter<Event>,
563 ) -> Self {
564 let sessions = Sessions::new(rng.clone());
565 let limiter = RateLimiter::new(config.peers());
566 let last_timestamp = node.timestamp;
567 let clock = LocalTime::default(); let inventory = gossip::inventory(clock.into(), []); let fetcher = {
570 let config = fetcher::Config::new()
571 .with_max_concurrency(
572 std::num::NonZeroUsize::new(config.limits.fetch_concurrency.into())
573 .expect("fetch concurrency was zero, must be at least 1"),
574 )
575 .with_max_capacity(fetcher::MaxQueueSize::default());
576 FetcherService::new(config)
577 };
578 Self {
579 config,
580 storage,
581 policies,
582 signer,
583 rng,
584 inventory,
585 node,
586 clock,
587 db,
588 outbox: Outbox::default(),
589 limiter,
590 sessions,
591 fetcher,
592 filter: Filter::empty(),
593 relayed_by: HashMap::default(),
594 last_idle: LocalTime::default(),
595 last_gossip: LocalTime::default(),
596 last_sync: LocalTime::default(),
597 last_prune: LocalTime::default(),
598 last_timestamp,
599 last_announce: LocalTime::default(),
600 last_inventory: LocalTime::default(),
601 started_at: None, last_online_at: None, emitter,
604 listening: vec![],
605 metrics: Metrics::default(),
606 }
607 }
608
609 pub fn started(&self) -> Option<LocalTime> {
611 self.started_at
612 }
613
614 #[allow(clippy::should_implement_trait)]
616 pub fn next(&mut self) -> Option<io::Io> {
617 self.outbox.next()
618 }
619
620 pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
623 let updated = self.policies.seed(id, scope)?;
624 self.filter.insert(id);
625
626 Ok(updated)
627 }
628
629 pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
634 let updated = self.policies.unseed(id)?;
635
636 if updated {
637 self.filter = Filter::allowed_by(self.policies.seed_policies()?);
641 if let Err(e) = self.remove_inventory(id) {
643 warn!(target: "service", "Failed to update inventory after unseed: {e}");
644 }
645 }
646 Ok(updated)
647 }
648
649 #[allow(unused)]
653 pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
654 todo!()
655 }
656
657 pub fn database(&self) -> &Stores<D> {
659 &self.db
660 }
661
662 pub fn database_mut(&mut self) -> &mut Stores<D> {
664 &mut self.db
665 }
666
667 pub fn storage(&self) -> &S {
669 &self.storage
670 }
671
672 pub fn storage_mut(&mut self) -> &mut S {
674 &mut self.storage
675 }
676
677 pub fn policies(&self) -> &policy::Config<Write> {
679 &self.policies
680 }
681
682 pub fn signer(&self) -> &Device<G> {
684 &self.signer
685 }
686
687 pub fn events(&mut self) -> Events {
689 Events::from(self.emitter.subscribe())
690 }
691
692 pub fn fetcher(&self) -> &FetcherState {
693 self.fetcher.state()
694 }
695
696 pub fn outbox(&mut self) -> &mut Outbox {
698 &mut self.outbox
699 }
700
701 pub fn config(&self) -> &Config {
703 &self.config
704 }
705
706 pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
708 let this = self.nid();
709 let local = self.storage.get(rid)?;
710 let remote = self
711 .db
712 .routing()
713 .get(&rid)?
714 .iter()
715 .filter(|nid| nid != &this)
716 .cloned()
717 .collect();
718
719 Ok(Lookup { local, remote })
720 }
721
722 pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
723 trace!(
724 target: "service",
725 "Tick +{}",
726 now - self.started_at.expect("Service::tick: service must be initialized")
727 );
728 if now >= self.clock {
729 self.clock = now;
730 } else {
731 #[cfg(not(test))]
734 warn!(
735 target: "service",
736 "System clock is not monotonic: {now} is not greater or equal to {}", self.clock
737 );
738 }
739 self.metrics = metrics.clone();
740 }
741
742 pub fn wake(&mut self) {
743 let now = self.clock;
744
745 trace!(
746 target: "service",
747 "Wake +{}",
748 now - self.started_at.expect("Service::wake: service must be initialized")
749 );
750
751 if now - self.last_idle >= IDLE_INTERVAL {
752 trace!(target: "service", "Running 'idle' task...");
753
754 self.keep_alive(&now);
755 self.disconnect_unresponsive_peers(&now);
756 self.idle_connections();
757 self.maintain_connections();
758 self.dequeue_fetches();
759 self.outbox.wakeup(IDLE_INTERVAL);
760 self.last_idle = now;
761 }
762 if now - self.last_gossip >= GOSSIP_INTERVAL {
763 trace!(target: "service", "Running 'gossip' task...");
764
765 if let Err(e) = self.relay_announcements() {
766 warn!(target: "service", "Failed to relay stored announcements: {e}");
767 }
768 self.outbox.wakeup(GOSSIP_INTERVAL);
769 self.last_gossip = now;
770 }
771 if now - self.last_sync >= SYNC_INTERVAL {
772 trace!(target: "service", "Running 'sync' task...");
773
774 if let Err(e) = self.fetch_missing_repositories() {
775 warn!(target: "service", "Failed to fetch missing inventory: {e}");
776 }
777 self.outbox.wakeup(SYNC_INTERVAL);
778 self.last_sync = now;
779 }
780 if now - self.last_announce >= ANNOUNCE_INTERVAL {
781 trace!(target: "service", "Running 'announce' task...");
782
783 self.announce_inventory();
784 self.outbox.wakeup(ANNOUNCE_INTERVAL);
785 self.last_announce = now;
786 }
787 if now - self.last_prune >= PRUNE_INTERVAL {
788 trace!(target: "service", "Running 'prune' task...");
789
790 if let Err(err) = self.prune_routing_entries(&now) {
791 warn!(target: "service", "Failed to prune routing entries: {err}");
792 }
793 if let Err(err) = self
794 .db
795 .gossip_mut()
796 .prune((now - LocalDuration::from(self.config.limits.gossip_max_age)).into())
797 {
798 warn!(target: "service", "Failed to prune gossip entries: {err}");
799 }
800
801 self.outbox.wakeup(PRUNE_INTERVAL);
802 self.last_prune = now;
803 }
804
805 self.maintain_persistent();
807 }
808
809 pub fn command(&mut self, cmd: Command) {
810 info!(target: "service", "Received command {cmd:?}");
811
812 match cmd {
813 Command::Connect(nid, addr, opts) => {
814 if opts.persistent {
815 self.config.connect.insert((nid, addr.clone()).into());
816 }
817 if let Err(e) = self.connect(nid, addr) {
818 match e {
819 ConnectError::SessionExists { nid } => {
820 self.emitter.emit(Event::PeerConnected { nid });
821 }
822 e => {
823 self.emitter.emit(Event::PeerDisconnected {
825 nid,
826 reason: e.to_string(),
827 });
828 }
829 }
830 }
831 }
832 Command::Disconnect(nid) => {
833 self.outbox.disconnect(nid, DisconnectReason::Command);
834 }
835 Command::Config(resp) => {
836 resp.ok(self.config.clone()).ok();
837 }
838 Command::ListenAddrs(resp) => {
839 resp.ok(self.listening.clone()).ok();
840 }
841 Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
842 Ok(seeds) => {
843 let (connected, disconnected) = seeds.partition();
844 debug!(
845 target: "service",
846 "Found {} connected seed(s) and {} disconnected seed(s) for {}",
847 connected.len(), disconnected.len(), rid
848 );
849 resp.ok(seeds).ok();
850 }
851 Err(e) => {
852 warn!(target: "service", "Failed to get seeds for {rid}: {e}");
853 resp.err(e).ok();
854 }
855 },
856 Command::Fetch(rid, seed, timeout, signed_references_minimum_feature_level, resp) => {
857 let feature_level = signed_references_minimum_feature_level
858 .unwrap_or(self.config.fetch.feature_level_min());
859 let config = self
860 .fetch_config()
861 .with_timeout(timeout)
862 .with_minimum_feature_level(feature_level);
863 self.fetch(rid, seed, vec![], config, Some(resp));
864 }
865 Command::Seed(rid, scope, resp) => {
866 let seeded = self
868 .seed(&rid, scope)
869 .expect("Service::command: error seeding repository");
870 resp.ok(seeded).ok();
871
872 self.outbox.broadcast(
874 Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
875 self.sessions.connected().map(|(_, s)| s),
876 );
877 }
878 Command::Unseed(id, resp) => {
879 let updated = self
880 .unseed(&id)
881 .expect("Service::command: error unseeding repository");
882 resp.ok(updated).ok();
883 }
884 Command::Follow(id, alias, resp) => {
885 let seeded = self
886 .policies
887 .follow(&id, alias.as_ref())
888 .expect("Service::command: error following node");
889 resp.ok(seeded).ok();
890 }
891 Command::Unfollow(id, resp) => {
892 let updated = self
893 .policies
894 .unfollow(&id)
895 .expect("Service::command: error unfollowing node");
896 resp.ok(updated).ok();
897 }
898 Command::Block(id, resp) => {
899 let updated = self
900 .policies
901 .set_follow_policy(&id, policy::Policy::Block)
902 .expect("Service::command: error blocking node");
903 if updated {
904 self.outbox.disconnect(id, DisconnectReason::Policy);
905 }
906 resp.send(updated).ok();
907 }
908 Command::AnnounceRefs(id, namespaces, resp) => {
909 let doc = match self.storage.get(id) {
910 Ok(Some(doc)) => doc,
911 Ok(None) => {
912 warn!(target: "service", "Failed to announce refs: repository {id} not found");
913 resp.err(command::Error::custom(format!("repository {id} not found")))
914 .ok();
915 return;
916 }
917 Err(e) => {
918 warn!(target: "service", "Failed to announce refs: doc error: {e}");
919 resp.err(e).ok();
920 return;
921 }
922 };
923
924 match self.announce_own_refs(id, doc, namespaces) {
925 Ok((refs, _timestamp)) => {
926 if let Some(refs) = refs.first() {
930 resp.ok(*refs).ok();
931 } else {
932 resp.err(command::Error::custom(format!(
933 "no refs were announced for {id}"
934 )))
935 .ok();
936 }
937 }
938 Err(err) => {
939 warn!(target: "service", "Failed to announce refs: {err}");
940 resp.err(err).ok();
941 }
942 }
943 }
944 Command::AnnounceInventory => {
945 self.announce_inventory();
946 }
947 Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
948 Ok(updated) => {
949 resp.ok(updated).ok();
950 }
951 Err(e) => {
952 warn!(target: "service", "Failed to add {rid} to inventory: {e}");
953 resp.err(e).ok();
954 }
955 },
956 Command::QueryState(query, sender) => {
957 sender.send(query(self)).ok();
958 }
959 }
960 }
961
962 fn fetch_refs_at(
965 &mut self,
966 rid: RepoId,
967 from: NodeId,
968 refs: NonEmpty<RefsAt>,
969 scope: Scope,
970 config: fetcher::FetchConfig,
971 ) -> bool {
972 match self.refs_status_of(rid, refs, &scope) {
973 Ok(status) => {
974 if status.want.is_empty() {
975 debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
976 } else {
977 self.fetch(rid, from, status.want, config, None);
978 return true;
979 }
980 }
981 Err(e) => {
982 warn!(target: "service", "Failed to get the refs status of {rid}: {e}");
983 }
984 }
985 false
987 }
988
989 fn fetch(
990 &mut self,
991 rid: RepoId,
992 from: NodeId,
993 refs_at: Vec<RefsAt>,
994 config: fetcher::FetchConfig,
995 channel: Option<command::Responder<FetchResult>>,
996 ) {
997 let session = {
998 let reason = format!("peer {from} is not connected; cannot initiate fetch");
999 let Some(session) = self.sessions.get_mut(&from) else {
1000 if let Some(c) = channel {
1001 c.ok(FetchResult::Failed { reason }).ok();
1002 }
1003 return;
1004 };
1005 if !session.is_connected() {
1006 if let Some(c) = channel {
1007 c.ok(FetchResult::Failed { reason }).ok();
1008 }
1009 return;
1010 }
1011 session
1012 };
1013
1014 let cmd = fetcher::state::command::Fetch {
1015 from,
1016 rid,
1017 refs: refs_at.into(),
1018 config,
1019 };
1020 let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);
1021
1022 if let Some(c) = rejected {
1023 c.ok(FetchResult::Failed {
1024 reason: "fetch queue at capacity".to_string(),
1025 })
1026 .ok();
1027 }
1028
1029 match event {
1030 fetcher::state::event::Fetch::Started {
1031 rid,
1032 from,
1033 refs: refs_at,
1034 config,
1035 } => {
1036 debug!(target: "service", "Starting fetch for {rid} from {from}");
1037 self.outbox.fetch(
1038 session,
1039 rid,
1040 refs_at.into(),
1041 self.config.limits.fetch_pack_receive,
1042 config,
1043 );
1044 }
1045 fetcher::state::event::Fetch::Queued { rid, from } => {
1046 debug!(target: "service", "Queued fetch for {rid} from {from}");
1047 }
1048 fetcher::state::event::Fetch::AlreadyFetching { rid, from } => {
1049 debug!(target: "service", "Already fetching {rid} from {from}");
1050 }
1051 fetcher::state::event::Fetch::QueueAtCapacity { rid, from, .. } => {
1052 debug!(target: "service", "Queue at capacity for {from}, rejected {rid}");
1053 }
1054 }
1055 }
1056
1057 pub fn fetched(
1058 &mut self,
1059 rid: RepoId,
1060 from: NodeId,
1061 result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
1062 ) {
1063 let cmd = fetcher::state::command::Fetched { from, rid };
1064 let fetcher::service::FetchCompleted { event, subscribers } = self.fetcher.fetched(cmd);
1065
1066 self.dequeue_fetches();
1068
1069 match event {
1070 fetcher::state::event::Fetched::NotFound { from, rid } => {
1071 debug!(target: "service", "Unexpected fetch result for {rid} from {from}");
1072 }
1073 fetcher::state::event::Fetched::Completed { from, rid, refs: _ } => {
1074 let fetch_result = match &result {
1076 Ok(success) => FetchResult::Success {
1077 updated: success.updated.clone(),
1078 namespaces: success.namespaces.clone(),
1079 clone: success.clone,
1080 },
1081 Err(e) => FetchResult::Failed {
1082 reason: e.to_string(),
1083 },
1084 };
1085 for responder in subscribers {
1086 responder.ok(fetch_result.clone()).ok();
1087 }
1088 match result {
1089 Ok(crate::worker::fetch::FetchResult {
1090 updated,
1091 canonical,
1092 namespaces,
1093 clone,
1094 doc,
1095 }) => {
1096 info!(target: "service", "Fetched {rid} from {from} successfully");
1097 self.seed_discovered(rid, from, self.clock.into());
1100
1101 for update in &updated {
1102 if update.is_skipped() {
1103 trace!(target: "service", "Ref skipped: {update} for {rid}");
1104 } else {
1105 debug!(target: "service", "Ref updated: {update} for {rid}");
1106 }
1107 }
1108 self.emitter.emit(Event::RefsFetched {
1109 remote: from,
1110 rid,
1111 updated: updated.clone(),
1112 });
1113 self.emitter
1114 .emit_all(canonical.into_iter().map(|(refname, target)| {
1115 Event::CanonicalRefUpdated {
1116 rid,
1117 refname,
1118 target,
1119 }
1120 }));
1121
1122 if clone && doc.is_public() {
1125 debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");
1126
1127 if let Err(e) = self.add_inventory(rid) {
1128 warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
1129 }
1130 }
1131
1132 if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
1134 debug!(target: "service", "Nothing to announce, no refs were updated..");
1135 } else {
1136 if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
1139 warn!(target: "service", "Failed to announce new refs: {e}");
1140 }
1141 }
1142 }
1143 Err(err) => {
1144 warn!(target: "service", "Fetch failed for {rid} from {from}: {err}");
1145
1146 if err.is_timeout() {
1149 self.outbox.disconnect(from, DisconnectReason::Fetch(err));
1150 }
1151 }
1152 }
1153 }
1154 }
1155 }
1156
1157 pub fn dequeue_fetches(&mut self) {
1165 let sessions = self
1166 .sessions
1167 .shuffled()
1168 .map(|(k, _)| *k)
1169 .collect::<Vec<_>>();
1170
1171 for nid in sessions {
1172 #[allow(clippy::unwrap_used)]
1173 let sess = self.sessions.get_mut(&nid).unwrap();
1174 if !sess.is_connected() {
1175 continue;
1176 }
1177
1178 let Some(fetcher::QueuedFetch {
1179 rid,
1180 refs: refs_at,
1181 config,
1182 }) = self.fetcher.dequeue(&nid)
1183 else {
1184 continue;
1185 };
1186
1187 let repo_entry = self
1189 .policies
1190 .seed_policy(&rid)
1191 .expect("error accessing repo seeding configuration");
1192
1193 let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1194 debug!(target: "service", "Repository {} no longer seeded, skipping", rid);
1195 continue;
1196 };
1197
1198 debug!(target: "service", "Dequeued fetch for {} from {}", rid, nid);
1199
1200 match refs_at {
1201 RefsToFetch::Refs(refs) => {
1202 self.fetch_refs_at(rid, nid, refs, scope, config);
1203 }
1204 RefsToFetch::All => {
1205 self.fetch(rid, nid, vec![], config, None);
1208 }
1209 }
1210 }
1211 }
1212
1213 pub fn accepted(&mut self, ip: IpAddr) -> bool {
1215 if ip.is_loopback() || ip.is_unspecified() {
1218 return true;
1219 }
1220 if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
1222 return false;
1223 }
1224 match self.db.addresses().is_ip_banned(ip) {
1225 Ok(banned) => {
1226 if banned {
1227 debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
1228 return false;
1229 }
1230 }
1231 Err(e) => warn!(target: "service", "Failed to query ban status for {ip}: {e}"),
1232 }
1233 let host: HostName = ip.into();
1234 let tokens = self.config.limits.rate.inbound;
1235
1236 if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
1237 trace!(target: "service", "Rate limiting inbound connection from {host}..");
1238 return false;
1239 }
1240 true
1241 }
1242
1243 pub fn attempted(&mut self, nid: NodeId, addr: Address) {
1244 debug!(target: "service", "Attempted connection to {nid} ({addr})");
1245
1246 if let Some(sess) = self.sessions.get_mut(&nid) {
1247 sess.to_attempted();
1248 } else {
1249 #[cfg(debug_assertions)]
1250 panic!("Service::attempted: unknown session {nid}@{addr}");
1251 }
1252 }
1253
1254 pub fn listening(&mut self, local_addr: net::SocketAddr) {
1255 info!(target: "node", "Listening on {local_addr}..");
1256
1257 self.listening.push(local_addr);
1258 }
1259
1260 pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
1261 if let Ok(true) = self.policies.is_blocked(&remote) {
1262 self.emitter.emit(Event::PeerDisconnected {
1263 nid: remote,
1264 reason: format!("{remote} is blocked"),
1265 });
1266 info!(target: "service", "Disconnecting blocked inbound peer {remote}");
1267 self.outbox.disconnect(remote, DisconnectReason::Policy);
1268 return;
1269 }
1270
1271 info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
1272 self.emitter.emit(Event::PeerConnected { nid: remote });
1273
1274 let msgs = self.initial(link);
1275
1276 if link.is_outbound() {
1277 if let Some(peer) = self.sessions.get_mut(&remote) {
1278 peer.to_connected(self.clock);
1279 self.outbox.write_all(peer, msgs);
1280 }
1281 } else {
1282 match self.sessions.entry(remote) {
1283 Entry::Occupied(mut e) => {
1284 let peer = e.get_mut();
1294 debug!(
1295 target: "service",
1296 "Connecting peer {remote} already has a session open ({peer})"
1297 );
1298 peer.link = link;
1299 peer.to_connected(self.clock);
1300 self.outbox.write_all(peer, msgs);
1301 }
1302 Entry::Vacant(e) => {
1303 if let HostName::Ip(ip) = addr.host {
1304 if !address::is_local(&ip) {
1305 if let Err(e) =
1306 self.db
1307 .addresses_mut()
1308 .record_ip(&remote, ip, self.clock.into())
1309 {
1310 log::debug!(target: "service", "Failed to record IP address for {remote}: {e}");
1311 }
1312 }
1313 }
1314 let peer = e.insert(Session::inbound(
1315 remote,
1316 addr,
1317 self.config.is_persistent(&remote),
1318 self.rng.clone(),
1319 self.clock,
1320 ));
1321 self.outbox.write_all(peer, msgs);
1322 }
1323 }
1324 }
1325 }
1326
1327 pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
1328 let since = self.local_time();
1329 let Some(session) = self.sessions.get_mut(&remote) else {
1330 trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
1333 return;
1334 };
1335 if session.link != link {
1338 return;
1339 }
1340
1341 info!(target: "service", "Disconnected from {remote} ({reason})");
1342 self.emitter.emit(Event::PeerDisconnected {
1343 nid: remote,
1344 reason: reason.to_string(),
1345 });
1346
1347 let link = session.link;
1348 let addr = session.addr.clone();
1349
1350 let cmd = fetcher::state::command::Cancel { from: remote };
1351 let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);
1352
1353 match event {
1354 fetcher::state::event::Cancel::Unexpected { from } => {
1355 debug!(target: "service", "No fetches to cancel for {from}");
1356 }
1357 fetcher::state::event::Cancel::Canceled {
1358 from,
1359 active,
1360 queued,
1361 } => {
1362 debug!(target: "service", "Cancelled {} ongoing, {} queued for {from}", active.len(), queued.len());
1363 }
1364 }
1365
1366 for (rid, responder) in orphaned {
1368 responder
1369 .ok(FetchResult::Failed {
1370 reason: format!("failed fetch to {rid}, peer disconnected: {reason}"),
1371 })
1372 .ok();
1373 }
1374
1375 if self.config.is_persistent(&remote) {
1377 let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
1378 .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
1379
1380 session.to_disconnected(since, since + delay);
1383
1384 debug!(target: "service", "Reconnecting to {remote} in {delay}..");
1385
1386 self.outbox.wakeup(delay);
1387 } else {
1388 debug!(target: "service", "Dropping peer {remote}..");
1389 self.sessions.remove(&remote);
1390
1391 let severity = match reason {
1392 DisconnectReason::Dial(_)
1393 | DisconnectReason::Fetch(_)
1394 | DisconnectReason::Connection(_) => {
1395 if self.is_online() {
1396 Severity::Medium
1399 } else {
1400 Severity::Low
1401 }
1402 }
1403 DisconnectReason::Session(e) => e.severity(),
1404 DisconnectReason::Command
1405 | DisconnectReason::Conflict
1406 | DisconnectReason::Policy
1407 | DisconnectReason::SelfConnection => Severity::Low,
1408 };
1409
1410 if let Err(e) = self
1411 .db
1412 .addresses_mut()
1413 .disconnected(&remote, &addr, severity)
1414 {
1415 debug!(target: "service", "Failed to update address store: {e}");
1416 }
1417 if link.is_outbound() {
1420 self.maintain_connections();
1421 }
1422 }
1423 self.dequeue_fetches();
1424 }
1425
1426 pub fn received_message(&mut self, remote: NodeId, message: Message) {
1427 if let Err(err) = self.handle_message(&remote, message) {
1428 self.outbox
1431 .disconnect(remote, DisconnectReason::Session(err));
1432
1433 }
1436 }
1437
1438 pub fn handle_announcement(
1443 &mut self,
1444 relayer: &NodeId,
1445 relayer_addr: &Address,
1446 announcement: &Announcement,
1447 ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
1448 if !announcement.verify() {
1449 return Err(session::Error::Misbehavior);
1450 }
1451 let Announcement {
1452 node: announcer,
1453 message,
1454 ..
1455 } = announcement;
1456
1457 if announcer == self.nid() {
1459 return Ok(None);
1460 }
1461 let now = self.clock;
1462 let timestamp = message.timestamp();
1463
1464 if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
1466 return Err(session::Error::InvalidTimestamp(timestamp));
1467 }
1468
1469 if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
1478 match self.db.addresses().get(announcer) {
1479 Ok(node) => {
1480 if node.is_none() {
1481 debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
1482 return Ok(None);
1483 }
1484 }
1485 Err(e) => {
1486 debug!(target: "service", "Failed to look up node in address book: {e}");
1487 return Ok(None);
1488 }
1489 }
1490 }
1491
1492 let relay = match self.db.gossip_mut().announced(announcer, announcement) {
1494 Ok(Some(id)) => {
1495 log::debug!(
1496 target: "service",
1497 "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
1498 (self.last_gossip + GOSSIP_INTERVAL) - self.clock
1499 );
1500 self.relayed_by.entry(id).or_default().push(*relayer);
1502
1503 let relay = message.is_node_announcement()
1508 || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
1509 relay.then_some(id)
1510 }
1511 Ok(None) => {
1512 debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
1516 return Ok(None);
1517 }
1518 Err(e) => {
1519 debug!(target: "service", "Failed to update gossip entry from {announcer}: {e}");
1520 return Ok(None);
1521 }
1522 };
1523
1524 match message {
1525 AnnouncementMessage::Inventory(message) => {
1527 self.emitter.emit(Event::InventoryAnnounced {
1528 nid: *announcer,
1529 inventory: message.inventory.to_vec(),
1530 timestamp: message.timestamp,
1531 });
1532 match self.sync_routing(
1533 message.inventory.iter().cloned(),
1534 *announcer,
1535 message.timestamp,
1536 ) {
1537 Ok(synced) => {
1538 if synced.is_empty() {
1539 trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
1540 return Ok(None);
1541 }
1542 }
1543 Err(e) => {
1544 debug!(target: "service", "Failed to process inventory from {announcer}: {e}");
1545 return Ok(None);
1546 }
1547 }
1548 let mut missing = Vec::new();
1549 let nid = *self.nid();
1550
1551 if let Some(sess) = self.sessions.get_mut(announcer) {
1554 for id in message.inventory.as_slice() {
1555 if let Some(sub) = &mut sess.subscribe {
1559 sub.filter.insert(id);
1560 }
1561
1562 if self.policies.is_seeding(id).expect(
1565 "Service::handle_announcement: error accessing seeding configuration",
1566 ) {
1567 match self.db.routing().entry(id, &nid) {
1570 Ok(entry) => {
1571 if entry.is_none() {
1572 missing.push(*id);
1573 }
1574 }
1575 Err(e) => debug!(
1576 target: "service",
1577 "Error checking local inventory for {id}: {e}"
1578 ),
1579 }
1580 }
1581 }
1582 }
1583 self.rng.shuffle(&mut missing);
1588
1589 for rid in missing {
1590 debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
1591 self.fetch(rid, *announcer, vec![], self.fetch_config(), None);
1592 }
1593 return Ok(relay);
1594 }
1595 AnnouncementMessage::Refs(message) => {
1596 self.emitter.emit(Event::RefsAnnounced {
1597 nid: *announcer,
1598 rid: message.rid,
1599 refs: message.refs.to_vec(),
1600 timestamp: message.timestamp,
1601 });
1602 let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
1604 debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
1605 return Ok(None);
1606 };
1607 self.seed_discovered(message.rid, *announcer, message.timestamp);
1610
1611 if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
1613 debug!(
1614 target: "service",
1615 "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
1616 message.rid, refs.at, message.timestamp
1617 );
1618 match self.db.seeds_mut().synced(
1619 &message.rid,
1620 announcer,
1621 refs.at,
1622 message.timestamp,
1623 ) {
1624 Ok(updated) => {
1625 if updated {
1626 debug!(
1627 target: "service",
1628 "Updating sync status of {announcer} for {} to {}",
1629 message.rid, refs.at
1630 );
1631 self.emitter.emit(Event::RefsSynced {
1632 rid: message.rid,
1633 remote: *announcer,
1634 at: refs.at,
1635 });
1636 } else {
1637 debug!(
1638 target: "service",
1639 "Sync status of {announcer} was not updated for {}",
1640 message.rid,
1641 );
1642 }
1643 }
1644 Err(e) => {
1645 debug!(target: "service", "Failed to update sync status for {}: {e}", message.rid);
1646 }
1647 }
1648 }
1649 let repo_entry = self.policies.seed_policy(&message.rid).expect(
1650 "Service::handle_announcement: error accessing repo seeding configuration",
1651 );
1652 let SeedingPolicy::Allow { scope } = repo_entry.policy else {
1653 debug!(
1654 target: "service",
1655 "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
1656 message.rid
1657 );
1658 return Ok(None);
1659 };
1660 let Some(remote) = self.sessions.get(announcer).cloned() else {
1673 trace!(
1674 target: "service",
1675 "Skipping fetch of {}, no sessions connected to {announcer}",
1676 message.rid
1677 );
1678 return Ok(relay);
1679 };
1680 self.fetch_refs_at(message.rid, remote.id, refs, scope, self.fetch_config());
1682
1683 return Ok(relay);
1684 }
1685 AnnouncementMessage::Node(
1686 ann @ NodeAnnouncement {
1687 features,
1688 addresses,
1689 ..
1690 },
1691 ) => {
1692 self.emitter.emit(Event::NodeAnnounced {
1693 nid: *announcer,
1694 alias: ann.alias.clone(),
1695 timestamp: ann.timestamp,
1696 features: *features,
1697 addresses: addresses.to_vec(),
1698 });
1699 if !features.has(Features::SEED) {
1702 return Ok(relay);
1703 }
1704
1705 match self.db.addresses_mut().insert(
1706 announcer,
1707 ann.version,
1708 ann.features,
1709 &ann.alias,
1710 ann.work(),
1711 &ann.agent,
1712 timestamp,
1713 addresses
1714 .iter()
1715 .filter(|a| a.is_routable() || relayer_addr.is_local())
1718 .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
1719 ) {
1720 Ok(updated) => {
1721 if updated {
1723 debug!(
1724 target: "service",
1725 "Address store entry for node {announcer} updated at {timestamp}"
1726 );
1727 return Ok(relay);
1728 }
1729 }
1730 Err(err) => {
1731 warn!(target: "service", "Failed to process node announcement from {announcer}: {err}");
1733 }
1734 }
1735 }
1736 }
1737 Ok(None)
1738 }
1739
1740 pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
1741 match info {
1742 Info::RefsAlreadySynced { rid, at } => {
1744 debug!(target: "service", "Refs already synced for {rid} by {remote}");
1745 self.emitter.emit(Event::RefsSynced {
1746 rid: *rid,
1747 remote,
1748 at: *at,
1749 });
1750 }
1751 }
1752
1753 Ok(())
1754 }
1755
1756 pub fn handle_message(
1757 &mut self,
1758 remote: &NodeId,
1759 message: Message,
1760 ) -> Result<(), session::Error> {
1761 let local = self.node_id();
1762 let relay = self.config.is_relay();
1763 let Some(peer) = self.sessions.get_mut(remote) else {
1764 debug!(target: "service", "Session not found for {remote}");
1765 return Ok(());
1766 };
1767 peer.last_active = self.clock;
1768
1769 let limit: RateLimit = match peer.link {
1770 Link::Outbound => self.config.limits.rate.outbound.into(),
1771 Link::Inbound => self.config.limits.rate.inbound.into(),
1772 };
1773 if self
1774 .limiter
1775 .limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
1776 {
1777 debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
1778 return Ok(());
1779 }
1780 message.log(log::Level::Debug, remote, Link::Inbound);
1781
1782 let connected = match &mut peer.state {
1783 session::State::Disconnected { .. } => {
1784 debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
1785 return Ok(());
1786 }
1787 session::State::Attempted | session::State::Initial => {
1794 debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
1795 debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);
1796
1797 peer.to_connected(self.clock);
1798
1799 None
1800 }
1801 session::State::Connected {
1802 ping, latencies, ..
1803 } => Some((ping, latencies)),
1804 };
1805
1806 trace!(target: "service", "Received message {message:?} from {remote}");
1807
1808 match message {
1809 Message::Announcement(ann) => {
1811 let relayer = remote;
1812 let relayer_addr = peer.addr.clone();
1813
1814 if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
1815 if self.config.is_relay() {
1816 if let AnnouncementMessage::Inventory(_) = ann.message {
1817 if let Err(e) = self
1818 .database_mut()
1819 .gossip_mut()
1820 .set_relay(id, gossip::RelayStatus::Relay)
1821 {
1822 warn!(target: "service", "Failed to set relay flag for message: {e}");
1823 return Ok(());
1824 }
1825 } else {
1826 self.relay(id, ann);
1827 }
1828 }
1829 }
1830 }
1831 Message::Subscribe(subscribe) => {
1832 match self
1834 .db
1835 .gossip()
1836 .filtered(&subscribe.filter, subscribe.since, subscribe.until)
1837 {
1838 Ok(anns) => {
1839 for ann in anns {
1840 let ann = match ann {
1841 Ok(a) => a,
1842 Err(e) => {
1843 debug!(target: "service", "Failed to read gossip message from store: {e}");
1844 continue;
1845 }
1846 };
1847 if ann.node == *remote {
1849 continue;
1850 }
1851 if relay || ann.node == local {
1853 self.outbox.write(peer, ann.into());
1854 }
1855 }
1856 }
1857 Err(e) => {
1858 warn!(target: "service", "Failed to query gossip messages from store: {e}");
1859 }
1860 }
1861 peer.subscribe = Some(subscribe);
1862 }
1863 Message::Info(info) => {
1864 self.handle_info(*remote, &info)?;
1865 }
1866 Message::Ping(Ping { ponglen, .. }) => {
1867 if ponglen > Ping::MAX_PONG_ZEROES {
1869 return Ok(());
1870 }
1871 self.outbox.write(
1872 peer,
1873 Message::Pong {
1874 zeroes: ZeroBytes::new(ponglen),
1875 },
1876 );
1877 }
1878 Message::Pong { zeroes } => {
1879 if let Some((ping, latencies)) = connected {
1880 if let session::PingState::AwaitingResponse {
1881 len: ponglen,
1882 since,
1883 } = *ping
1884 {
1885 if (ponglen as usize) == zeroes.len() {
1886 *ping = session::PingState::Ok;
1887 latencies.push_back(self.clock - since);
1889 if latencies.len() > MAX_LATENCIES {
1890 latencies.pop_front();
1891 }
1892 }
1893 }
1894 }
1895 }
1896 }
1897 Ok(())
1898 }
1899
1900 fn refs_status_of(
1902 &self,
1903 rid: RepoId,
1904 refs: NonEmpty<RefsAt>,
1905 scope: &policy::Scope,
1906 ) -> Result<RefsStatus, Error> {
1907 let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
1908 if refs.want.is_empty() {
1910 return Ok(refs);
1911 }
1912 let mut refs = match scope {
1914 policy::Scope::All => refs,
1915 policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
1916 Ok(Namespaces::All) => refs,
1917 Ok(Namespaces::Followed(followed)) => {
1918 refs.want.retain(|r| followed.contains(&r.remote));
1919 refs
1920 }
1921 Err(e) => return Err(e.into()),
1922 },
1923 };
1924 refs.want.retain(|r| r.remote != self.node_id());
1926
1927 Ok(refs)
1928 }
1929
1930 fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
1932 if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
1933 if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
1934 self.emitter.emit(Event::SeedDiscovered { rid, nid });
1935 debug!(target: "service", "Routing table updated for {rid} with seed {nid}");
1936 }
1937 }
1938 }
1939
1940 fn initial(&mut self, _link: Link) -> Vec<Message> {
1942 let now = self.clock();
1943 let filter = self.filter();
1944
1945 let since = if let Some(last) = self.last_online_at {
1955 Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
1956 } else {
1957 (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
1958 };
1959 debug!(target: "service", "Subscribing to messages since timestamp {since}..");
1960
1961 vec![
1962 Message::node(self.node.clone(), &self.signer),
1963 Message::inventory(self.inventory.clone(), &self.signer),
1964 Message::subscribe(filter, since, Timestamp::MAX),
1965 ]
1966 }
1967
1968 fn is_online(&self) -> bool {
1970 self.sessions
1971 .connected()
1972 .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
1973 .count()
1974 > 0
1975 }
1976
1977 fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
1979 let node = self.node_id();
1980 let now = self.timestamp();
1981
1982 let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
1983 if removed {
1984 self.refresh_and_announce_inventory(now)?;
1985 }
1986 Ok(removed)
1987 }
1988
1989 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
1991 let node = self.node_id();
1992 let now = self.timestamp();
1993
1994 if !self.storage.contains(&rid)? {
1995 debug!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
1996 return Ok(false);
1997 }
1998 let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
2000 let updated = !updates.is_empty();
2001
2002 if updated {
2003 self.refresh_and_announce_inventory(now)?;
2004 }
2005 Ok(updated)
2006 }
2007
2008 fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
2010 let inventory = self.inventory()?;
2011
2012 self.inventory = gossip::inventory(time, inventory);
2013 self.announce_inventory();
2014
2015 Ok(())
2016 }
2017
2018 fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
2029 self.db
2030 .routing()
2031 .get_inventory(self.nid())
2032 .map_err(Error::from)
2033 }
2034
2035 fn sync_routing(
2039 &mut self,
2040 inventory: impl IntoIterator<Item = RepoId>,
2041 from: NodeId,
2042 timestamp: Timestamp,
2043 ) -> Result<SyncedRouting, Error> {
2044 let mut synced = SyncedRouting::default();
2045 let included = inventory.into_iter().collect::<BTreeSet<_>>();
2046 let mut events = Vec::new();
2047
2048 for (rid, result) in
2049 self.db
2050 .routing_mut()
2051 .add_inventory(included.iter(), from, timestamp)?
2052 {
2053 match result {
2054 InsertResult::SeedAdded => {
2055 debug!(target: "service", "Routing table updated for {rid} with seed {from}");
2056 events.push(Event::SeedDiscovered { rid, nid: from });
2057
2058 if self
2059 .policies
2060 .is_seeding(&rid)
2061 .expect("Service::process_inventory: error accessing seeding configuration")
2062 {
2063 }
2066 synced.added.push(rid);
2067 }
2068 InsertResult::TimeUpdated => {
2069 synced.updated.push(rid);
2070 }
2071 InsertResult::NotUpdated => {}
2072 }
2073 }
2074
2075 synced.removed.extend(
2076 self.db
2077 .routing()
2078 .get_inventory(&from)?
2079 .into_iter()
2080 .filter(|rid| !included.contains(rid)),
2081 );
2082 self.db
2083 .routing_mut()
2084 .remove_inventories(&synced.removed, &from)?;
2085 events.extend(
2086 synced
2087 .removed
2088 .iter()
2089 .map(|&rid| Event::SeedDropped { rid, nid: from }),
2090 );
2091
2092 self.emitter.emit_all(events);
2093
2094 Ok(synced)
2095 }
2096
2097 fn refs_announcement_for(
2099 &mut self,
2100 rid: RepoId,
2101 remotes: impl IntoIterator<Item = NodeId>,
2102 ) -> Result<(Announcement, Vec<RefsAt>), Error> {
2103 let repo = self.storage.repository(rid)?;
2104 let timestamp = self.timestamp();
2105 let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
2106
2107 for remote_id in remotes.into_iter() {
2108 let refs_at = RefsAt::new(&repo, remote_id).map_err(|err| {
2109 radicle::storage::Error::Refs(radicle::storage::refs::Error::Read(err))
2110 })?;
2111
2112 if refs.push(refs_at).is_err() {
2113 warn!(
2114 target: "service",
2115 "refs announcement limit ({REF_REMOTE_LIMIT}) exceeded, peers will see only some of your repository references",
2116 );
2117 break;
2118 }
2119 }
2120
2121 let msg = AnnouncementMessage::from(RefsAnnouncement {
2122 rid,
2123 refs: refs.clone(),
2124 timestamp,
2125 });
2126 Ok((msg.signed(&self.signer), refs.into()))
2127 }
2128
2129 fn announce_own_refs(
2131 &mut self,
2132 rid: RepoId,
2133 doc: Doc,
2134 namespaces: impl IntoIterator<Item = NodeId>,
2135 ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
2136 let (refs, timestamp) = self.announce_refs(rid, doc, namespaces, true)?;
2137
2138 for r in refs.iter() {
2142 self.emitter.emit(Event::LocalRefsAnnounced {
2143 rid,
2144 refs: *r,
2145 timestamp,
2146 });
2147 if let Err(e) = self.database_mut().refs_mut().set(
2148 &rid,
2149 &r.remote,
2150 &SIGREFS_BRANCH,
2151 r.at,
2152 timestamp.to_local_time(),
2153 ) {
2154 warn!(
2155 target: "service",
2156 "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
2157 r.remote
2158 );
2159 }
2160 }
2161 Ok((refs, timestamp))
2162 }
2163
2164 fn announce_refs(
2166 &mut self,
2167 rid: RepoId,
2168 doc: Doc,
2169 remotes: impl IntoIterator<Item = NodeId>,
2170 own: bool,
2171 ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
2172 let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
2173 let timestamp = ann.timestamp();
2174 let peers = self.sessions.connected().map(|(_, p)| p);
2175
2176 for r in refs.iter().filter(|r| own || r.remote == ann.node) {
2179 info!(
2180 target: "service",
2181 "Announcing refs {rid}/{r} to peers (t={timestamp})..",
2182 );
2183 if let Err(e) = self.db.seeds_mut().synced(&rid, &ann.node, r.at, timestamp) {
2185 warn!(target: "service", "Failed to update sync status for local node: {e}");
2186 } else {
2187 debug!(target: "service", "Saved local sync status for {rid}..");
2188 }
2189 }
2190
2191 self.outbox.announce(
2192 ann,
2193 peers.filter(|p| {
2194 doc.is_visible_to(&p.id.into())
2196 }),
2197 self.db.gossip_mut(),
2198 );
2199 Ok((refs, timestamp))
2200 }
2201
2202 fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
2203 if let Some(sess) = self.sessions.get_mut(&nid) {
2204 sess.to_initial();
2205 self.outbox.connect(nid, addr);
2206
2207 return true;
2208 }
2209 false
2210 }
2211
2212 fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
2213 debug!(target: "service", "Connecting to {nid} ({addr})..");
2214
2215 if nid == self.node_id() {
2216 return Err(ConnectError::SelfConnection);
2217 }
2218 if let Ok(true) = self.policies.is_blocked(&nid) {
2219 return Err(ConnectError::Blocked { nid });
2220 }
2221 if !self.is_supported_address(&addr) {
2222 return Err(ConnectError::UnsupportedAddress { nid, addr });
2223 }
2224 if self.sessions.contains_key(&nid) {
2225 return Err(ConnectError::SessionExists { nid });
2226 }
2227 if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
2228 return Err(ConnectError::LimitReached { nid, addr });
2229 }
2230 let persistent = self.config.is_persistent(&nid);
2231 let timestamp: Timestamp = self.clock.into();
2232
2233 if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
2234 warn!(target: "service", "Failed to update address book with connection attempt: {e}");
2235 }
2236 self.sessions.insert(
2237 nid,
2238 Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
2239 );
2240 self.outbox.connect(nid, addr);
2241
2242 Ok(())
2243 }
2244
2245 fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
2246 let mut seeds = Seeds::new(self.rng.clone());
2247
2248 if let Ok(repo) = self.storage.repository(*rid) {
2253 for namespace in namespaces.iter() {
2254 let Ok(local) = RefsAt::new(&repo, *namespace) else {
2255 continue;
2256 };
2257
2258 for seed in self.db.seeds().seeds_for(rid)? {
2259 let seed = seed?;
2260 let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
2261 let synced = if local.at == seed.synced_at.oid {
2262 SyncStatus::Synced { at: seed.synced_at }
2263 } else {
2264 let local = SyncedAt::new(local.at, &repo)?;
2265
2266 SyncStatus::OutOfSync {
2267 local,
2268 remote: seed.synced_at,
2269 }
2270 };
2271 seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
2272 }
2273 }
2274 }
2275
2276 for nid in self.db.routing().get(rid)? {
2280 if namespaces.contains(&nid) {
2281 continue;
2282 }
2283 if seeds.contains(&nid) {
2284 continue;
2286 }
2287 let addrs = self.db.addresses().addresses_of(&nid)?;
2288 let state = self.sessions.get(&nid).map(|s| s.state.clone());
2289
2290 seeds.insert(Seed::new(nid, addrs, state, None));
2291 }
2292 Ok(seeds)
2293 }
2294
2295 fn filter(&self) -> Filter {
2297 if self.config.seeding_policy.is_allow() {
2298 Filter::default()
2300 } else {
2301 self.filter.clone()
2302 }
2303 }
2304
2305 fn timestamp(&mut self) -> Timestamp {
2308 let now = Timestamp::from(self.clock);
2309 if *now > *self.last_timestamp {
2310 self.last_timestamp = now;
2311 } else {
2312 self.last_timestamp = self.last_timestamp + 1;
2313 }
2314 self.last_timestamp
2315 }

    /// Relay an announcement to eligible connected peers.
    fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
        let announcer = ann.node;
        let relayed_by = self.relayed_by.get(&id);
        let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
            Some(rid)
        } else {
            None
        };
        // Relay to connected peers, skipping peers that already relayed this announcement
        // to us, the announcer itself, and peers the repository isn't visible to.
        let relay_to = self
            .sessions
            .connected()
            .filter(|(id, _)| {
                relayed_by
                    .map(|relayers| !relayers.contains(id))
                    .unwrap_or(true)
            })
            .filter(|(id, _)| **id != announcer)
            .filter(|(id, _)| {
                if let Some(rid) = rid {
                    self.storage
                        .get(rid)
                        .ok()
                        .flatten()
                        .map(|doc| doc.is_visible_to(&(*id).into()))
                        .unwrap_or(false)
                } else {
                    true
                }
            })
            .map(|(_, p)| p);

        self.outbox.relay(ann, relay_to);
    }

    /// Relay pending announcements from the gossip store to our peers, skipping
    /// announcements made by our own node.
    fn relay_announcements(&mut self) -> Result<(), Error> {
        let now = self.clock.into();
        let rows = self.database_mut().gossip_mut().relays(now)?;
        let local = self.node_id();

        for (id, msg) in rows {
            let announcer = msg.node;
            if announcer == local {
                continue;
            }
            self.relay(id, msg);
        }
        Ok(())
    }

    /// Announce our inventory to all connected peers, unless its timestamp hasn't changed
    /// since the last announcement.
    fn announce_inventory(&mut self) {
        let timestamp = self.inventory.timestamp.to_local_time();

        if self.last_inventory == timestamp {
            debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
            return;
        }
        let msg = AnnouncementMessage::from(self.inventory.clone());

        self.outbox.announce(
            msg.signed(&self.signer),
            self.sessions.connected().map(|(_, p)| p),
            self.db.gossip_mut(),
        );
        self.last_inventory = timestamp;
    }

    /// Prune the routing table if it has grown beyond the configured maximum size,
    /// removing entries older than the configured maximum age.
    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
        let count = self.db.routing().len()?;
        if count <= self.config.limits.routing_max_size.into() {
            return Ok(());
        }

        let delta = count - usize::from(self.config.limits.routing_max_size);
        let nid = self.node_id();
        self.db.routing_mut().prune(
            (*now - LocalDuration::from(self.config.limits.routing_max_age)).into(),
            Some(delta),
            &nid,
        )?;
        Ok(())
    }
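
    // Worked example with illustrative numbers, not from the original source: with
    // `routing_max_size = 1000` and 1250 entries in the table, `delta = 250`, so the call
    // above asks the store to prune up to 250 entries whose timestamps are older than
    // `now - routing_max_age`. Our own node ID is passed in, presumably so the pruner can
    // treat our own entries specially.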

    /// Disconnect peers that haven't been active within the stale connection timeout.
    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
        let stale = self
            .sessions
            .connected()
            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);

        for (_, session) in stale {
            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);

            self.outbox.disconnect(
                session.id,
                DisconnectReason::Session(session::Error::Timeout),
            );
        }
    }

    /// Ping connected peers that have been inactive for at least the keep-alive delta.
    fn keep_alive(&mut self, now: &LocalTime) {
        let inactive_sessions = self
            .sessions
            .connected_mut()
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
            .map(|(_, session)| session);
        for session in inactive_sessions {
            session.ping(self.clock, &mut self.outbox).ok();
        }
    }

    /// Get the peers from the address book that we could potentially connect to,
    /// grouped by node and sorted by penalty (lowest first).
    fn available_peers(&mut self) -> Vec<Peer> {
        match self.db.addresses().entries() {
            Ok(entries) => {
                // Skip peers we can't or shouldn't dial: wrong protocol version, banned
                // addresses, penalized nodes, existing sessions, blocked nodes, our own
                // node and external addresses, and unsupported address types.
                let mut peers = entries
                    .filter(|entry| entry.version == PROTOCOL_VERSION)
                    .filter(|entry| !entry.address.banned)
                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
                    .filter(|entry| !self.sessions.contains_key(&entry.node))
                    .filter(|entry| !self.policies.is_blocked(&entry.node).unwrap_or(false))
                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
                    .filter(|entry| &entry.node != self.nid())
                    .filter(|entry| self.is_supported_address(&entry.address.addr))
                    .fold(HashMap::new(), |mut acc, entry| {
                        acc.entry(entry.node)
                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
                            .or_insert_with(|| Peer {
                                nid: entry.node,
                                addresses: vec![entry.address],
                                penalty: entry.penalty,
                            });
                        acc
                    })
                    .into_values()
                    .collect::<Vec<_>>();
                peers.sort_by_key(|p| p.penalty);
                peers
            }
            Err(e) => {
                warn!(target: "service", "Unable to lookup available peers in address book: {e}");
                Vec::new()
            }
        }
    }

    /// Fetch repositories that we are configured to seed but don't have in storage yet,
    /// from any currently connected seeds.
    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
        let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
        for policy in policies {
            let policy = match policy {
                Ok(policy) => policy,
                Err(err) => {
                    debug!(target: "protocol::filter", "Failed to read seed policy: {err}");
                    continue;
                }
            };

            let rid = policy.rid;

            if !policy.is_allow() {
                continue;
            }
            match self.storage.contains(&rid) {
                Ok(exists) => {
                    if exists {
                        continue;
                    }
                }
                Err(err) => {
                    log::debug!(target: "protocol::filter", "Failed to check if {rid} exists: {err}");
                    continue;
                }
            }
            match self.seeds(&rid, [self.node_id()].into()) {
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
                            self.fetch(rid, seed.nid, vec![], self.fetch_config(), None);
                        }
                    } else {
                        debug!(target: "service", "No connected seeds found for {rid}..");
                    }
                }
                Err(e) => {
                    debug!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
                }
            }
        }
        Ok(())
    }

    /// Run idle processing on all sessions, and record stable connections in the
    /// address book.
    fn idle_connections(&mut self) {
        for (_, sess) in self.sessions.iter_mut() {
            sess.idle(self.clock);

            if sess.is_stable() {
                if let Err(e) =
                    self.db
                        .addresses_mut()
                        .connected(&sess.id, &sess.addr, self.clock.into())
                {
                    warn!(target: "service", "Failed to update address book with connection: {e}");
                }
            }
        }
    }

    /// Maintain the target number of outbound connections when peers are managed
    /// dynamically, dialing candidates from the address book.
    fn maintain_connections(&mut self) {
        let PeerConfig::Dynamic = self.config.peers else {
            return;
        };
        trace!(target: "service", "Maintaining connections..");

        let target = TARGET_OUTBOUND_PEERS;
        let now = self.clock;
        let outbound = self
            .sessions
            .values()
            .filter(|s| s.link.is_outbound())
            .filter(|s| s.is_connected() || s.is_connecting())
            .count();
        let wanted = target.saturating_sub(outbound);

        // Don't connect to more peers than needed.
        if wanted == 0 {
            return;
        }

        // For each available peer, pick an address that we haven't recently failed to
        // connect to.
        let available = self
            .available_peers()
            .into_iter()
            .filter_map(|peer| {
                peer.addresses
                    .into_iter()
                    .find(|ka| match (ka.last_success, ka.last_attempt) {
                        // The last attempt succeeded, or enough time has passed since the
                        // last attempt to try again.
                        (Some(success), Some(attempt)) => {
                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
                        }
                        // Only ever attempted without success: wait out the retry delta.
                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
                        // Never attempted: go ahead.
                        (_, None) => true,
                    })
                    .map(|ka| (peer.nid, ka))
            })
            .filter(|(_, ka)| self.is_supported_address(&ka.addr));

        let connect = available.take(wanted).collect::<Vec<_>>();
        if connect.len() < wanted {
            log::debug!(
                target: "service",
                "Not enough available peers to connect to (available={}, wanted={wanted})",
                connect.len()
            );
        }
        for (id, ka) in connect {
            if let Err(e) = self.connect(id, ka.addr.clone()) {
                warn!(target: "service", "Service::maintain_connections connection error: {e}");
            }
        }
    }

    /// Reconnect to persistent peers that are disconnected and due for a retry.
    fn maintain_persistent(&mut self) {
        trace!(target: "service", "Maintaining persistent peers..");

        let now = self.local_time();
        let mut reconnect = Vec::new();

        for (nid, session) in self.sessions.iter_mut() {
            if self.config.is_persistent(nid) {
                if self.policies.is_blocked(nid).unwrap_or(false) {
                    continue;
                }
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
                    // Only retry once the session's retry time has been reached.
                    if now >= *retry_at {
                        reconnect.push((*nid, session.addr.clone(), session.attempts()));
                    }
                }
            }
        }

        for (nid, addr, attempts) in reconnect {
            if self.reconnect(nid, addr) {
                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
            }
        }
    }

    /// Check whether the given address is of a type this node is willing to dial,
    /// based on the node's address configuration.
    fn is_supported_address(&self, address: &Address) -> bool {
        match AddressType::from(address) {
            #[cfg(feature = "tor")]
            AddressType::Onion => self.config.onion != radicle::node::config::AddressConfig::Drop,
            #[cfg(feature = "i2p")]
            AddressType::I2p => self.config.i2p != radicle::node::config::AddressConfig::Drop,
            AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
        }
    }

    /// Build the configuration used for new fetches from the node configuration.
    fn fetch_config(&self) -> fetcher::FetchConfig {
        fetcher::FetchConfig::default()
            .with_minimum_feature_level(self.config.fetch.feature_level_min())
    }
}

/// Access to the service state, clock and configuration.
pub trait ServiceState {
    /// Get the local node ID.
    fn nid(&self) -> &NodeId;
    /// Get the current peer sessions.
    fn sessions(&self) -> &Sessions;
    /// Get the state of the fetcher.
    fn fetching(&self) -> &FetcherState;
    /// Get the outbox of pending I/O.
    fn outbox(&self) -> &Outbox;
    /// Get the message rate limiter.
    fn limiter(&self) -> &RateLimiter;
    /// Get the event emitter.
    fn emitter(&self) -> &Emitter<Event>;
    /// Get a repository's identity document from storage, if available.
    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
    /// Get the local clock.
    fn clock(&self) -> &LocalTime;
    /// Get the local clock, mutably.
    fn clock_mut(&mut self) -> &mut LocalTime;
    /// Get the service configuration.
    fn config(&self) -> &Config;
    /// Get the service metrics.
    fn metrics(&self) -> &Metrics;
}

impl<D, S, G> ServiceState for Service<D, S, G>
where
    D: routing::Store,
    G: crypto::signature::Signer<crypto::Signature>,
    S: ReadStorage,
{
    fn nid(&self) -> &NodeId {
        self.signer.public_key()
    }

    fn sessions(&self) -> &Sessions {
        &self.sessions
    }

    fn fetching(&self) -> &FetcherState {
        self.fetcher.state()
    }

    fn outbox(&self) -> &Outbox {
        &self.outbox
    }

    fn limiter(&self) -> &RateLimiter {
        &self.limiter
    }

    fn emitter(&self) -> &Emitter<Event> {
        &self.emitter
    }

    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
        self.storage.get(rid)
    }

    fn clock(&self) -> &LocalTime {
        &self.clock
    }

    fn clock_mut(&mut self) -> &mut LocalTime {
        &mut self.clock
    }

    fn config(&self) -> &Config {
        &self.config
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }
}

/// The reason for closing, or refusing, a peer connection.
#[derive(Debug)]
pub enum DisconnectReason {
    /// Error while dialing the remote peer.
    Dial(Arc<dyn std::error::Error + Sync + Send>),
    /// Error on an established connection.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
    /// Error while fetching from the peer.
    Fetch(FetchError),
    /// Session protocol error.
    Session(session::Error),
    /// The connection conflicts with another connection to the same peer.
    Conflict,
    /// We ended up connecting to ourselves.
    SelfConnection,
    /// The peer is not allowed by our policies.
    Policy,
    /// The disconnect was requested via a command.
    Command,
}

impl DisconnectReason {
    /// Whether this is a dial error.
    pub fn is_dial_err(&self) -> bool {
        matches!(self, Self::Dial(_))
    }

    /// Whether this is a connection error.
    pub fn is_connection_err(&self) -> bool {
        matches!(self, Self::Connection(_))
    }

    /// A generic connection-reset reason.
    pub fn connection() -> Self {
        DisconnectReason::Connection(Arc::new(std::io::Error::from(
            std::io::ErrorKind::ConnectionReset,
        )))
    }
}
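
// Illustrative usage, not from the original source; `outbox` and `remote` are assumed to
// be in scope. When the underlying I/O error is no longer available, callers can fall
// back to the generic connection-reset reason constructed above:
//
//     outbox.disconnect(remote, DisconnectReason::connection());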

impl fmt::Display for DisconnectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Dial(err) => write!(f, "{err}"),
            Self::Connection(err) => write!(f, "{err}"),
            Self::Command => write!(f, "command"),
            Self::SelfConnection => write!(f, "self-connection"),
            Self::Conflict => write!(f, "conflict"),
            Self::Policy => write!(f, "policy"),
            Self::Session(err) => write!(f, "{err}"),
            Self::Fetch(err) => write!(f, "fetch: {err}"),
        }
    }
}

/// Result of a repository lookup.
#[derive(Debug)]
pub struct Lookup {
    /// The local identity document, if the repository is in local storage.
    pub local: Option<Doc>,
    /// Remote nodes known to have the repository.
    pub remote: Vec<NodeId>,
}

/// An error that can occur during a repository lookup.
#[derive(thiserror::Error, Debug)]
pub enum LookupError {
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}

/// Peer sessions, keyed by node ID.
#[derive(Debug, Clone)]
pub struct Sessions(AddressBook<NodeId, Session>);

impl Sessions {
    pub fn new(rng: Rng) -> Self {
        Self(AddressBook::new(rng))
    }

    /// Iterator over fully connected peers.
    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.0
            .iter()
            .filter_map(move |(id, sess)| match &sess.state {
                session::State::Connected { .. } => Some((id, sess)),
                _ => None,
            })
    }

    /// Iterator over connected inbound peers.
    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.connected().filter(|(_, s)| s.link.is_inbound())
    }

    /// Iterator over connected outbound peers.
    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.connected().filter(|(_, s)| s.link.is_outbound())
    }

    /// Iterator over mutable connected sessions.
    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
    }

    /// Iterator over mutable disconnected sessions.
    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
    }

    /// Whether we have a connected session with the given node.
    pub fn is_connected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
    }

    /// Whether the given node is disconnected. Nodes we have no session with are
    /// considered disconnected.
    pub fn is_disconnected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
    }
}
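
// Usage sketch, illustrative only; `rng` and `nid` are assumed to be in scope:
//
//     let sessions = Sessions::new(rng);
//     assert!(!sessions.is_connected(&nid));   // no session with this peer yet
//     assert!(sessions.is_disconnected(&nid)); // unknown peers count as disconnected
//     let outbound = sessions.outbound().count();
//
// `Sessions` also dereferences to the underlying `AddressBook<NodeId, Session>` (below),
// so map operations such as `get`, `insert` and `contains_key` are available directly.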

impl Deref for Sessions {
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}