#![allow(clippy::too_many_arguments)]
#![allow(clippy::collapsible_match)]
#![allow(clippy::collapsible_if)]
#![warn(clippy::unwrap_used)]
pub mod filter;
pub mod gossip;
pub mod io;
pub mod limiter;
pub mod message;
pub mod session;

use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::net::IpAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, net, time};

use crossbeam_channel as chan;
use fastrand::Rng;
use localtime::{LocalDuration, LocalTime};
use log::*;
use nonempty::NonEmpty;

use radicle::identity::Doc;
use radicle::node;
use radicle::node::address;
use radicle::node::address::Store as _;
use radicle::node::address::{AddressBook, AddressType, KnownAddress};
use radicle::node::config::{PeerConfig, RateLimit};
use radicle::node::device::Device;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{ConnectOptions, Penalty, Severity};
use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::RepositoryError;
use radicle_fetch::policy::SeedingPolicy;

use crate::service::gossip::Store as _;
use crate::service::message::{
    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
};
use crate::service::policy::{store::Write, Scope};
use radicle::identity::RepoId;
use radicle::node::events::Emitter;
use radicle::node::routing;
use radicle::node::routing::InsertResult;
use radicle::node::{
    Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt,
};
use radicle::prelude::*;
use radicle::storage;
use radicle::storage::{refs::RefsAt, Namespaces, ReadStorage};
use radicle::crypto;
use radicle::node::Link;
use radicle::node::PROTOCOL_VERSION;

use crate::bounded::BoundedVec;
use crate::service::filter::Filter;
pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::{QueuedFetch, Session};
use crate::worker::FetchError;
use radicle::node::events::{Event, Events};
use radicle::node::{Config, NodeId};

use radicle::node::policy::config as policy;

use self::io::Outbox;
use self::limiter::RateLimiter;
use self::message::InventoryAnnouncement;
use self::policy::NamespacesError;
/// How often to run the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often to run the "gossip" task.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often to run the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often to run the "sync" task.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often to run the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// Duration to wait on an unresponsive peer before dropping its connection.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// How much time may pass since a peer was last active before we send it a ping.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency samples to keep per session.
pub const MAX_LATENCIES: usize = 16;
/// Maximum tolerated difference between the local time and an announcement timestamp.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum number of attempts to connect to a peer before we give up.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back to subscribe to gossip messages when we first come online.
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
/// How far back to subscribe to gossip messages when reconnecting.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum amount of time to wait before reconnecting to a peer.
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// How long to wait between connection retries.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// Default timeout for fetches initiated by the service.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3);
/// Target number of outbound peer connections to maintain.
pub const TARGET_OUTBOUND_PEERS: usize = 8;

pub use message::ADDRESS_LIMIT;
pub use message::INVENTORY_LIMIT;
pub use message::REF_REMOTE_LIMIT;

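/// Runtime metrics collected by the service.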
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Metrics per peer.
    pub peers: HashMap<NodeId, PeerMetrics>,
    /// Number of items in the worker queue.
    pub worker_queue_size: usize,
    /// Number of open channels.
    pub open_channels: usize,
}

impl Metrics {
    /// Returns the metrics entry for the given peer, creating it if necessary.
    pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
        self.peers.entry(nid).or_default()
    }
}

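/// Traffic and connection metrics for a single peer.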
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerMetrics {
    pub received_git_bytes: usize,
    pub received_fetch_requests: usize,
    pub received_bytes: usize,
    pub received_gossip_messages: usize,
    pub sent_bytes: usize,
    pub sent_fetch_requests: usize,
    pub sent_git_bytes: usize,
    pub sent_gossip_messages: usize,
    pub streams_opened: usize,
    pub inbound_connection_attempts: usize,
    pub outbound_connection_attempts: usize,
    pub disconnects: usize,
}

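/// Changes applied to our routing table after processing an inventory announcement.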
#[derive(Default)]
struct SyncedRouting {
    added: Vec<RepoId>,
    removed: Vec<RepoId>,
    updated: Vec<RepoId>,
}

impl SyncedRouting {
    fn is_empty(&self) -> bool {
        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
    }
}

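/// A candidate peer from the address book, with its known addresses and penalty.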
#[derive(Debug, Clone)]
struct Peer {
    nid: NodeId,
    addresses: Vec<KnownAddress>,
    penalty: Penalty,
}

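/// A general service error.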
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error(transparent)]
    Git(#[from] radicle::git::raw::Error),
    #[error(transparent)]
    GitExt(#[from] radicle::git::ext::Error),
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Gossip(#[from] gossip::Error),
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Address(#[from] address::Error),
    #[error(transparent)]
    Database(#[from] node::db::Error),
    #[error(transparent)]
    Seeds(#[from] seed::Error),
    #[error(transparent)]
    Policy(#[from] policy::Error),
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
    #[error("namespaces error: {0}")]
    Namespaces(Box<NamespacesError>),
}

impl From<NamespacesError> for Error {
    fn from(e: NamespacesError) -> Self {
        Self::Namespaces(Box::new(e))
    }
}

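/// An error while attempting an outbound connection.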
#[derive(thiserror::Error, Debug)]
pub enum ConnectError {
    #[error("attempted connection to peer {nid} which already has a session")]
    SessionExists { nid: NodeId },
    #[error("attempted connection to self")]
    SelfConnection,
    #[error("outbound connection limit reached when attempting {nid} ({addr})")]
    LimitReached { nid: NodeId, addr: Address },
}

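/// Aggregate store trait combining all the stores used by the service.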
pub trait Store:
    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
{
}

impl Store for radicle::node::Database {}

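/// Function used to query the internal service state.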
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Send + Sync;

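/// Commands sent to the service by the node's control interface.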
pub enum Command {
    /// Announce repository references for the given repository to peers.
    AnnounceRefs(RepoId, chan::Sender<RefsAt>),
    /// Announce our inventory to peers.
    AnnounceInventory,
    /// Add the given repository to our inventory.
    AddInventory(RepoId, chan::Sender<bool>),
    /// Connect to a node.
    Connect(NodeId, Address, ConnectOptions),
    /// Disconnect from a node.
    Disconnect(NodeId),
    /// Get the node configuration.
    Config(chan::Sender<Config>),
    /// Get the local listen addresses.
    ListenAddrs(chan::Sender<Vec<std::net::SocketAddr>>),
    /// Get the seeds of the given repository.
    Seeds(RepoId, chan::Sender<Seeds>),
    /// Fetch the given repository from the given node.
    Fetch(RepoId, NodeId, time::Duration, chan::Sender<FetchResult>),
    /// Seed the given repository.
    Seed(RepoId, Scope, chan::Sender<bool>),
    /// Unseed the given repository.
    Unseed(RepoId, chan::Sender<bool>),
    /// Follow the given node.
    Follow(NodeId, Option<Alias>, chan::Sender<bool>),
    /// Unfollow the given node.
    Unfollow(NodeId, chan::Sender<bool>),
    /// Query the internal service state.
    QueryState(Arc<QueryState>, chan::Sender<Result<(), CommandError>>),
}

impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AnnounceRefs(id, _) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Config(_) => write!(f, "Config"),
            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
            Self::QueryState { .. } => write!(f, "QueryState(..)"),
        }
    }
}

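/// Error returned while handling a [`Command`].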
#[derive(thiserror::Error, Debug)]
pub enum CommandError {
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Policy(#[from] policy::Error),
}

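/// Error returned when trying to initiate a fetch.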
#[derive(thiserror::Error, Debug)]
enum TryFetchError<'a> {
    #[error("ongoing fetch for repository exists")]
    AlreadyFetching(&'a mut FetchState),
    #[error("peer is not connected; cannot initiate fetch")]
    SessionNotConnected,
    #[error("peer fetch capacity reached; cannot initiate fetch")]
    SessionCapacityReached,
    #[error(transparent)]
    Namespaces(Box<NamespacesError>),
}

impl From<NamespacesError> for TryFetchError<'_> {
    fn from(e: NamespacesError) -> Self {
        Self::Namespaces(Box::new(e))
    }
}

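/// State of an ongoing fetch.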
#[derive(Debug)]
pub struct FetchState {
    /// Node we are fetching from.
    pub from: NodeId,
    /// Refs being fetched.
    pub refs_at: Vec<RefsAt>,
    /// Channels to notify when the fetch completes.
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}

impl FetchState {
    /// Add a subscriber to this fetch, unless it is already subscribed.
    fn subscribe(&mut self, c: chan::Sender<FetchResult>) {
        if !self.subscribers.iter().any(|s| s.same_channel(&c)) {
            self.subscribers.push(c);
        }
    }
}

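/// Wraps a single database and exposes it as each of the node's stores.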
#[derive(Debug)]
pub struct Stores<D>(D);

impl<D> Stores<D>
where
    D: Store,
{
    /// Get the routing store.
    pub fn routing(&self) -> &impl routing::Store {
        &self.0
    }

    /// Get the routing store, mutably.
    pub fn routing_mut(&mut self) -> &mut impl routing::Store {
        &mut self.0
    }

    /// Get the address store.
    pub fn addresses(&self) -> &impl address::Store {
        &self.0
    }

    /// Get the address store, mutably.
    pub fn addresses_mut(&mut self) -> &mut impl address::Store {
        &mut self.0
    }

    /// Get the gossip store.
    pub fn gossip(&self) -> &impl gossip::Store {
        &self.0
    }

    /// Get the gossip store, mutably.
    pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
        &mut self.0
    }

    /// Get the seed store.
    pub fn seeds(&self) -> &impl seed::Store {
        &self.0
    }

    /// Get the seed store, mutably.
    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
        &mut self.0
    }

    /// Get the refs store.
    pub fn refs(&self) -> &impl node::refs::Store {
        &self.0
    }

    /// Get the refs store, mutably.
    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
        &mut self.0
    }
}

impl<D> AsMut<D> for Stores<D> {
    fn as_mut(&mut self) -> &mut D {
        &mut self.0
    }
}

impl<D> From<D> for Stores<D> {
    fn from(db: D) -> Self {
        Self(db)
    }
}

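/// The node's peer-to-peer service. It reacts to peer messages, timer wake-ups
/// and user commands, and queues the resulting I/O in its [`Outbox`].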
#[derive(Debug)]
pub struct Service<D, S, G> {
    /// Service configuration.
    config: Config,
    /// Cryptographic signer of messages.
    signer: Device<G>,
    /// Project storage.
    storage: S,
    /// Node databases.
    db: Stores<D>,
    /// Seeding and follow policy configuration.
    policies: policy::Config<Write>,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
    /// Clock. Tells the time.
    clock: LocalTime,
    /// Who relayed each stored announcement to us, so we don't relay it back.
    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
    /// Interface to the I/O reactor.
    outbox: Outbox,
    /// Our node announcement.
    node: NodeAnnouncement,
    /// Our inventory announcement.
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
    /// Ongoing fetches, keyed by repository.
    fetching: HashMap<RepoId, FetchState>,
    /// Message and connection rate limiter.
    limiter: RateLimiter,
    /// Bloom filter over the repositories we are subscribed to.
    filter: Filter,
    /// Last time the "idle" task ran.
    last_idle: LocalTime,
    /// Last time the "gossip" task ran.
    last_gossip: LocalTime,
    /// Last time the "sync" task ran.
    last_sync: LocalTime,
    /// Last time the "prune" task ran.
    last_prune: LocalTime,
    /// Last time the "announce" task ran.
    last_announce: LocalTime,
    /// Time of the last inventory announcement we sent.
    last_inventory: LocalTime,
    /// Most recent timestamp handed out, used to guarantee monotonicity.
    last_timestamp: Timestamp,
    /// Time the service was initialized, if it was.
    started_at: Option<LocalTime>,
    /// Time we were last online, derived from the latest stored gossip message.
    last_online_at: Option<LocalTime>,
    /// Event publisher.
    emitter: Emitter<Event>,
    /// Local addresses the node is listening on.
    listening: Vec<net::SocketAddr>,
    /// Latest metrics snapshot, provided by the runtime.
    metrics: Metrics,
}

impl<D, S, G> Service<D, S, G> {
    /// Get the local node ID.
    pub fn node_id(&self) -> NodeId {
        *self.signer.public_key()
    }

    /// Get the local time.
    pub fn local_time(&self) -> LocalTime {
        self.clock
    }

    /// Get a new handle to the event emitter.
    pub fn emitter(&self) -> Emitter<Event> {
        self.emitter.clone()
    }
}

impl<D, S, G> Service<D, S, G>
where
    D: Store,
    S: ReadStorage + 'static,
    G: crypto::signature::Signer<crypto::Signature>,
{
    pub fn new(
        config: Config,
        db: Stores<D>,
        storage: S,
        policies: policy::Config<Write>,
        signer: Device<G>,
        rng: Rng,
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());
        let limiter = RateLimiter::new(config.peers());
        let last_timestamp = node.timestamp;
        let clock = LocalTime::default();
        let inventory = gossip::inventory(clock.into(), []);

        Self {
            config,
            storage,
            policies,
            signer,
            rng,
            inventory,
            node,
            clock,
            db,
            outbox: Outbox::default(),
            limiter,
            sessions,
            fetching: HashMap::new(),
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
            last_gossip: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_timestamp,
            last_announce: LocalTime::default(),
            last_inventory: LocalTime::default(),
            started_at: None,
            last_online_at: None,
            emitter,
            listening: vec![],
            metrics: Metrics::default(),
        }
    }

    /// Time at which the service was initialized, if it was.
    pub fn started(&self) -> Option<LocalTime> {
        self.started_at
    }

    /// Get the next I/O operation to perform.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<io::Io> {
        self.outbox.next()
    }

    /// Seed a repository and add it to our gossip filter.
    pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
        let updated = self.policies.seed(id, scope)?;
        self.filter.insert(id);

        Ok(updated)
    }

    /// Unseed a repository, rebuild the gossip filter and update our inventory.
    pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
        let updated = self.policies.unseed(id)?;

        if updated {
            self.filter = Filter::allowed_by(self.policies.seed_policies()?);
            if let Err(e) = self.remove_inventory(id) {
                error!(target: "service", "Error updating inventory after unseed: {e}");
            }
        }
        Ok(updated)
    }

    #[allow(unused)]
    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
        todo!()
    }

    pub fn database(&self) -> &Stores<D> {
        &self.db
    }

    pub fn database_mut(&mut self) -> &mut Stores<D> {
        &mut self.db
    }

    pub fn storage(&self) -> &S {
        &self.storage
    }

    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }

    pub fn policies(&self) -> &policy::Config<Write> {
        &self.policies
    }

    pub fn signer(&self) -> &Device<G> {
        &self.signer
    }

    pub fn events(&mut self) -> Events {
        Events::from(self.emitter.subscribe())
    }

    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
    }

    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Look up a repository locally and in the routing table.
    pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
        let this = self.nid();
        let local = self.storage.get(rid)?;
        let remote = self
            .db
            .routing()
            .get(&rid)?
            .iter()
            .filter(|nid| nid != &this)
            .cloned()
            .collect();

        Ok(Lookup { local, remote })
    }

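    /// Initialize the service with the current time. Called once, before the main loop.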
    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());
        assert_ne!(time, LocalTime::default());

        let nid = self.node_id();

        self.clock = time;
        self.started_at = Some(time);
        self.last_online_at = match self.db.gossip().last() {
            Ok(Some(last)) => Some(last.to_local_time()),
            Ok(None) => None,
            Err(e) => {
                error!(target: "service", "Error getting the latest gossip message from db: {e}");
                None
            }
        };

        match self.db.refs().count() {
            Ok(0) => {
                info!(target: "service", "Empty refs database, populating from storage..");
                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
                    error!(target: "service", "Failed to populate refs database: {e}");
                }
            }
            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
            Err(e) => error!(target: "service", "Error checking refs database: {e}"),
        }

        let announced = self
            .db
            .seeds()
            .seeded_by(&nid)?
            .collect::<Result<HashMap<_, _>, _>>()?;
        let mut inventory = BTreeSet::new();
        let mut private = BTreeSet::new();

        for repo in self.storage.repositories()? {
            let rid = repo.rid;

            if !self.policies.is_seeding(&rid)? {
                warn!(target: "service", "Local repository {rid} is not seeded");
                continue;
            }
            if repo.doc.is_public() {
                inventory.insert(rid);
            } else {
                private.insert(rid);
            }
            let Some(updated_at) = repo.synced_at else {
                continue;
            };
            if let Some(announced) = announced.get(&rid) {
                if updated_at.oid == announced.oid {
                    continue;
                }
            }
            if self.db.seeds_mut().synced(
                &rid,
                &nid,
                updated_at.oid,
                updated_at.timestamp.into(),
            )? {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
                self.db.gossip_mut().announced(&nid, &ann)?;
            }
        }

        self.db
            .routing_mut()
            .add_inventory(inventory.iter(), nid, time.into())?;
        self.inventory = gossip::inventory(self.timestamp(), inventory);

        self.db
            .routing_mut()
            .remove_inventories(private.iter(), &nid)?;

        self.filter = Filter::allowed_by(self.policies.seed_policies()?);

        let addrs = self.config.connect.clone();
        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
            if let Err(e) = self.connect(id, addr) {
                error!(target: "service", "Service::initialization connection error: {e}");
            }
        }
        self.maintain_connections();
        self.outbox.wakeup(IDLE_INTERVAL);
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }

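    /// Update the service clock and metrics snapshot. Called on every reactor tick.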
    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
        trace!(
            target: "service",
            "Tick +{}",
            now - self.started_at.expect("Service::tick: service must be initialized")
        );
        if now >= self.clock {
            self.clock = now;
        } else {
            #[cfg(not(test))]
            warn!(
                target: "service",
                "System clock is not monotonic: {now} is not greater or equal to {}", self.clock
            );
        }
        self.metrics = metrics.clone();
    }

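    /// Run periodic tasks (idle, gossip, sync, announce, prune) whose interval has elapsed.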
    pub fn wake(&mut self) {
        let now = self.clock;

        trace!(
            target: "service",
            "Wake +{}",
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

        if now - self.last_idle >= IDLE_INTERVAL {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_gossip >= GOSSIP_INTERVAL {
            trace!(target: "service", "Running 'gossip' task...");

            if let Err(e) = self.relay_announcements() {
                error!(target: "service", "Error relaying stored announcements: {e}");
            }
            self.outbox.wakeup(GOSSIP_INTERVAL);
            self.last_gossip = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_repositories() {
                error!(target: "service", "Error fetching missing inventory: {e}");
            }
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

            self.announce_inventory();
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                error!(target: "service", "Error pruning routing entries: {err}");
            }
            if let Err(err) = self
                .db
                .gossip_mut()
                .prune((now - LocalDuration::from(self.config.limits.gossip_max_age)).into())
            {
                error!(target: "service", "Error pruning gossip entries: {err}");
            }

            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

        self.maintain_persistent();
    }

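    /// Process a command from the user or control interface.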
    pub fn command(&mut self, cmd: Command) {
        info!(target: "service", "Received command {cmd:?}");

        match cmd {
            Command::Connect(nid, addr, opts) => {
                if opts.persistent {
                    self.config.connect.insert((nid, addr.clone()).into());
                }
                if let Err(e) = self.connect(nid, addr) {
                    match e {
                        ConnectError::SessionExists { nid } => {
                            self.emitter.emit(Event::PeerConnected { nid });
                        }
                        e => {
                            self.emitter.emit(Event::PeerDisconnected {
                                nid,
                                reason: e.to_string(),
                            });
                        }
                    }
                }
            }
            Command::Disconnect(nid) => {
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Config(resp) => {
                resp.send(self.config.clone()).ok();
            }
            Command::ListenAddrs(resp) => {
                resp.send(self.listening.clone()).ok();
            }
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
                    debug!(
                        target: "service",
                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
                        connected.len(), disconnected.len(), rid
                    );
                    resp.send(seeds).ok();
                }
                Err(e) => {
                    error!(target: "service", "Error getting seeds for {rid}: {e}");
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
                self.fetch(rid, seed, timeout, Some(resp));
            }
            Command::Seed(rid, scope, resp) => {
                let seeded = self
                    .seed(&rid, scope)
                    .expect("Service::command: error seeding repository");
                resp.send(seeded).ok();

                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
            Command::Unseed(id, resp) => {
                let updated = self
                    .unseed(&id)
                    .expect("Service::command: error unseeding repository");
                resp.send(updated).ok();
            }
            Command::Follow(id, alias, resp) => {
                let seeded = self
                    .policies
                    .follow(&id, alias.as_ref())
                    .expect("Service::command: error following node");
                resp.send(seeded).ok();
            }
            Command::Unfollow(id, resp) => {
                let updated = self
                    .policies
                    .unfollow(&id)
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
            Command::AnnounceRefs(id, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
                        error!(target: "service", "Error announcing refs: repository {id} not found");
                        return;
                    }
                    Err(e) => {
                        error!(target: "service", "Error announcing refs: doc error: {e}");
                        return;
                    }
                };

                match self.announce_own_refs(id, doc) {
                    Ok(refs) => match refs.as_slice() {
                        &[refs] => {
                            resp.send(refs).ok();
                        }
                        [..] => panic!("Service::command: unexpected refs returned"),
                    },
                    Err(err) => {
                        error!(target: "service", "Error announcing refs: {err}");
                    }
                }
            }
            Command::AnnounceInventory => {
                self.announce_inventory();
            }
            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
                Ok(updated) => {
                    resp.send(updated).ok();
                }
                Err(e) => {
                    error!(target: "service", "Error adding {rid} to inventory: {e}");
                }
            },
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
            }
        }
    }

    fn fetch_refs_at(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs: NonEmpty<RefsAt>,
        scope: Scope,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) -> bool {
        match self.refs_status_of(rid, refs, &scope) {
            Ok(status) => {
                if status.want.is_empty() {
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
                    return self._fetch(rid, from, status.want, timeout, channel);
                }
            }
            Err(e) => {
                error!(target: "service", "Error getting the refs status of {rid}: {e}");
            }
        }
        false
    }

    fn fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) -> bool {
        self._fetch(rid, from, vec![], timeout, channel)
    }

    fn _fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) -> bool {
        match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
            Ok(fetching) => {
                if let Some(c) = channel {
                    fetching.subscribe(c);
                }
                return true;
            }
            Err(TryFetchError::AlreadyFetching(fetching)) => {
                if fetching.from == from && fetching.refs_at == refs_at {
                    debug!(target: "service", "Ignoring redundant fetch of {rid} from {from}");

                    if let Some(c) = channel {
                        fetching.subscribe(c);
                    }
                } else {
                    let fetch = QueuedFetch {
                        rid,
                        refs_at,
                        from,
                        timeout,
                        channel,
                    };
                    debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");

                    self.queue_fetch(fetch);
                }
            }
            Err(TryFetchError::SessionCapacityReached) => {
                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
                self.queue_fetch(QueuedFetch {
                    rid,
                    refs_at,
                    from,
                    timeout,
                    channel,
                });
            }
            Err(e) => {
                if let Some(c) = channel {
                    c.send(FetchResult::Failed {
                        reason: e.to_string(),
                    })
                    .ok();
                }
            }
        }
        false
    }

    fn queue_fetch(&mut self, fetch: QueuedFetch) {
        let Some(s) = self.sessions.get_mut(&fetch.from) else {
            log::error!(target: "service", "Cannot queue fetch for unknown session {}", fetch.from);
            return;
        };
        if let Err(e) = s.queue_fetch(fetch) {
            let fetch = e.inner();
            log::debug!(target: "service", "Unable to queue fetch for {} with {}: {e}", &fetch.rid, &fetch.from);
        }
    }

    fn try_fetch(
        &mut self,
        rid: RepoId,
        from: &NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) -> Result<&mut FetchState, TryFetchError> {
        let from = *from;
        let Some(session) = self.sessions.get_mut(&from) else {
            return Err(TryFetchError::SessionNotConnected);
        };
        let fetching = self.fetching.entry(rid);

        trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");

        let fetching = match fetching {
            Entry::Vacant(fetching) => fetching,
            Entry::Occupied(fetching) => {
                return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
            }
        };
        debug_assert!(!session.is_fetching(&rid));

        if !session.is_connected() {
            return Err(TryFetchError::SessionNotConnected);
        }
        if session.is_at_capacity() {
            return Err(TryFetchError::SessionCapacityReached);
        }

        let fetching = fetching.insert(FetchState {
            from,
            refs_at: refs_at.clone(),
            subscribers: vec![],
        });
        self.outbox.fetch(
            session,
            rid,
            refs_at,
            timeout,
            self.config.limits.fetch_pack_receive,
        );

        Ok(fetching)
    }

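    /// Called by the worker with the result of a fetch for `rid` from `remote`.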
    pub fn fetched(
        &mut self,
        rid: RepoId,
        remote: NodeId,
        result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
    ) {
        let Some(fetching) = self.fetching.remove(&rid) else {
            error!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
            return;
        };
        debug_assert_eq!(fetching.from, remote);

        if let Some(s) = self.sessions.get_mut(&remote) {
            s.fetched(rid);
        }

        for sub in &fetching.subscribers {
            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");

            let result = match &result {
                Ok(success) => FetchResult::Success {
                    updated: success.updated.clone(),
                    namespaces: success.namespaces.clone(),
                    clone: success.clone,
                },
                Err(e) => FetchResult::Failed {
                    reason: e.to_string(),
                },
            };
            if sub.send(result).is_err() {
                error!(target: "service", "Error sending fetch result for {rid} from {remote}..");
            } else {
                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
            }
        }

        match result {
            Ok(crate::worker::fetch::FetchResult {
                updated,
                canonical,
                namespaces,
                clone,
                doc,
            }) => {
                info!(target: "service", "Fetched {rid} from {remote} successfully");
                self.seed_discovered(rid, remote, self.clock.into());

                for update in &updated {
                    if update.is_skipped() {
                        trace!(target: "service", "Ref skipped: {update} for {rid}");
                    } else {
                        debug!(target: "service", "Ref updated: {update} for {rid}");
                    }
                }
                self.emitter.emit(Event::RefsFetched {
                    remote,
                    rid,
                    updated: updated.clone(),
                });
                self.emitter.emit_all(
                    canonical
                        .into_iter()
                        .map(|(refname, target)| Event::CanonicalRefUpdated {
                            rid,
                            refname,
                            target,
                        })
                        .collect(),
                );

                if clone && doc.is_public() {
                    debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");

                    if let Err(e) = self.add_inventory(rid) {
                        error!(target: "service", "Error announcing inventory for {rid}: {e}");
                    }
                }

                if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
                    debug!(target: "service", "Nothing to announce, no refs were updated..");
                } else {
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces) {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
            }
            Err(err) => {
                error!(target: "service", "Fetch failed for {rid} from {remote}: {err}");

                if err.is_timeout() {
                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
                }
            }
        }
        self.dequeue_fetches();
    }

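    /// Dequeue waiting fetches from sessions with spare capacity and initiate them.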
    pub fn dequeue_fetches(&mut self) {
        let sessions = self
            .sessions
            .shuffled()
            .map(|(k, _)| *k)
            .collect::<Vec<_>>();

        for nid in sessions {
            #[allow(clippy::unwrap_used)]
            let sess = self.sessions.get_mut(&nid).unwrap();
            if !sess.is_connected() || sess.is_at_capacity() {
                continue;
            }

            if let Some(QueuedFetch {
                rid,
                from,
                refs_at,
                timeout,
                channel,
            }) = sess.dequeue_fetch()
            {
                debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");

                if let Some(refs) = NonEmpty::from_vec(refs_at) {
                    let repo_entry = self.policies.seed_policy(&rid).expect(
                        "Service::dequeue_fetch: error accessing repo seeding configuration",
                    );
                    let SeedingPolicy::Allow { scope } = repo_entry.policy else {
                        debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
                        continue;
                    };
                    self.fetch_refs_at(rid, from, refs, scope, timeout, channel);
                } else {
                    self.fetch(rid, from, timeout, channel);
                }
            }
        }
    }

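    /// Decide whether to accept an inbound connection from the given IP address.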
    pub fn accepted(&mut self, ip: IpAddr) -> bool {
        if ip.is_loopback() || ip.is_unspecified() {
            return true;
        }
        if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
            return false;
        }
        match self.db.addresses().is_ip_banned(ip) {
            Ok(banned) => {
                if banned {
                    debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
                    return false;
                }
            }
            Err(e) => error!(target: "service", "Error querying ban status for {ip}: {e}"),
        }
        let host: HostName = ip.into();
        let tokens = RateLimit::from(self.config.limits.rate.inbound.clone());

        if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
            trace!(target: "service", "Rate limiting inbound connection from {host}..");
            return false;
        }
        true
    }

    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
        debug!(target: "service", "Attempted connection to {nid} ({addr})");

        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_attempted();
        } else {
            #[cfg(debug_assertions)]
            panic!("Service::attempted: unknown session {nid}@{addr}");
        }
    }

    pub fn listening(&mut self, local_addr: net::SocketAddr) {
        info!(target: "node", "Listening on {local_addr}..");

        self.listening.push(local_addr);
    }

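    /// Called when a connection to a peer has been established.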
    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);

        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);
            }
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(mut e) => {
                    let peer = e.get_mut();
                    debug!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({peer})"
                    );
                    peer.link = link;
                    peer.to_connected(self.clock);
                    self.outbox.write_all(peer, msgs);
                }
                Entry::Vacant(e) => {
                    if let HostName::Ip(ip) = addr.host {
                        if !address::is_local(&ip) {
                            if let Err(e) =
                                self.db
                                    .addresses_mut()
                                    .record_ip(&remote, ip, self.clock.into())
                            {
                                log::error!(target: "service", "Error recording IP address for {remote}: {e}");
                            }
                        }
                    }
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                        self.config.limits.clone(),
                    ));
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
    }

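    /// Called when a peer connection was dropped or a connection attempt failed.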
    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
        let since = self.local_time();
        let Some(session) = self.sessions.get_mut(&remote) else {
            trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
            return;
        };
        if session.link != link {
            return;
        }

        info!(target: "service", "Disconnected from {remote} ({reason})");
        self.emitter.emit(Event::PeerDisconnected {
            nid: remote,
            reason: reason.to_string(),
        });

        let link = session.link;
        let addr = session.addr.clone();

        self.fetching.retain(|_, fetching| {
            if fetching.from != remote {
                return true;
            }
            for resp in &fetching.subscribers {
                resp.send(FetchResult::Failed {
                    reason: format!("disconnected: {reason}"),
                })
                .ok();
            }
            false
        });

        if self.config.peer(&remote).is_some() {
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

            session.to_disconnected(since, since + delay);

            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

            self.outbox.wakeup(delay);
        } else {
            debug!(target: "service", "Dropping peer {remote}..");
            self.sessions.remove(&remote);

            let severity = match reason {
                DisconnectReason::Dial(_)
                | DisconnectReason::Fetch(_)
                | DisconnectReason::Connection(_) => {
                    if self.is_online() {
                        Severity::Medium
                    } else {
                        Severity::Low
                    }
                }
                DisconnectReason::Session(e) => e.severity(),
                DisconnectReason::Command
                | DisconnectReason::Conflict
                | DisconnectReason::SelfConnection => Severity::Low,
            };

            if let Err(e) = self
                .db
                .addresses_mut()
                .disconnected(&remote, &addr, severity)
            {
                error!(target: "service", "Error updating address store: {e}");
            }
            if link.is_outbound() {
                self.maintain_connections();
            }
        }
        self.dequeue_fetches();
    }

    pub fn received_message(&mut self, remote: NodeId, message: Message) {
        if let Err(err) = self.handle_message(&remote, message) {
            self.outbox
                .disconnect(remote, DisconnectReason::Session(err));
        }
    }

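    /// Verify and store a gossip announcement. Returns the announcement ID if
    /// the message should be relayed to other peers.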
    pub fn handle_announcement(
        &mut self,
        relayer: &NodeId,
        relayer_addr: &Address,
        announcement: &Announcement,
    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
        if !announcement.verify() {
            return Err(session::Error::Misbehavior);
        }
        let Announcement {
            node: announcer,
            message,
            ..
        } = announcement;

        if announcer == self.nid() {
            return Ok(None);
        }
        let now = self.clock;
        let timestamp = message.timestamp();

        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
            return Err(session::Error::InvalidTimestamp(timestamp));
        }

        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
            match self.db.addresses().get(announcer) {
                Ok(node) => {
                    if node.is_none() {
                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
                        return Ok(None);
                    }
                }
                Err(e) => {
                    error!(target: "service", "Error looking up node in address book: {e}");
                    return Ok(None);
                }
            }
        }

        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
            Ok(Some(id)) => {
                log::debug!(
                    target: "service",
                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
                );
                self.relayed_by.entry(id).or_default().push(*relayer);

                let relay = message.is_node_announcement()
                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
                relay.then_some(id)
            }
            Ok(None) => {
                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
                return Ok(None);
            }
            Err(e) => {
                error!(target: "service", "Error updating gossip entry from {announcer}: {e}");
                return Ok(None);
            }
        };

        match message {
            AnnouncementMessage::Inventory(message) => {
                self.emitter.emit(Event::InventoryAnnounced {
                    nid: *announcer,
                    inventory: message.inventory.to_vec(),
                    timestamp: message.timestamp,
                });
                match self.sync_routing(
                    message.inventory.iter().cloned(),
                    *announcer,
                    message.timestamp,
                ) {
                    Ok(synced) => {
                        if synced.is_empty() {
                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
                            return Ok(None);
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error processing inventory from {announcer}: {e}");
                        return Ok(None);
                    }
                }
                let mut missing = Vec::new();
                let nid = *self.nid();

                if let Some(sess) = self.sessions.get_mut(announcer) {
                    for id in message.inventory.as_slice() {
                        if let Some(sub) = &mut sess.subscribe {
                            sub.filter.insert(id);
                        }

                        if self.policies.is_seeding(id).expect(
                            "Service::handle_announcement: error accessing seeding configuration",
                        ) {
                            match self.db.routing().entry(id, &nid) {
                                Ok(entry) => {
                                    if entry.is_none() {
                                        missing.push(*id);
                                    }
                                }
                                Err(e) => error!(
                                    target: "service",
                                    "Error checking local inventory for {id}: {e}"
                                ),
                            }
                        }
                    }
                }
                self.rng.shuffle(&mut missing);

                for rid in missing {
                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
                    self.fetch(rid, *announcer, FETCH_TIMEOUT, None);
                }
                return Ok(relay);
            }
            AnnouncementMessage::Refs(message) => {
                self.emitter.emit(Event::RefsAnnounced {
                    nid: *announcer,
                    rid: message.rid,
                    refs: message.refs.to_vec(),
                    timestamp: message.timestamp,
                });
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
                    return Ok(None);
                };
                self.seed_discovered(message.rid, *announcer, message.timestamp);

                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
                    debug!(
                        target: "service",
                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
                        message.rid, refs.at, message.timestamp
                    );
                    match self.db.seeds_mut().synced(
                        &message.rid,
                        announcer,
                        refs.at,
                        message.timestamp,
                    ) {
                        Ok(updated) => {
                            if updated {
                                debug!(
                                    target: "service",
                                    "Updating sync status of {announcer} for {} to {}",
                                    message.rid, refs.at
                                );
                                self.emitter.emit(Event::RefsSynced {
                                    rid: message.rid,
                                    remote: *announcer,
                                    at: refs.at,
                                });
                            } else {
                                debug!(
                                    target: "service",
                                    "Sync status of {announcer} was not updated for {}",
                                    message.rid,
                                );
                            }
                        }
                        Err(e) => {
                            error!(target: "service", "Error updating sync status for {}: {e}", message.rid);
                        }
                    }
                }
                let repo_entry = self.policies.seed_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo seeding configuration",
                );
                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
                    debug!(
                        target: "service",
                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
                        message.rid
                    );
                    return Ok(None);
                };
                let Some(remote) = self.sessions.get(announcer).cloned() else {
                    trace!(
                        target: "service",
                        "Skipping fetch of {}, no sessions connected to {announcer}",
                        message.rid
                    );
                    return Ok(relay);
                };
                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT, None);

                return Ok(relay);
            }
            AnnouncementMessage::Node(
                ann @ NodeAnnouncement {
                    features,
                    addresses,
                    ..
                },
            ) => {
                self.emitter.emit(Event::NodeAnnounced {
                    nid: *announcer,
                    alias: ann.alias.clone(),
                    timestamp: ann.timestamp,
                    features: *features,
                    addresses: addresses.to_vec(),
                });
                if !features.has(Features::SEED) {
                    return Ok(relay);
                }

                match self.db.addresses_mut().insert(
                    announcer,
                    ann.version,
                    ann.features,
                    &ann.alias,
                    ann.work(),
                    &ann.agent,
                    timestamp,
                    addresses
                        .iter()
                        .filter(|a| a.is_routable() || relayer_addr.is_local())
                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
                ) {
                    Ok(updated) => {
                        if updated {
                            debug!(
                                target: "service",
                                "Address store entry for node {announcer} updated at {timestamp}"
                            );
                            return Ok(relay);
                        }
                    }
                    Err(err) => {
                        error!(target: "service", "Error processing node announcement from {announcer}: {err}");
                    }
                }
            }
        }
        Ok(None)
    }

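    /// Process an informational message from a peer.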
    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
        match info {
            Info::RefsAlreadySynced { rid, at } => {
                debug!(target: "service", "Refs already synced for {rid} by {remote}");
                self.emitter.emit(Event::RefsSynced {
                    rid: *rid,
                    remote,
                    at: *at,
                });
            }
        }

        Ok(())
    }

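    /// Process a protocol message received from a peer.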
    pub fn handle_message(
        &mut self,
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
        let local = self.node_id();
        let relay = self.config.is_relay();
        let Some(peer) = self.sessions.get_mut(remote) else {
            warn!(target: "service", "Session not found for {remote}");
            return Ok(());
        };
        peer.last_active = self.clock;

        let limit: RateLimit = match peer.link {
            Link::Outbound => self.config.limits.rate.outbound.clone().into(),
            Link::Inbound => self.config.limits.rate.inbound.clone().into(),
        };
        if self
            .limiter
            .limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
        {
            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
        }
        message.log(log::Level::Debug, remote, Link::Inbound);

        let connected = match &mut peer.state {
            session::State::Disconnected { .. } => {
                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
                return Ok(());
            }
            session::State::Attempted | session::State::Initial => {
                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);

                peer.to_connected(self.clock);

                None
            }
            session::State::Connected {
                ping, latencies, ..
            } => Some((ping, latencies)),
        };

        trace!(target: "service", "Received message {message:?} from {remote}");

        match message {
            Message::Announcement(ann) => {
                let relayer = remote;
                let relayer_addr = peer.addr.clone();

                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
                        if let AnnouncementMessage::Inventory(_) = ann.message {
                            if let Err(e) = self
                                .database_mut()
                                .gossip_mut()
                                .set_relay(id, gossip::RelayStatus::Relay)
                            {
                                error!(target: "service", "Error setting relay flag for message: {e}");
                                return Ok(());
                            }
                        } else {
                            self.relay(id, ann);
                        }
                    }
                }
            }
            Message::Subscribe(subscribe) => {
                match self
                    .db
                    .gossip()
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
                {
                    Ok(anns) => {
                        for ann in anns {
                            let ann = match ann {
                                Ok(a) => a,
                                Err(e) => {
                                    error!(target: "service", "Error reading gossip message from store: {e}");
                                    continue;
                                }
                            };
                            if ann.node == *remote {
                                continue;
                            }
                            if relay || ann.node == local {
                                self.outbox.write(peer, ann.into());
                            }
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error querying gossip messages from store: {e}");
                    }
                }
                peer.subscribe = Some(subscribe);
            }
            Message::Info(info) => {
                self.handle_info(*remote, &info)?;
            }
            Message::Ping(Ping { ponglen, .. }) => {
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
                }
                self.outbox.write(
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
            Message::Pong { zeroes } => {
                if let Some((ping, latencies)) = connected {
                    if let session::PingState::AwaitingResponse {
                        len: ponglen,
                        since,
                    } = *ping
                    {
                        if (ponglen as usize) == zeroes.len() {
                            *ping = session::PingState::Ok;
                            latencies.push_back(self.clock - since);

                            if latencies.len() > MAX_LATENCIES {
                                latencies.pop_front();
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }

1955 fn refs_status_of(
1957 &self,
1958 rid: RepoId,
1959 refs: NonEmpty<RefsAt>,
1960 scope: &policy::Scope,
1961 ) -> Result<RefsStatus, Error> {
1962 let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
1963 if refs.want.is_empty() {
1965 return Ok(refs);
1966 }
1967 let mut refs = match scope {
1969 policy::Scope::All => refs,
1970 policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
1971 Ok(Namespaces::All) => refs,
1972 Ok(Namespaces::Followed(followed)) => {
1973 refs.want.retain(|r| followed.contains(&r.remote));
1974 refs
1975 }
1976 Err(e) => return Err(e.into()),
1977 },
1978 };
1979 refs.want.retain(|r| r.remote != self.node_id());
1981
1982 Ok(refs)
1983 }
1984
1985 fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
1987 if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
1988 if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
1989 self.emitter.emit(Event::SeedDiscovered { rid, nid });
1990 debug!(target: "service", "Routing table updated for {rid} with seed {nid}");
1991 }
1992 }
1993 }
1994
1995 fn initial(&mut self, _link: Link) -> Vec<Message> {
1997 let now = self.clock();
1998 let filter = self.filter();
1999
2000 let since = if let Some(last) = self.last_online_at {
2010 Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
2011 } else {
2012 (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
2013 };
2014 debug!(target: "service", "Subscribing to messages since timestamp {since}..");
2015
2016 vec![
2017 Message::node(self.node.clone(), &self.signer),
2018 Message::inventory(self.inventory.clone(), &self.signer),
2019 Message::subscribe(filter, since, Timestamp::MAX),
2020 ]
2021 }
2022
2023 fn is_online(&self) -> bool {
2025 self.sessions
2026 .connected()
2027 .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
2028 .count()
2029 > 0
2030 }
2031
2032 fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
2034 let node = self.node_id();
2035 let now = self.timestamp();
2036
2037 let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
2038 if removed {
2039 self.refresh_and_announce_inventory(now)?;
2040 }
2041 Ok(removed)
2042 }
2043
2044 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
2046 let node = self.node_id();
2047 let now = self.timestamp();
2048
2049 if !self.storage.contains(&rid)? {
2050 error!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
2051 return Ok(false);
2052 }
2053 let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
2055 let updated = !updates.is_empty();
2056
2057 if updated {
2058 self.refresh_and_announce_inventory(now)?;
2059 }
2060 Ok(updated)
2061 }
2062
2063 fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
2065 let inventory = self.inventory()?;
2066
2067 self.inventory = gossip::inventory(time, inventory);
2068 self.announce_inventory();
2069
2070 Ok(())
2071 }
2072
2073 fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
2084 self.db
2085 .routing()
2086 .get_inventory(self.nid())
2087 .map_err(Error::from)
2088 }
2089
2090 fn sync_routing(
2094 &mut self,
2095 inventory: impl IntoIterator<Item = RepoId>,
2096 from: NodeId,
2097 timestamp: Timestamp,
2098 ) -> Result<SyncedRouting, Error> {
2099 let mut synced = SyncedRouting::default();
2100 let included = inventory.into_iter().collect::<BTreeSet<_>>();
2101
2102 for (rid, result) in
2103 self.db
2104 .routing_mut()
2105 .add_inventory(included.iter(), from, timestamp)?
2106 {
2107 match result {
2108 InsertResult::SeedAdded => {
2109 debug!(target: "service", "Routing table updated for {rid} with seed {from}");
2110 self.emitter.emit(Event::SeedDiscovered { rid, nid: from });
2111
2112 if self
2113 .policies
2114 .is_seeding(&rid)
2115 .expect("Service::process_inventory: error accessing seeding configuration")
2116 {
2117 }
2120 synced.added.push(rid);
2121 }
2122 InsertResult::TimeUpdated => {
2123 synced.updated.push(rid);
2124 }
2125 InsertResult::NotUpdated => {}
2126 }
2127 }
2128 for rid in self.db.routing().get_inventory(&from)?.into_iter() {
2129 if !included.contains(&rid) {
2130 if self.db.routing_mut().remove_inventory(&rid, &from)? {
2131 synced.removed.push(rid);
2132 self.emitter.emit(Event::SeedDropped { rid, nid: from });
2133 }
2134 }
2135 }
2136 Ok(synced)
2137 }
2138
2139 fn refs_announcement_for(
2141 &mut self,
2142 rid: RepoId,
2143 remotes: impl IntoIterator<Item = NodeId>,
2144 ) -> Result<(Announcement, Vec<RefsAt>), Error> {
2145 let repo = self.storage.repository(rid)?;
2146 let timestamp = self.timestamp();
2147 let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
2148
2149 for remote_id in remotes.into_iter() {
2150 let refs_at = RefsAt::new(&repo, remote_id)?;
2151
2152 if refs.push(refs_at).is_err() {
2153 warn!(
2154 target: "service",
2155 "refs announcement limit ({REF_REMOTE_LIMIT}) exceeded, peers will see only some of your repository references",
2156 );
2157 break;
2158 }
2159 }
2160
2161 let msg = AnnouncementMessage::from(RefsAnnouncement {
2162 rid,
2163 refs: refs.clone(),
2164 timestamp,
2165 });
2166 Ok((msg.signed(&self.signer), refs.into()))
2167 }
2168
2169 fn announce_own_refs(&mut self, rid: RepoId, doc: Doc) -> Result<Vec<RefsAt>, Error> {
2171 let (refs, timestamp) = self.announce_refs(rid, doc, [self.node_id()])?;
2172
2173 if let &[r] = refs.as_slice() {
2177 self.emitter.emit(Event::LocalRefsAnnounced {
2178 rid,
2179 refs: r,
2180 timestamp,
2181 });
2182 if let Err(e) = self.database_mut().refs_mut().set(
2183 &rid,
2184 &r.remote,
2185 &SIGREFS_BRANCH,
2186 r.at,
2187 timestamp.to_local_time(),
2188 ) {
2189 error!(
2190 target: "service",
2191 "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
2192 r.remote
2193 );
2194 }
2195 }
2196 Ok(refs)
2197 }
2198
2199 fn announce_refs(
2201 &mut self,
2202 rid: RepoId,
2203 doc: Doc,
2204 remotes: impl IntoIterator<Item = NodeId>,
2205 ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
2206 let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
2207 let timestamp = ann.timestamp();
2208 let peers = self.sessions.connected().map(|(_, p)| p);
2209
2210 if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
2213 info!(
2214 target: "service",
2215 "Announcing own refs for {rid} to peers ({}) (t={timestamp})..",
2216 refs.at
2217 );
2218 if let Err(e) = self
2220 .db
2221 .seeds_mut()
2222 .synced(&rid, &ann.node, refs.at, timestamp)
2223 {
2224 error!(target: "service", "Error updating sync status for local node: {e}");
2225 } else {
2226 debug!(target: "service", "Saved local sync status for {rid}..");
2227 }
2228 }
2229
2230 self.outbox.announce(
2231 ann,
2232 peers.filter(|p| {
2233 doc.is_visible_to(&p.id.into())
2235 }),
2236 self.db.gossip_mut(),
2237 );
2238 Ok((refs, timestamp))
2239 }
2240
2241 fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
2242 if let Some(sess) = self.sessions.get_mut(&nid) {
2243 sess.to_initial();
2244 self.outbox.connect(nid, addr);
2245
2246 return true;
2247 }
2248 false
2249 }
2250
2251 fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
2252 debug!(target: "service", "Connecting to {nid} ({addr})..");
2253
2254 if nid == self.node_id() {
2255 return Err(ConnectError::SelfConnection);
2256 }
2257 if self.sessions.contains_key(&nid) {
2258 return Err(ConnectError::SessionExists { nid });
2259 }
2260 if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
2261 return Err(ConnectError::LimitReached { nid, addr });
2262 }
2263 let persistent = self.config.is_persistent(&nid);
2264 let timestamp: Timestamp = self.clock.into();
2265
2266 if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
2267 error!(target: "service", "Error updating address book with connection attempt: {e}");
2268 }
2269 self.sessions.insert(
2270 nid,
2271 Session::outbound(
2272 nid,
2273 addr.clone(),
2274 persistent,
2275 self.rng.clone(),
2276 self.config.limits.clone(),
2277 ),
2278 );
2279 self.outbox.connect(nid, addr);
2280
2281 Ok(())
2282 }
2283
2284 fn seeds(&self, rid: &RepoId) -> Result<Seeds, Error> {
2285 let mut seeds = Seeds::new(self.rng.clone());
2286
2287 if let Ok(repo) = self.storage.repository(*rid) {
2291 if let Ok(local) = RefsAt::new(&repo, self.node_id()) {
2292 for seed in self.db.seeds().seeds_for(rid)? {
2293 let seed = seed?;
2294 let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
2295 let synced = if local.at == seed.synced_at.oid {
2296 SyncStatus::Synced { at: seed.synced_at }
2297 } else {
2298 let local = SyncedAt::new(local.at, &repo)?;
2299
2300 SyncStatus::OutOfSync {
2301 local,
2302 remote: seed.synced_at,
2303 }
2304 };
2305 seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
2306 }
2307 }
2308 }
2309
2310 for nid in self.db.routing().get(rid)? {
2314 if nid == self.node_id() {
2315 continue;
2316 }
2317 if seeds.contains(&nid) {
2318 continue;
2320 }
2321 let addrs = self.db.addresses().addresses_of(&nid)?;
2322 let state = self.sessions.get(&nid).map(|s| s.state.clone());
2323
2324 seeds.insert(Seed::new(nid, addrs, state, None));
2325 }
2326 Ok(seeds)
2327 }
2328
2329 fn filter(&self) -> Filter {
2331 if self.config.seeding_policy.is_allow() {
2332 Filter::default()
2334 } else {
2335 self.filter.clone()
2336 }
2337 }
2338
2339 fn timestamp(&mut self) -> Timestamp {
2342 let now = Timestamp::from(self.clock);
2343 if *now > *self.last_timestamp {
2344 self.last_timestamp = now;
2345 } else {
2346 self.last_timestamp = self.last_timestamp + 1;
2347 }
2348 self.last_timestamp
2349 }
2350
    fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
        let announcer = ann.node;
        let relayed_by = self.relayed_by.get(&id);
        let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
            Some(rid)
        } else {
            None
        };
        let relay_to = self
            .sessions
            .connected()
            // Skip peers that already relayed this announcement to us.
            .filter(|(id, _)| {
                relayed_by
                    .map(|relayers| !relayers.contains(id))
                    .unwrap_or(true)
            })
            // Skip the original announcer.
            .filter(|(id, _)| **id != announcer)
            // For refs announcements, skip peers the repository isn't visible to.
            .filter(|(id, _)| {
                if let Some(rid) = rid {
                    self.storage
                        .get(rid)
                        .ok()
                        .flatten()
                        .map(|doc| doc.is_visible_to(&(*id).into()))
                        .unwrap_or(false)
                } else {
                    true
                }
            })
            .map(|(_, p)| p);

        self.outbox.relay(ann, relay_to);
    }

    fn relay_announcements(&mut self) -> Result<(), Error> {
        // Grab all announcements that are due for relaying from the gossip
        // store, and relay each one to eligible peers.
        let now = self.clock.into();
        let rows = self.database_mut().gossip_mut().relays(now)?;
        let local = self.node_id();

        for (id, msg) in rows {
            let announcer = msg.node;
            // Never relay our own announcements.
            if announcer == local {
                continue;
            }
            self.relay(id, msg);
        }
        Ok(())
    }

    fn announce_inventory(&mut self) {
        // Only announce our inventory if it changed since the last announcement.
        let timestamp = self.inventory.timestamp.to_local_time();

        if self.last_inventory == timestamp {
            debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
            return;
        }
        let msg = AnnouncementMessage::from(self.inventory.clone());

        self.outbox.announce(
            msg.signed(&self.signer),
            self.sessions.connected().map(|(_, p)| p),
            self.db.gossip_mut(),
        );
        self.last_inventory = timestamp;
    }

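    /// Prune routing table entries when the table exceeds its configured
    /// maximum size, removing the oldest entries first.
    ///
    /// A sketch of the arithmetic below (hypothetical numbers, not taken from
    /// this codebase):
    ///
    /// ```ignore
    /// let count = 1_250usize; // Entries currently in the routing table.
    /// let max = 1_000usize;   // config.limits.routing_max_size
    /// let delta = count - max;
    /// assert_eq!(delta, 250); // At most 250 of the oldest entries are pruned.
    /// ```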
    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
        let count = self.db.routing().len()?;
        if count <= self.config.limits.routing_max_size.into() {
            return Ok(());
        }

        let delta = count - usize::from(self.config.limits.routing_max_size);
        let nid = self.node_id();
        self.db.routing_mut().prune(
            (*now - LocalDuration::from(self.config.limits.routing_max_age)).into(),
            Some(delta),
            &nid,
        )?;
        Ok(())
    }

    /// Disconnect peers that have been inactive for longer than the
    /// stale-connection timeout.
    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
        let stale = self
            .sessions
            .connected()
            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);

        for (_, session) in stale {
            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);

            self.outbox.disconnect(
                session.id,
                DisconnectReason::Session(session::Error::Timeout),
            );
        }
    }

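    /// Keep connections alive by pinging sessions that have been inactive past
    /// the keep-alive delta.
    ///
    /// Note that pings go out well before the stale-connection timeout would
    /// disconnect the peer; a sketch of that invariant, using the constants
    /// defined at the top of this module:
    ///
    /// ```ignore
    /// assert!(KEEP_ALIVE_DELTA < STALE_CONNECTION_TIMEOUT); // 1 min < 2 min
    /// ```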
    fn keep_alive(&mut self, now: &LocalTime) {
        // Ping connected sessions that have been quiet for a while.
        let inactive_sessions = self
            .sessions
            .connected_mut()
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
            .map(|(_, session)| session);
        for session in inactive_sessions {
            session.ping(self.clock, &mut self.outbox).ok();
        }
    }

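    /// Returns peers that are candidates for new connections: known addresses
    /// of nodes speaking our protocol version that aren't banned, penalized
    /// past the connect threshold, already connected, or ourselves. Addresses
    /// are grouped per node, and the result is sorted by ascending penalty.
    ///
    /// A minimal usage sketch (hypothetical `service` value, not taken from
    /// this codebase):
    ///
    /// ```ignore
    /// let peers = service.available_peers();
    /// // Best candidates (lowest penalty) come first.
    /// if let Some(best) = peers.first() {
    ///     println!("{} ({} address(es))", best.nid, best.addresses.len());
    /// }
    /// ```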
    fn available_peers(&mut self) -> Vec<Peer> {
        match self.db.addresses().entries() {
            Ok(entries) => {
                // Filter out nodes we shouldn't dial, then group the remaining
                // address-book rows into one `Peer` per node.
                let mut peers = entries
                    .filter(|entry| entry.version == PROTOCOL_VERSION)
                    .filter(|entry| !entry.address.banned)
                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
                    .filter(|entry| !self.sessions.contains_key(&entry.node))
                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
                    .filter(|entry| &entry.node != self.nid())
                    .fold(HashMap::new(), |mut acc, entry| {
                        acc.entry(entry.node)
                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
                            .or_insert_with(|| Peer {
                                nid: entry.node,
                                addresses: vec![entry.address],
                                penalty: entry.penalty,
                            });
                        acc
                    })
                    .into_values()
                    .collect::<Vec<_>>();
                peers.sort_by_key(|p| p.penalty);
                peers
            }
            Err(e) => {
                error!(target: "service", "Unable to lookup available peers in address book: {e}");
                Vec::new()
            }
        }
    }

    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
        // Find repositories we are seeding that are missing from local
        // storage, and fetch them from any connected seeds.
        let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
        for policy in policies {
            let policy = match policy {
                Ok(policy) => policy,
                Err(err) => {
                    log::error!(target: "protocol::filter", "Failed to read seed policy: {err}");
                    continue;
                }
            };

            let rid = policy.rid;

            if !policy.is_allow() {
                continue;
            }
            if self.storage.contains(&rid)? {
                continue;
            }
            match self.seeds(&rid) {
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
                            self.fetch(rid, seed.nid, FETCH_TIMEOUT, None);
                        }
                    } else {
                        debug!(target: "service", "No connected seeds found for {rid}..");
                    }
                }
                Err(e) => {
                    error!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
                }
            }
        }
        Ok(())
    }

    fn idle_connections(&mut self) {
        // Mark sessions as idle, and record sessions that have proven stable
        // as successful connections in the address book.
        for (_, sess) in self.sessions.iter_mut() {
            sess.idle(self.clock);

            if sess.is_stable() {
                if let Err(e) =
                    self.db
                        .addresses_mut()
                        .connected(&sess.id, &sess.addr, self.clock.into())
                {
                    error!(target: "service", "Error updating address book with connection: {e}");
                }
            }
        }
    }

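    /// Maintain the target number of outbound connections when the peer
    /// configuration is dynamic, dialing eligible candidates from the address
    /// book.
    ///
    /// A sketch of the per-address retry rule applied below (hypothetical `ka`
    /// and `now` values, not taken from this codebase):
    ///
    /// ```ignore
    /// let dialable = match (ka.last_success, ka.last_attempt) {
    ///     // Last attempt succeeded, or the last failure is old enough to retry.
    ///     (Some(success), Some(attempt)) => {
    ///         success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
    ///     }
    ///     // Never succeeded: only retry once the retry delta has passed.
    ///     (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
    ///     // Never attempted: always dialable.
    ///     (_, None) => true,
    /// };
    /// ```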
    fn maintain_connections(&mut self) {
        let PeerConfig::Dynamic = self.config.peers else {
            return;
        };
        trace!(target: "service", "Maintaining connections..");

        let target = TARGET_OUTBOUND_PEERS;
        let now = self.clock;
        let outbound = self
            .sessions
            .values()
            .filter(|s| s.link.is_outbound())
            .filter(|s| s.is_connected() || s.is_connecting())
            .count();
        let wanted = target.saturating_sub(outbound);

        if wanted == 0 {
            // We already have enough outbound connections.
            return;
        }

        // For each available peer, pick the first address that is due for a
        // connection (re-)attempt.
        let available = self
            .available_peers()
            .into_iter()
            .filter_map(|peer| {
                peer.addresses
                    .into_iter()
                    .find(|ka| match (ka.last_success, ka.last_attempt) {
                        (Some(success), Some(attempt)) => {
                            // The last attempt succeeded, or the last failed
                            // attempt is old enough to retry.
                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
                        }
                        // Never succeeded: retry once the delta has passed.
                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
                        // Never attempted: dialable right away.
                        (_, None) => true,
                    })
                    .map(|ka| (peer.nid, ka))
            })
            .filter(|(_, ka)| match AddressType::from(&ka.addr) {
                // Onion addresses can only be dialed through a configured proxy.
                AddressType::Onion => self.config.onion.is_some(),
                AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
            });

        let connect = available.take(wanted).collect::<Vec<_>>();
        if connect.len() < wanted {
            log::debug!(
                target: "service",
                "Not enough available peers to connect to (available={}, wanted={wanted})",
                connect.len()
            );
        }
        for (id, ka) in connect {
            if let Err(e) = self.connect(id, ka.addr.clone()) {
                error!(target: "service", "Service::maintain_connections connection error: {e}");
            }
        }
    }

    fn maintain_persistent(&mut self) {
        // Reconnect to persistent peers that are due for a retry.
        trace!(target: "service", "Maintaining persistent peers..");

        let now = self.local_time();
        let mut reconnect = Vec::new();

        for (nid, session) in self.sessions.iter_mut() {
            if let Some(addr) = self.config.peer(nid) {
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
                    // The retry time for this disconnected session has passed.
                    if now >= *retry_at {
                        reconnect.push((*nid, addr.clone(), session.attempts()));
                    }
                }
            }
        }

        for (nid, addr, attempts) in reconnect {
            if self.reconnect(nid, addr) {
                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
            }
        }
    }
}

/// Read access to the service state.
pub trait ServiceState {
    /// Returns the local node id.
    fn nid(&self) -> &NodeId;
    /// Returns the current peer sessions.
    fn sessions(&self) -> &Sessions;
    /// Returns the repositories currently being fetched.
    fn fetching(&self) -> &HashMap<RepoId, FetchState>;
    /// Returns the service outbox.
    fn outbox(&self) -> &Outbox;
    /// Returns the gossip rate limiter.
    fn limiter(&self) -> &RateLimiter;
    /// Returns the event emitter.
    fn emitter(&self) -> &Emitter<Event>;
    /// Returns the identity document of the given repository, if stored locally.
    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
    /// Returns the local clock.
    fn clock(&self) -> &LocalTime;
    /// Returns a mutable reference to the local clock.
    fn clock_mut(&mut self) -> &mut LocalTime;
    /// Returns the service configuration.
    fn config(&self) -> &Config;
    /// Returns the service metrics.
    fn metrics(&self) -> &Metrics;
}

impl<D, S, G> ServiceState for Service<D, S, G>
where
    D: routing::Store,
    G: crypto::signature::Signer<crypto::Signature>,
    S: ReadStorage,
{
    fn nid(&self) -> &NodeId {
        self.signer.public_key()
    }

    fn sessions(&self) -> &Sessions {
        &self.sessions
    }

    fn fetching(&self) -> &HashMap<RepoId, FetchState> {
        &self.fetching
    }

    fn outbox(&self) -> &Outbox {
        &self.outbox
    }

    fn limiter(&self) -> &RateLimiter {
        &self.limiter
    }

    fn emitter(&self) -> &Emitter<Event> {
        &self.emitter
    }

    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
        self.storage.get(rid)
    }

    fn clock(&self) -> &LocalTime {
        &self.clock
    }

    fn clock_mut(&mut self) -> &mut LocalTime {
        &mut self.clock
    }

    fn config(&self) -> &Config {
        &self.config
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }
}

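/// The reason a peer is being disconnected.
///
/// A sketch of constructing a generic connection-reset reason via the
/// `connection()` helper defined below (hypothetical usage, not taken from
/// this codebase):
///
/// ```ignore
/// let reason = DisconnectReason::connection();
/// assert!(reason.is_connection_err());
/// assert!(!reason.is_dial_err());
/// ```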
#[derive(Debug)]
pub enum DisconnectReason {
    /// Error while dialing the remote; no connection was established.
    Dial(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with an established connection.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
    /// Error during a fetch.
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
    /// The session conflicts with another session.
    Conflict,
    /// Attempted connection to self.
    SelfConnection,
    /// Disconnect requested by the user.
    Command,
}

impl DisconnectReason {
    pub fn is_dial_err(&self) -> bool {
        matches!(self, Self::Dial(_))
    }

    pub fn is_connection_err(&self) -> bool {
        matches!(self, Self::Connection(_))
    }

    pub fn connection() -> Self {
        DisconnectReason::Connection(Arc::new(std::io::Error::from(
            std::io::ErrorKind::ConnectionReset,
        )))
    }
}

impl fmt::Display for DisconnectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Dial(err) => write!(f, "{err}"),
            Self::Connection(err) => write!(f, "{err}"),
            Self::Command => write!(f, "command"),
            Self::SelfConnection => write!(f, "self-connection"),
            Self::Conflict => write!(f, "conflict"),
            Self::Session(err) => write!(f, "{err}"),
            Self::Fetch(err) => write!(f, "fetch: {err}"),
        }
    }
}

/// The result of a repository lookup.
#[derive(Debug)]
pub struct Lookup {
    /// The locally stored identity document, if any.
    pub local: Option<Doc>,
    /// Nodes from the routing table that seed the repository.
    pub remote: Vec<NodeId>,
}

#[derive(thiserror::Error, Debug)]
pub enum LookupError {
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}

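/// Peer sessions, keyed by node id. Sessions may be in any state, e.g.
/// connecting, connected, or disconnected.
///
/// A minimal usage sketch (hypothetical `sessions` value, not taken from this
/// codebase):
///
/// ```ignore
/// for (nid, session) in sessions.connected() {
///     // Only sessions in the `Connected` state are yielded here.
///     println!("{nid}: {}", session.addr);
/// }
/// ```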
#[derive(Debug, Clone)]
pub struct Sessions(AddressBook<NodeId, Session>);

impl Sessions {
    pub fn new(rng: Rng) -> Self {
        Self(AddressBook::new(rng))
    }

    /// Iterator over fully connected peers.
    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.0
            .iter()
            .filter_map(move |(id, sess)| match &sess.state {
                session::State::Connected { .. } => Some((id, sess)),
                _ => None,
            })
    }

    /// Iterator over connected inbound peers.
    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.connected().filter(|(_, s)| s.link.is_inbound())
    }

    /// Iterator over connected outbound peers.
    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.connected().filter(|(_, s)| s.link.is_outbound())
    }

    /// Iterator over mutable sessions of connected peers.
    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
    }

    /// Iterator over mutable sessions of disconnected peers.
    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
    }

    /// Returns `true` if there is a connected session with the given node.
    pub fn is_connected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
    }

    /// Returns `true` if the session with the given node is disconnected, or
    /// there is no session at all.
    pub fn is_disconnected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
    }
}

impl Deref for Sessions {
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}