#![allow(clippy::too_many_arguments)]
#![allow(clippy::collapsible_match)]
#![allow(clippy::collapsible_if)]
#![warn(clippy::unwrap_used)]
pub mod filter;
pub mod gossip;
pub mod io;
pub mod limiter;
pub mod message;
pub mod session;

use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::net::IpAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, net, time};

use crossbeam_channel as chan;
use fastrand::Rng;
use localtime::{LocalDuration, LocalTime};
use log::*;
use nonempty::NonEmpty;

use radicle::identity::Doc;
use radicle::node;
use radicle::node::address;
use radicle::node::address::Store as _;
use radicle::node::address::{AddressBook, AddressType, KnownAddress};
use radicle::node::config::PeerConfig;
use radicle::node::device::Device;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{ConnectOptions, Penalty, Severity};
use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::RepositoryError;
use radicle_fetch::policy::SeedingPolicy;

use crate::identity::RepoId;
use crate::node::routing;
use crate::node::routing::InsertResult;
use crate::node::{
    Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt,
};
use crate::prelude::*;
use crate::runtime::Emitter;
use crate::service::gossip::Store as _;
use crate::service::message::{
    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
};
use crate::service::policy::{store::Write, Scope};
use crate::storage;
use crate::storage::{refs::RefsAt, Namespaces, ReadStorage};
use crate::worker::fetch;
use crate::worker::FetchError;
use crate::Link;
use crate::{crypto, PROTOCOL_VERSION};

pub use crate::node::events::{Event, Events};
pub use crate::node::{config::Network, Config, NodeId};
pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::{QueuedFetch, Session};

pub use radicle::node::policy::config as policy;

use self::io::Outbox;
use self::limiter::RateLimiter;
use self::message::InventoryAnnouncement;
use self::policy::NamespacesError;

/// Interval between runs of the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// Interval between runs of the "gossip" task.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// Interval between runs of the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// Interval between runs of the "sync" task.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// Interval between runs of the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// Inactivity period after which a connection is considered stale.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// Inactivity period after which a keep-alive ping is sent.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency samples kept per session.
pub const MAX_LATENCIES: usize = 16;
/// Maximum tolerated drift between an announcement timestamp and our clock.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum number of connection attempts per peer.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// Gossip backlog requested when subscribing for the first time (one day).
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
/// Overlap subtracted from the last-online time when subscribing to gossip.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Lower bound on the reconnection backoff delay.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Upper bound on the reconnection backoff delay.
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Delay between connection retries.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// Default timeout for fetch operations.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3);
/// Target number of outbound peer connections to maintain.
pub const TARGET_OUTBOUND_PEERS: usize = 8;

pub use message::ADDRESS_LIMIT;
pub use message::INVENTORY_LIMIT;
pub use message::REF_REMOTE_LIMIT;

#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    pub peers: HashMap<NodeId, PeerMetrics>,
    pub worker_queue_size: usize,
    pub open_channels: usize,
}

impl Metrics {
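    /// Borrow the metrics entry for `nid`, inserting a default entry on first
    /// access. A minimal usage sketch (assumes some `nid: NodeId` is at hand):
    ///
    /// ```ignore
    /// let mut metrics = Metrics::default();
    /// metrics.peer(nid).received_bytes += 512;
    /// assert_eq!(metrics.peers[&nid].received_bytes, 512);
    /// ```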
    pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
        self.peers.entry(nid).or_default()
    }
}

#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerMetrics {
    pub received_git_bytes: usize,
    pub received_fetch_requests: usize,
    pub received_bytes: usize,
    pub received_gossip_messages: usize,
    pub sent_bytes: usize,
    pub sent_fetch_requests: usize,
    pub sent_git_bytes: usize,
    pub sent_gossip_messages: usize,
    pub streams_opened: usize,
    pub inbound_connection_attempts: usize,
    pub outbound_connection_attempts: usize,
    pub disconnects: usize,
}

#[derive(Default)]
struct SyncedRouting {
    added: Vec<RepoId>,
    removed: Vec<RepoId>,
    updated: Vec<RepoId>,
}

impl SyncedRouting {
    fn is_empty(&self) -> bool {
        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
    }
}

#[derive(Debug, Clone)]
struct Peer {
    nid: NodeId,
    addresses: Vec<KnownAddress>,
    penalty: Penalty,
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error(transparent)]
    Git(#[from] radicle::git::raw::Error),
    #[error(transparent)]
    GitExt(#[from] radicle::git::ext::Error),
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Gossip(#[from] gossip::Error),
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Address(#[from] address::Error),
    #[error(transparent)]
    Database(#[from] node::db::Error),
    #[error(transparent)]
    Seeds(#[from] seed::Error),
    #[error(transparent)]
    Policy(#[from] policy::Error),
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
    #[error("namespaces error: {0}")]
    Namespaces(#[from] NamespacesError),
}

pub trait Store:
    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
{
}

impl Store for node::Database {}

pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Send + Sync;

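/// Commands sent to the service by other threads. Most variants carry a
/// `chan::Sender` over which the service sends its reply. A minimal sketch of
/// the round trip, assuming a hypothetical `handle` that forwards commands to
/// [`Service::command`]:
///
/// ```ignore
/// let (tx, rx) = chan::bounded::<Config>(1);
/// handle.send(Command::Config(tx));
/// let config = rx.recv()?;
/// ```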
pub enum Command {
    AnnounceRefs(RepoId, chan::Sender<RefsAt>),
    AnnounceInventory,
    AddInventory(RepoId, chan::Sender<bool>),
    Connect(NodeId, Address, ConnectOptions),
    Disconnect(NodeId),
    Config(chan::Sender<Config>),
    ListenAddrs(chan::Sender<Vec<std::net::SocketAddr>>),
    Seeds(RepoId, chan::Sender<Seeds>),
    Fetch(RepoId, NodeId, time::Duration, chan::Sender<FetchResult>),
    Seed(RepoId, Scope, chan::Sender<bool>),
    Unseed(RepoId, chan::Sender<bool>),
    Follow(NodeId, Option<Alias>, chan::Sender<bool>),
    Unfollow(NodeId, chan::Sender<bool>),
    QueryState(Arc<QueryState>, chan::Sender<Result<(), CommandError>>),
}

impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AnnounceRefs(id, _) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Config(_) => write!(f, "Config"),
            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
            Self::QueryState { .. } => write!(f, "QueryState(..)"),
        }
    }
}

#[derive(thiserror::Error, Debug)]
pub enum CommandError {
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Policy(#[from] policy::Error),
}

#[derive(thiserror::Error, Debug)]
enum TryFetchError<'a> {
    #[error("ongoing fetch for repository exists")]
    AlreadyFetching(&'a mut FetchState),
    #[error("peer is not connected; cannot initiate fetch")]
    SessionNotConnected,
    #[error("peer fetch capacity reached; cannot initiate fetch")]
    SessionCapacityReached,
    #[error(transparent)]
    Namespaces(#[from] NamespacesError),
}

#[derive(Debug)]
pub struct FetchState {
    pub from: NodeId,
    pub refs_at: Vec<RefsAt>,
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}

impl FetchState {
    fn subscribe(&mut self, c: chan::Sender<FetchResult>) {
        if !self.subscribers.iter().any(|s| s.same_channel(&c)) {
            self.subscribers.push(c);
        }
    }
}

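/// Wrapper exposing a single database handle through each of the store traits
/// it implements. A small sketch, assuming a `node::Database` opened
/// elsewhere:
///
/// ```ignore
/// let mut stores = Stores::from(db);
/// let inventory = stores.routing().get_inventory(&nid)?;
/// stores.gossip_mut().prune(cutoff)?;
/// ```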
#[derive(Debug)]
pub struct Stores<D>(D);

impl<D> Stores<D>
where
    D: Store,
{
    pub fn routing(&self) -> &impl routing::Store {
        &self.0
    }

    pub fn routing_mut(&mut self) -> &mut impl routing::Store {
        &mut self.0
    }

    pub fn addresses(&self) -> &impl address::Store {
        &self.0
    }

    pub fn addresses_mut(&mut self) -> &mut impl address::Store {
        &mut self.0
    }

    pub fn gossip(&self) -> &impl gossip::Store {
        &self.0
    }

    pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
        &mut self.0
    }

    pub fn seeds(&self) -> &impl seed::Store {
        &self.0
    }

    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
        &mut self.0
    }

    pub fn refs(&self) -> &impl node::refs::Store {
        &self.0
    }

    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
        &mut self.0
    }
}

impl<D> AsMut<D> for Stores<D> {
    fn as_mut(&mut self) -> &mut D {
        &mut self.0
    }
}

impl<D> From<D> for Stores<D> {
    fn from(db: D) -> Self {
        Self(db)
    }
}

#[derive(Debug)]
pub struct Service<D, S, G> {
    config: Config,
    signer: Device<G>,
    storage: S,
    db: Stores<D>,
    policies: policy::Config<Write>,
    sessions: Sessions,
    clock: LocalTime,
    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
    outbox: Outbox,
    node: NodeAnnouncement,
    inventory: InventoryAnnouncement,
    rng: Rng,
    fetching: HashMap<RepoId, FetchState>,
    limiter: RateLimiter,
    filter: Filter,
    last_idle: LocalTime,
    last_gossip: LocalTime,
    last_sync: LocalTime,
    last_prune: LocalTime,
    last_announce: LocalTime,
    last_inventory: LocalTime,
    last_timestamp: Timestamp,
    started_at: Option<LocalTime>,
    last_online_at: Option<LocalTime>,
    emitter: Emitter<Event>,
    listening: Vec<net::SocketAddr>,
    metrics: Metrics,
}

impl<D, S, G> Service<D, S, G> {
    pub fn node_id(&self) -> NodeId {
        *self.signer.public_key()
    }

    pub fn local_time(&self) -> LocalTime {
        self.clock
    }

    pub fn emitter(&self) -> Emitter<Event> {
        self.emitter.clone()
    }
}

impl<D, S, G> Service<D, S, G>
where
    D: Store,
    S: ReadStorage + 'static,
    G: crypto::signature::Signer<crypto::Signature>,
{
    pub fn new(
        config: Config,
        db: Stores<D>,
        storage: S,
        policies: policy::Config<Write>,
        signer: Device<G>,
        rng: Rng,
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());
        let limiter = RateLimiter::new(config.peers());
        let last_timestamp = node.timestamp;
        let clock = LocalTime::default();
        let inventory = gossip::inventory(clock.into(), []);

        Self {
            config,
            storage,
            policies,
            signer,
            rng,
            inventory,
            node,
            clock,
            db,
            outbox: Outbox::default(),
            limiter,
            sessions,
            fetching: HashMap::new(),
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
            last_gossip: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_timestamp,
            last_announce: LocalTime::default(),
            last_inventory: LocalTime::default(),
            started_at: None,
            last_online_at: None,
            emitter,
            listening: vec![],
            metrics: Metrics::default(),
        }
    }

    pub fn started(&self) -> Option<LocalTime> {
        self.started_at
    }

    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<io::Io> {
        self.outbox.next()
    }

    pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
        let updated = self.policies.seed(id, scope)?;
        self.filter.insert(id);

        Ok(updated)
    }

    pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
        let updated = self.policies.unseed(id)?;

        if updated {
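            // The subscription filter only supports insertion, so after
            // unseeding we rebuild it from scratch out of the remaining
            // allowed policies.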
            self.filter = Filter::new(
                self.policies
                    .seed_policies()?
                    .filter_map(|t| (t.policy.is_allow()).then_some(t.rid)),
            );
            if let Err(e) = self.remove_inventory(id) {
                error!(target: "service", "Error updating inventory after unseed: {e}");
            }
        }
        Ok(updated)
    }

    #[allow(unused)]
    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
        todo!()
    }

    pub fn database(&self) -> &Stores<D> {
        &self.db
    }

    pub fn database_mut(&mut self) -> &mut Stores<D> {
        &mut self.db
    }

    pub fn storage(&self) -> &S {
        &self.storage
    }

    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }

    pub fn policies(&self) -> &policy::Config<Write> {
        &self.policies
    }

    pub fn signer(&self) -> &Device<G> {
        &self.signer
    }

    pub fn events(&mut self) -> Events {
        Events::from(self.emitter.subscribe())
    }

    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
    }

    pub fn config(&self) -> &Config {
        &self.config
    }

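    /// Look up a repository: `local` holds the repository's identity document
    /// if it exists in local storage, while `remote` lists the other nodes
    /// known to seed it according to our routing table.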
    pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
        let this = self.nid();
        let local = self.storage.get(rid)?;
        let remote = self
            .db
            .routing()
            .get(&rid)?
            .iter()
            .filter(|nid| nid != &this)
            .cloned()
            .collect();

        Ok(Lookup { local, remote })
    }

    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());
        assert_ne!(time, LocalTime::default());

        let nid = self.node_id();

        self.clock = time;
        self.started_at = Some(time);
        self.last_online_at = match self.db.gossip().last() {
            Ok(Some(last)) => Some(last.to_local_time()),
            Ok(None) => None,
            Err(e) => {
                error!(target: "service", "Error getting the latest gossip message from db: {e}");
                None
            }
        };

        match self.db.refs().count() {
            Ok(0) => {
                info!(target: "service", "Empty refs database, populating from storage..");
                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
                    error!(target: "service", "Failed to populate refs database: {e}");
                }
            }
            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
            Err(e) => error!(target: "service", "Error checking refs database: {e}"),
        }

        let announced = self
            .db
            .seeds()
            .seeded_by(&nid)?
            .collect::<Result<HashMap<_, _>, _>>()?;
        let mut inventory = BTreeSet::new();
        let mut private = BTreeSet::new();

        for repo in self.storage.repositories()? {
            let rid = repo.rid;

            if !self.policies.is_seeding(&rid)? {
                warn!(target: "service", "Local repository {rid} is not seeded");
                continue;
            }
            if repo.doc.is_public() {
                inventory.insert(rid);
            } else {
                private.insert(rid);
            }
            let Some(updated_at) = repo.synced_at else {
                continue;
            };
            if let Some(announced) = announced.get(&rid) {
                if updated_at.oid == announced.oid {
                    continue;
                }
            }
            if self.db.seeds_mut().synced(
                &rid,
                &nid,
                updated_at.oid,
                updated_at.timestamp.into(),
            )? {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
                self.db.gossip_mut().announced(&nid, &ann)?;
            }
        }

        self.db
            .routing_mut()
            .add_inventory(inventory.iter(), nid, time.into())?;
        self.inventory = gossip::inventory(self.timestamp(), inventory);

        self.db
            .routing_mut()
            .remove_inventories(private.iter(), &nid)?;

        self.filter = Filter::new(
            self.policies
                .seed_policies()?
                .filter_map(|t| (t.policy.is_allow()).then_some(t.rid)),
        );
        let addrs = self.config.connect.clone();
        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
            self.connect(id, addr);
        }
        self.maintain_connections();
        self.outbox.wakeup(IDLE_INTERVAL);
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }

    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
        trace!(
            target: "service",
            "Tick +{}",
            now - self.started_at.expect("Service::tick: service must be initialized")
        );
        if now >= self.clock {
            self.clock = now;
        } else {
            #[cfg(not(test))]
            warn!(
                target: "service",
                "System clock is not monotonic: {now} is not greater or equal to {}", self.clock
            );
        }
        self.metrics = metrics.clone();
    }

    pub fn wake(&mut self) {
        let now = self.clock;

        trace!(
            target: "service",
            "Wake +{}",
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

        if now - self.last_idle >= IDLE_INTERVAL {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_gossip >= GOSSIP_INTERVAL {
            trace!(target: "service", "Running 'gossip' task...");

            if let Err(e) = self.relay_announcements() {
                error!(target: "service", "Error relaying stored announcements: {e}");
            }
            self.outbox.wakeup(GOSSIP_INTERVAL);
            self.last_gossip = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_repositories() {
                error!(target: "service", "Error fetching missing inventory: {e}");
            }
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

            self.announce_inventory();
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                error!(target: "service", "Error pruning routing entries: {err}");
            }
            if let Err(err) = self
                .db
                .gossip_mut()
                .prune((now - self.config.limits.gossip_max_age).into())
            {
                error!(target: "service", "Error pruning gossip entries: {err}");
            }

            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

        self.maintain_persistent();
    }

    pub fn command(&mut self, cmd: Command) {
        info!(target: "service", "Received command {:?}", cmd);

        match cmd {
            Command::Connect(nid, addr, opts) => {
                if opts.persistent {
                    self.config.connect.insert((nid, addr.clone()).into());
                }
                if !self.connect(nid, addr) {
                    // `Connect` carries no response channel, so a failed
                    // attempt is only logged by `connect()`.
                }
            }
            Command::Disconnect(nid) => {
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Config(resp) => {
                resp.send(self.config.clone()).ok();
            }
            Command::ListenAddrs(resp) => {
                resp.send(self.listening.clone()).ok();
            }
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
                    debug!(
                        target: "service",
                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
                        connected.len(), disconnected.len(), rid
                    );
                    resp.send(seeds).ok();
                }
                Err(e) => {
                    error!(target: "service", "Error getting seeds for {rid}: {e}");
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
                self.fetch(rid, seed, timeout, Some(resp));
            }
            Command::Seed(rid, scope, resp) => {
                let seeded = self
                    .seed(&rid, scope)
                    .expect("Service::command: error seeding repository");
                resp.send(seeded).ok();

                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
            Command::Unseed(id, resp) => {
                let updated = self
                    .unseed(&id)
                    .expect("Service::command: error unseeding repository");
                resp.send(updated).ok();
            }
            Command::Follow(id, alias, resp) => {
                let seeded = self
                    .policies
                    .follow(&id, alias.as_ref())
                    .expect("Service::command: error following node");
                resp.send(seeded).ok();
            }
            Command::Unfollow(id, resp) => {
                let updated = self
                    .policies
                    .unfollow(&id)
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
            Command::AnnounceRefs(id, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
                        error!(target: "service", "Error announcing refs: repository {id} not found");
                        return;
                    }
                    Err(e) => {
                        error!(target: "service", "Error announcing refs: doc error: {e}");
                        return;
                    }
                };

                match self.announce_own_refs(id, doc) {
                    Ok(refs) => match refs.as_slice() {
                        &[refs] => {
                            resp.send(refs).ok();
                        }
                        [..] => panic!("Service::command: unexpected refs returned"),
                    },
                    Err(err) => {
                        error!(target: "service", "Error announcing refs: {err}");
                    }
                }
            }
            Command::AnnounceInventory => {
                self.announce_inventory();
            }
            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
                Ok(updated) => {
                    resp.send(updated).ok();
                }
                Err(e) => {
                    error!(target: "service", "Error adding {rid} to inventory: {e}");
                }
            },
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
            }
        }
    }

    fn fetch_refs_at(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs: NonEmpty<RefsAt>,
        scope: Scope,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) -> bool {
        match self.refs_status_of(rid, refs, &scope) {
            Ok(status) => {
                if status.want.is_empty() {
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
                    return self._fetch(rid, from, status.want, timeout, channel);
                }
            }
            Err(e) => {
                error!(target: "service", "Error getting the refs status of {rid}: {e}");
            }
        }
        false
    }

    fn fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) -> bool {
        self._fetch(rid, from, vec![], timeout, channel)
    }

    fn _fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) -> bool {
        match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
            Ok(fetching) => {
                if let Some(c) = channel {
                    fetching.subscribe(c);
                }
                return true;
            }
            Err(TryFetchError::AlreadyFetching(fetching)) => {
                if fetching.from == from && fetching.refs_at == refs_at {
                    debug!(target: "service", "Ignoring redundant fetch of {rid} from {from}");

                    if let Some(c) = channel {
                        fetching.subscribe(c);
                    }
                } else {
                    let fetch = QueuedFetch {
                        rid,
                        refs_at,
                        from,
                        timeout,
                        channel,
                    };
                    debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");

                    self.queue_fetch(fetch);
                }
            }
            Err(TryFetchError::SessionCapacityReached) => {
                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
                self.queue_fetch(QueuedFetch {
                    rid,
                    refs_at,
                    from,
                    timeout,
                    channel,
                });
            }
            Err(e) => {
                if let Some(c) = channel {
                    c.send(FetchResult::Failed {
                        reason: e.to_string(),
                    })
                    .ok();
                }
            }
        }
        false
    }

    fn queue_fetch(&mut self, fetch: QueuedFetch) {
        let Some(s) = self.sessions.get_mut(&fetch.from) else {
            log::error!(target: "service", "Cannot queue fetch for unknown session {}", fetch.from);
            return;
        };
        if let Err(e) = s.queue_fetch(fetch) {
            let fetch = e.inner();
            log::debug!(target: "service", "Unable to queue fetch for {} with {}: {e}", &fetch.rid, &fetch.from);
        }
    }

    fn try_fetch(
        &mut self,
        rid: RepoId,
        from: &NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) -> Result<&mut FetchState, TryFetchError> {
        let from = *from;
        let Some(session) = self.sessions.get_mut(&from) else {
            return Err(TryFetchError::SessionNotConnected);
        };
        let fetching = self.fetching.entry(rid);

        trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");

        let fetching = match fetching {
            Entry::Vacant(fetching) => fetching,
            Entry::Occupied(fetching) => {
                return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
            }
        };
        debug_assert!(!session.is_fetching(&rid));

        if !session.is_connected() {
            return Err(TryFetchError::SessionNotConnected);
        }
        if session.is_at_capacity() {
            return Err(TryFetchError::SessionCapacityReached);
        }

        let fetching = fetching.insert(FetchState {
            from,
            refs_at: refs_at.clone(),
            subscribers: vec![],
        });
        self.outbox.fetch(
            session,
            rid,
            refs_at,
            timeout,
            self.config.limits.fetch_pack_receive,
        );

        Ok(fetching)
    }

    pub fn fetched(
        &mut self,
        rid: RepoId,
        remote: NodeId,
        result: Result<fetch::FetchResult, FetchError>,
    ) {
        let Some(fetching) = self.fetching.remove(&rid) else {
            error!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
            return;
        };
        debug_assert_eq!(fetching.from, remote);

        if let Some(s) = self.sessions.get_mut(&remote) {
            s.fetched(rid);
        }

        for sub in &fetching.subscribers {
            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");

            let result = match &result {
                Ok(success) => FetchResult::Success {
                    updated: success.updated.clone(),
                    namespaces: success.namespaces.clone(),
                    clone: success.clone,
                },
                Err(e) => FetchResult::Failed {
                    reason: e.to_string(),
                },
            };
            if sub.send(result).is_err() {
                error!(target: "service", "Error sending fetch result for {rid} from {remote}..");
            } else {
                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
            }
        }

        match result {
            Ok(fetch::FetchResult {
                updated,
                namespaces,
                clone,
                doc,
            }) => {
                info!(target: "service", "Fetched {rid} from {remote} successfully");
                self.seed_discovered(rid, remote, self.clock.into());

                for update in &updated {
                    if update.is_skipped() {
                        trace!(target: "service", "Ref skipped: {update} for {rid}");
                    } else {
                        debug!(target: "service", "Ref updated: {update} for {rid}");
                    }
                }
                self.emitter.emit(Event::RefsFetched {
                    remote,
                    rid,
                    updated: updated.clone(),
                });

                if clone && doc.is_public() {
                    debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");

                    if let Err(e) = self.add_inventory(rid) {
                        error!(target: "service", "Error announcing inventory for {rid}: {e}");
                    }
                }

                if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
                    debug!(target: "service", "Nothing to announce, no refs were updated..");
                } else {
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces) {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
            }
            Err(err) => {
                error!(target: "service", "Fetch failed for {rid} from {remote}: {err}");

                if err.is_timeout() {
                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
                }
            }
        }
        self.dequeue_fetches();
    }

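    /// Try to dequeue one pending fetch per connected session. Sessions are
    /// visited in random order so that no peer's queue is persistently
    /// favored over the others.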
    pub fn dequeue_fetches(&mut self) {
        let sessions = self
            .sessions
            .shuffled()
            .map(|(k, _)| *k)
            .collect::<Vec<_>>();

        for nid in sessions {
            #[allow(clippy::unwrap_used)]
            let sess = self.sessions.get_mut(&nid).unwrap();
            if !sess.is_connected() || sess.is_at_capacity() {
                continue;
            }

            if let Some(QueuedFetch {
                rid,
                from,
                refs_at,
                timeout,
                channel,
            }) = sess.dequeue_fetch()
            {
                debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");

                if let Some(refs) = NonEmpty::from_vec(refs_at) {
                    let repo_entry = self.policies.seed_policy(&rid).expect(
                        "Service::dequeue_fetch: error accessing repo seeding configuration",
                    );
                    let SeedingPolicy::Allow { scope } = repo_entry.policy else {
                        debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
                        continue;
                    };
                    self.fetch_refs_at(rid, from, refs, scope, timeout, channel);
                } else {
                    self.fetch(rid, from, timeout, channel);
                }
            }
        }
    }

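    /// Decide whether an inbound connection from `ip` should be accepted,
    /// checking the inbound connection limit, the IP ban list and the inbound
    /// rate limit. Loopback and unspecified addresses are always accepted.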
    pub fn accepted(&mut self, ip: IpAddr) -> bool {
        if ip.is_loopback() || ip.is_unspecified() {
            return true;
        }
        if self.sessions.inbound().count() >= self.config.limits.connection.inbound {
            return false;
        }
        match self.db.addresses().is_ip_banned(ip) {
            Ok(banned) => {
                if banned {
                    debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
                    return false;
                }
            }
            Err(e) => error!(target: "service", "Error querying ban status for {ip}: {e}"),
        }
        let host: HostName = ip.into();

        if self.limiter.limit(
            host.clone(),
            None,
            &self.config.limits.rate.inbound,
            self.clock,
        ) {
            trace!(target: "service", "Rate limiting inbound connection from {host}..");
            return false;
        }
        true
    }

    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
        debug!(target: "service", "Attempted connection to {nid} ({addr})");

        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_attempted();
        } else {
            #[cfg(debug_assertions)]
            panic!("Service::attempted: unknown session {nid}@{addr}");
        }
    }

    pub fn listening(&mut self, local_addr: net::SocketAddr) {
        info!(target: "node", "Listening on {local_addr}..");

        self.listening.push(local_addr);
    }

    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);

        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);
            }
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(mut e) => {
                    let peer = e.get_mut();
                    debug!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({peer})"
                    );
                    peer.link = link;
                    peer.to_connected(self.clock);
                    self.outbox.write_all(peer, msgs);
                }
                Entry::Vacant(e) => {
                    if let HostName::Ip(ip) = addr.host {
                        if !address::is_local(&ip) {
                            if let Err(e) =
                                self.db
                                    .addresses_mut()
                                    .record_ip(&remote, ip, self.clock.into())
                            {
                                log::error!(target: "service", "Error recording IP address for {remote}: {e}");
                            }
                        }
                    }
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                        self.config.limits.clone(),
                    ));
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
    }

    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
        let since = self.local_time();
        let Some(session) = self.sessions.get_mut(&remote) else {
            trace!(target: "service", "Redundant disconnection for {} ({})", remote, reason);
            return;
        };
        if session.link != link {
            return;
        }

        info!(target: "service", "Disconnected from {} ({})", remote, reason);
        self.emitter.emit(Event::PeerDisconnected {
            nid: remote,
            reason: reason.to_string(),
        });

        let link = session.link;
        let addr = session.addr.clone();

        self.fetching.retain(|_, fetching| {
            if fetching.from != remote {
                return true;
            }
            for resp in &fetching.subscribers {
                resp.send(FetchResult::Failed {
                    reason: format!("disconnected: {reason}"),
                })
                .ok();
            }
            false
        });

        if self.config.peer(&remote).is_some() {
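            // Exponential backoff on the attempt count, clamped to the
            // reconnection window: e.g. three prior attempts give 2^3 = 8s,
            // while twelve or more hit the 60 minute ceiling (2^12 s > 3600 s).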
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

            session.to_disconnected(since, since + delay);

            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

            self.outbox.wakeup(delay);
        } else {
            debug!(target: "service", "Dropping peer {remote}..");
            self.sessions.remove(&remote);

            let severity = match reason {
                DisconnectReason::Dial(_)
                | DisconnectReason::Fetch(_)
                | DisconnectReason::Connection(_) => {
                    if self.is_online() {
                        Severity::Medium
                    } else {
                        Severity::Low
                    }
                }
                DisconnectReason::Session(e) => e.severity(),
                DisconnectReason::Command
                | DisconnectReason::Conflict
                | DisconnectReason::SelfConnection => Severity::Low,
            };

            if let Err(e) = self
                .db
                .addresses_mut()
                .disconnected(&remote, &addr, severity)
            {
                error!(target: "service", "Error updating address store: {e}");
            }
            if link.is_outbound() {
                self.maintain_connections();
            }
        }
        self.dequeue_fetches();
    }

    pub fn received_message(&mut self, remote: NodeId, message: Message) {
        if let Err(err) = self.handle_message(&remote, message) {
            self.outbox
                .disconnect(remote, DisconnectReason::Session(err));
        }
    }

    pub fn handle_announcement(
        &mut self,
        relayer: &NodeId,
        relayer_addr: &Address,
        announcement: &Announcement,
    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
        if !announcement.verify() {
            return Err(session::Error::Misbehavior);
        }
        let Announcement {
            node: announcer,
            message,
            ..
        } = announcement;

        if announcer == self.nid() {
            return Ok(None);
        }
        let now = self.clock;
        let timestamp = message.timestamp();

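        // Reject announcements stamped more than `MAX_TIME_DELTA` in the
        // future; `saturating_sub` clamps past timestamps to zero, so only
        // future clock drift can trip this check.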
        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
            return Err(session::Error::InvalidTimestamp(timestamp));
        }

        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
            match self.db.addresses().get(announcer) {
                Ok(node) => {
                    if node.is_none() {
                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
                        return Ok(None);
                    }
                }
                Err(e) => {
                    error!(target: "service", "Error looking up node in address book: {e}");
                    return Ok(None);
                }
            }
        }

        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
            Ok(Some(id)) => {
                log::debug!(
                    target: "service",
                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
                );
                self.relayed_by.entry(id).or_default().push(*relayer);

                let relay = message.is_node_announcement()
                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
                relay.then_some(id)
            }
            Ok(None) => {
                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
                return Ok(None);
            }
            Err(e) => {
                error!(target: "service", "Error updating gossip entry from {announcer}: {e}");
                return Ok(None);
            }
        };

        match message {
            AnnouncementMessage::Inventory(message) => {
                self.emitter.emit(Event::InventoryAnnounced {
                    nid: *announcer,
                    inventory: message.inventory.to_vec(),
                    timestamp: message.timestamp,
                });
                match self.sync_routing(
                    message.inventory.iter().cloned(),
                    *announcer,
                    message.timestamp,
                ) {
                    Ok(synced) => {
                        if synced.is_empty() {
                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
                            return Ok(None);
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error processing inventory from {announcer}: {e}");
                        return Ok(None);
                    }
                }
                let mut missing = Vec::new();
                let nid = *self.nid();

                if let Some(sess) = self.sessions.get_mut(announcer) {
                    for id in message.inventory.as_slice() {
                        if let Some(sub) = &mut sess.subscribe {
                            sub.filter.insert(id);
                        }

                        if self.policies.is_seeding(id).expect(
                            "Service::handle_announcement: error accessing seeding configuration",
                        ) {
                            match self.db.routing().entry(id, &nid) {
                                Ok(entry) => {
                                    if entry.is_none() {
                                        missing.push(*id);
                                    }
                                }
                                Err(e) => error!(
                                    target: "service",
                                    "Error checking local inventory for {id}: {e}"
                                ),
                            }
                        }
                    }
                }
                self.rng.shuffle(&mut missing);

                for rid in missing {
                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
                    self.fetch(rid, *announcer, FETCH_TIMEOUT, None);
                }
                return Ok(relay);
            }
            AnnouncementMessage::Refs(message) => {
                self.emitter.emit(Event::RefsAnnounced {
                    nid: *announcer,
                    rid: message.rid,
                    refs: message.refs.to_vec(),
                    timestamp: message.timestamp,
                });
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
                    return Ok(None);
                };
                self.seed_discovered(message.rid, *announcer, message.timestamp);

                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
                    debug!(
                        target: "service",
                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
                        message.rid, refs.at, message.timestamp
                    );
                    match self.db.seeds_mut().synced(
                        &message.rid,
                        announcer,
                        refs.at,
                        message.timestamp,
                    ) {
                        Ok(updated) => {
                            if updated {
                                debug!(
                                    target: "service",
                                    "Updating sync status of {announcer} for {} to {}",
                                    message.rid, refs.at
                                );
                                self.emitter.emit(Event::RefsSynced {
                                    rid: message.rid,
                                    remote: *announcer,
                                    at: refs.at,
                                });
                            } else {
                                debug!(
                                    target: "service",
                                    "Sync status of {announcer} was not updated for {}",
                                    message.rid,
                                );
                            }
                        }
                        Err(e) => {
                            error!(target: "service", "Error updating sync status for {}: {e}", message.rid);
                        }
                    }
                }
                let repo_entry = self.policies.seed_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo seeding configuration",
                );
                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
                    debug!(
                        target: "service",
                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
                        message.rid
                    );
                    return Ok(None);
                };
                let Some(remote) = self.sessions.get(announcer).cloned() else {
                    trace!(
                        target: "service",
                        "Skipping fetch of {}, no sessions connected to {announcer}",
                        message.rid
                    );
                    return Ok(relay);
                };
                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT, None);

                return Ok(relay);
            }
            AnnouncementMessage::Node(
                ann @ NodeAnnouncement {
                    features,
                    addresses,
                    ..
                },
            ) => {
                self.emitter.emit(Event::NodeAnnounced {
                    nid: *announcer,
                    alias: ann.alias.clone(),
                    timestamp: ann.timestamp,
                    features: *features,
                    addresses: addresses.to_vec(),
                });
                if !features.has(Features::SEED) {
                    return Ok(relay);
                }

                match self.db.addresses_mut().insert(
                    announcer,
                    ann.version,
                    ann.features,
                    &ann.alias,
                    ann.work(),
                    &ann.agent,
                    timestamp,
                    addresses
                        .iter()
                        .filter(|a| a.is_routable() || relayer_addr.is_local())
                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
                ) {
                    Ok(updated) => {
                        if updated {
                            debug!(
                                target: "service",
                                "Address store entry for node {announcer} updated at {timestamp}"
                            );
                            return Ok(relay);
                        }
                    }
                    Err(err) => {
                        error!(target: "service", "Error processing node announcement from {announcer}: {err}");
                    }
                }
            }
        }
        Ok(None)
    }

    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
        match info {
            Info::RefsAlreadySynced { rid, at } => {
                debug!(target: "service", "Refs already synced for {rid} by {remote}");
                self.emitter.emit(Event::RefsSynced {
                    rid: *rid,
                    remote,
                    at: *at,
                });
            }
        }

        Ok(())
    }

    pub fn handle_message(
        &mut self,
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
        let local = self.node_id();
        let relay = self.config.is_relay();
        let Some(peer) = self.sessions.get_mut(remote) else {
            warn!(target: "service", "Session not found for {remote}");
            return Ok(());
        };
        peer.last_active = self.clock;

        let limit = match peer.link {
            Link::Outbound => &self.config.limits.rate.outbound,
            Link::Inbound => &self.config.limits.rate.inbound,
        };
        if self
            .limiter
            .limit(peer.addr.clone().into(), Some(remote), limit, self.clock)
        {
            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
        }
        message.log(log::Level::Debug, remote, Link::Inbound);

        let connected = match &mut peer.state {
            session::State::Disconnected { .. } => {
                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
                return Ok(());
            }
            session::State::Attempted { .. } | session::State::Initial => {
                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);

                peer.to_connected(self.clock);

                None
            }
            session::State::Connected {
                ping, latencies, ..
            } => Some((ping, latencies)),
        };

        trace!(target: "service", "Received message {message:?} from {remote}");

        match message {
            Message::Announcement(ann) => {
                let relayer = remote;
                let relayer_addr = peer.addr.clone();

                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
                        if let AnnouncementMessage::Inventory(_) = ann.message {
                            if let Err(e) = self
                                .database_mut()
                                .gossip_mut()
                                .set_relay(id, gossip::RelayStatus::Relay)
                            {
                                error!(target: "service", "Error setting relay flag for message: {e}");
                                return Ok(());
                            }
                        } else {
                            self.relay(id, ann);
                        }
                    }
                }
            }
            Message::Subscribe(subscribe) => {
                match self
                    .db
                    .gossip()
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
                {
                    Ok(anns) => {
                        for ann in anns {
                            let ann = match ann {
                                Ok(a) => a,
                                Err(e) => {
                                    error!(target: "service", "Error reading gossip message from store: {e}");
                                    continue;
                                }
                            };
                            if ann.node == *remote {
                                continue;
                            }
                            if relay || ann.node == local {
                                self.outbox.write(peer, ann.into());
                            }
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error querying gossip messages from store: {e}");
                    }
                }
                peer.subscribe = Some(subscribe);
            }
            Message::Info(info) => {
                self.handle_info(*remote, &info)?;
            }
            Message::Ping(Ping { ponglen, .. }) => {
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
                }
                self.outbox.write(
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
            Message::Pong { zeroes } => {
                if let Some((ping, latencies)) = connected {
                    if let session::PingState::AwaitingResponse {
                        len: ponglen,
                        since,
                    } = *ping
                    {
                        if (ponglen as usize) == zeroes.len() {
                            *ping = session::PingState::Ok;
                            latencies.push_back(self.clock - since);
                            if latencies.len() > MAX_LATENCIES {
                                latencies.pop_front();
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }

    fn refs_status_of(
        &self,
        rid: RepoId,
        refs: NonEmpty<RefsAt>,
        scope: &policy::Scope,
    ) -> Result<RefsStatus, Error> {
        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
        if refs.want.is_empty() {
            return Ok(refs);
        }
        let mut refs = match scope {
            policy::Scope::All => refs,
            policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
                Ok(Namespaces::All) => refs,
                Ok(Namespaces::Followed(followed)) => {
                    refs.want.retain(|r| followed.contains(&r.remote));
                    refs
                }
                Err(e) => return Err(e.into()),
            },
        };
        refs.want.retain(|r| r.remote != self.node_id());

        Ok(refs)
    }

    fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
        if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
            if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
                self.emitter.emit(Event::SeedDiscovered { rid, nid });
                info!(target: "service", "Routing table updated for {} with seed {nid}", rid);
            }
        }
    }

    fn initial(&mut self, _link: Link) -> Vec<Message> {
        let now = self.clock();
        let filter = self.filter();

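        // Subscribe from a little before the last gossip message we stored,
        // or from `INITIAL_SUBSCRIBE_BACKLOG_DELTA` (one day) back on a fresh
        // node, so that announcements made while we were offline are replayed.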
        let since = if let Some(last) = self.last_online_at {
            Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
        } else {
            (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
        };
        debug!(target: "service", "Subscribing to messages since timestamp {since}..");

        vec![
            Message::node(self.node.clone(), &self.signer),
            Message::inventory(self.inventory.clone(), &self.signer),
            Message::subscribe(filter, since, Timestamp::MAX),
        ]
    }

    fn is_online(&self) -> bool {
        self.sessions
            .connected()
            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
            .count()
            > 0
    }

    fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
        let node = self.node_id();
        let now = self.timestamp();

        let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
        if removed {
            self.refresh_and_announce_inventory(now)?;
        }
        Ok(removed)
    }

    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
        let node = self.node_id();
        let now = self.timestamp();

        if !self.storage.contains(&rid)? {
            error!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
            return Ok(false);
        }
        let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
        let updated = !updates.is_empty();

        if updated {
            self.refresh_and_announce_inventory(now)?;
        }
        Ok(updated)
    }

    fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
        let inventory = self.inventory()?;

        self.inventory = gossip::inventory(time, inventory);
        self.announce_inventory();

        Ok(())
    }

    fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
        self.db
            .routing()
            .get_inventory(self.nid())
            .map_err(Error::from)
    }

    fn sync_routing(
        &mut self,
        inventory: impl IntoIterator<Item = RepoId>,
        from: NodeId,
        timestamp: Timestamp,
    ) -> Result<SyncedRouting, Error> {
        let mut synced = SyncedRouting::default();
        let included = inventory.into_iter().collect::<BTreeSet<_>>();

        for (rid, result) in
            self.db
                .routing_mut()
                .add_inventory(included.iter(), from, timestamp)?
        {
            match result {
                InsertResult::SeedAdded => {
                    info!(target: "service", "Routing table updated for {rid} with seed {from}");
                    self.emitter.emit(Event::SeedDiscovered { rid, nid: from });

                    if self
                        .policies
                        .is_seeding(&rid)
                        .expect("Service::process_inventory: error accessing seeding configuration")
                    {
                        // Nothing further to do here: repositories we seed but
                        // don't yet have are fetched by the periodic 'sync' task.
                    }
                    synced.added.push(rid);
                }
                InsertResult::TimeUpdated => {
                    synced.updated.push(rid);
                }
                InsertResult::NotUpdated => {}
            }
        }
        for rid in self.db.routing().get_inventory(&from)?.into_iter() {
            if !included.contains(&rid) {
                if self.db.routing_mut().remove_inventory(&rid, &from)? {
                    synced.removed.push(rid);
                    self.emitter.emit(Event::SeedDropped { rid, nid: from });
                }
            }
        }
        Ok(synced)
    }

    fn refs_announcement_for(
        &mut self,
        rid: RepoId,
        remotes: impl IntoIterator<Item = NodeId>,
    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
        let repo = self.storage.repository(rid)?;
        let timestamp = self.timestamp();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        for remote_id in remotes.into_iter() {
            let refs_at = RefsAt::new(&repo, remote_id)?;

            if refs.push(refs_at).is_err() {
                warn!(
                    target: "service",
                    "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
                    REF_REMOTE_LIMIT,
                );
                break;
            }
        }

        let msg = AnnouncementMessage::from(RefsAnnouncement {
            rid,
            refs: refs.clone(),
            timestamp,
        });
        Ok((msg.signed(&self.signer), refs.into()))
    }

    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc) -> Result<Vec<RefsAt>, Error> {
        let (refs, timestamp) = self.announce_refs(rid, doc, [self.node_id()])?;

        if let &[r] = refs.as_slice() {
            self.emitter.emit(Event::LocalRefsAnnounced {
                rid,
                refs: r,
                timestamp,
            });
            if let Err(e) = self.database_mut().refs_mut().set(
                &rid,
                &r.remote,
                &SIGREFS_BRANCH,
                r.at,
                timestamp.to_local_time(),
            ) {
                error!(
                    target: "service",
                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
                    r.remote
                );
            }
        }
        Ok(refs)
    }

    fn announce_refs(
        &mut self,
        rid: RepoId,
        doc: Doc,
        remotes: impl IntoIterator<Item = NodeId>,
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
        let timestamp = ann.timestamp();
        let peers = self.sessions.connected().map(|(_, p)| p);

        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
            info!(
                target: "service",
                "Announcing own refs for {rid} to peers ({}) (t={timestamp})..",
                refs.at
            );
            if let Err(e) = self
                .db
                .seeds_mut()
                .synced(&rid, &ann.node, refs.at, timestamp)
            {
                error!(target: "service", "Error updating sync status for local node: {e}");
            } else {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
        }

        self.outbox.announce(
            ann,
            peers.filter(|p| doc.is_visible_to(&p.id.into())),
            self.db.gossip_mut(),
        );
        Ok((refs, timestamp))
    }

    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_initial();
            self.outbox.connect(nid, addr);

            return true;
        }
        false
    }

    fn connect(&mut self, nid: NodeId, addr: Address) -> bool {
        debug!(target: "service", "Connecting to {nid} ({addr})..");

        if self.sessions.contains_key(&nid) {
            warn!(target: "service", "Attempted connection to peer {nid} which already has a session");
            return false;
        }
        if nid == self.node_id() {
            error!(target: "service", "Attempted connection to self");
            return false;
        }
        if self.sessions.outbound().count() >= self.config.limits.connection.outbound {
            error!(target: "service", "Outbound connection limit reached when attempting {nid} ({addr})");
            return false;
        }
        let persistent = self.config.is_persistent(&nid);
        let timestamp: Timestamp = self.clock.into();

        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
            error!(target: "service", "Error updating address book with connection attempt: {e}");
        }
        self.sessions.insert(
            nid,
            Session::outbound(
                nid,
                addr.clone(),
                persistent,
                self.rng.clone(),
                self.config.limits.clone(),
            ),
        );
        self.outbox.connect(nid, addr);

        true
    }

    fn seeds(&self, rid: &RepoId) -> Result<Seeds, Error> {
        let mut seeds = Seeds::new(self.rng.clone());

        if let Ok(repo) = self.storage.repository(*rid) {
            if let Ok(local) = RefsAt::new(&repo, self.node_id()) {
                for seed in self.db.seeds().seeds_for(rid)? {
                    let seed = seed?;
                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
                    let synced = if local.at == seed.synced_at.oid {
                        SyncStatus::Synced { at: seed.synced_at }
                    } else {
                        let local = SyncedAt::new(local.at, &repo)?;

                        SyncStatus::OutOfSync {
                            local,
                            remote: seed.synced_at,
                        }
                    };
                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
                }
            }
        }

        for nid in self.db.routing().get(rid)? {
            if nid == self.node_id() {
                continue;
            }
            if seeds.contains(&nid) {
                continue;
            }
            let addrs = self.db.addresses().addresses_of(&nid)?;
            let state = self.sessions.get(&nid).map(|s| s.state.clone());

            seeds.insert(Seed::new(nid, addrs, state, None));
        }
        Ok(seeds)
    }

    fn filter(&self) -> Filter {
        if self.config.seeding_policy.is_allow() {
            Filter::default()
        } else {
            self.filter.clone()
        }
    }

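    /// Return a timestamp that is strictly monotonic across calls: the wall
    /// clock when it has advanced, otherwise the previous value plus one.
    /// The technique, sketched on plain `u64` millis (hypothetical stand-ins
    /// for [`Timestamp`]):
    ///
    /// ```ignore
    /// fn next(clock_ms: u64, last: &mut u64) -> u64 {
    ///     if clock_ms > *last {
    ///         *last = clock_ms;
    ///     } else {
    ///         *last += 1;
    ///     }
    ///     *last
    /// }
    /// ```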
    fn timestamp(&mut self) -> Timestamp {
        let now = Timestamp::from(self.clock);
        if *now > *self.last_timestamp {
            self.last_timestamp = now;
        } else {
            self.last_timestamp = self.last_timestamp + 1;
        }
        self.last_timestamp
    }

    fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
        let announcer = ann.node;
        let relayed_by = self.relayed_by.get(&id);
        let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
            Some(rid)
        } else {
            None
        };
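        // Relay to peers that (1) did not themselves relay this announcement
        // to us, (2) are not the announcer, and (3) for refs announcements,
        // are allowed to see the repository in question.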
2327 let relay_to = self
2331 .sessions
2332 .connected()
2333 .filter(|(id, _)| {
2334 relayed_by
2335 .map(|relayers| !relayers.contains(id))
2336 .unwrap_or(true) })
2338 .filter(|(id, _)| **id != announcer)
2339 .filter(|(id, _)| {
2340 if let Some(rid) = rid {
2341 self.storage
2345 .get(rid)
2346 .ok()
2347 .flatten()
2348 .map(|doc| doc.is_visible_to(&(*id).into()))
2349 .unwrap_or(false)
2350 } else {
2351 true
2353 }
2354 })
2355 .map(|(_, p)| p);
2356
2357 self.outbox.relay(ann, relay_to);
2358 }
2359
2360 fn relay_announcements(&mut self) -> Result<(), Error> {
2365 let now = self.clock.into();
2366 let rows = self.database_mut().gossip_mut().relays(now)?;
2367 let local = self.node_id();
2368
2369 for (id, msg) in rows {
2370 let announcer = msg.node;
2371 if announcer == local {
2372 continue;
2374 }
2375 self.relay(id, msg);
2376 }
2377 Ok(())
2378 }
2379
    fn announce_inventory(&mut self) {
        // Our current inventory timestamp, in local time.
        let timestamp = self.inventory.timestamp.to_local_time();

        if self.last_inventory == timestamp {
            debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
            return;
        }
        let msg = AnnouncementMessage::from(self.inventory.clone());

        self.outbox.announce(
            msg.signed(&self.signer),
            self.sessions.connected().map(|(_, p)| p),
            self.db.gossip_mut(),
        );
        self.last_inventory = timestamp;
    }

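    /// Prune the oldest routing table entries once the table exceeds its
    /// configured maximum size.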
    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
        let count = self.db.routing().len()?;
        if count <= self.config.limits.routing_max_size {
            return Ok(());
        }

        let delta = count - self.config.limits.routing_max_size;
        let nid = self.node_id();
        self.db.routing_mut().prune(
            (*now - self.config.limits.routing_max_age).into(),
            Some(delta),
            &nid,
        )?;
        Ok(())
    }

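    /// Disconnect peers that have been inactive for longer than
    /// [`STALE_CONNECTION_TIMEOUT`].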
    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
        let stale = self
            .sessions
            .connected()
            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);

        for (_, session) in stale {
            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);

            self.outbox.disconnect(
                session.id,
                DisconnectReason::Session(session::Error::Timeout),
            );
        }
    }

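    /// Ping connected peers that have been inactive for at least
    /// [`KEEP_ALIVE_DELTA`].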
    fn keep_alive(&mut self, now: &LocalTime) {
        let inactive_sessions = self
            .sessions
            .connected_mut()
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
            .map(|(_, session)| session);
        for session in inactive_sessions {
            session.ping(self.clock, &mut self.outbox).ok();
        }
    }

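    /// Get the peers from the address book that are candidates for new
    /// connections, sorted by lowest penalty first.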
    fn available_peers(&mut self) -> Vec<Peer> {
        match self.db.addresses().entries() {
            Ok(entries) => {
                // Group address entries by peer, skipping peers that are
                // banned, past their connection penalty threshold, already in
                // a session, on a different protocol version, or ourselves.
                let mut peers = entries
                    .filter(|entry| entry.version == PROTOCOL_VERSION)
                    .filter(|entry| !entry.address.banned)
                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
                    .filter(|entry| !self.sessions.contains_key(&entry.node))
                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
                    .filter(|entry| &entry.node != self.nid())
                    .fold(HashMap::new(), |mut acc, entry| {
                        acc.entry(entry.node)
                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
                            .or_insert_with(|| Peer {
                                nid: entry.node,
                                addresses: vec![entry.address],
                                penalty: entry.penalty,
                            });
                        acc
                    })
                    .into_values()
                    .collect::<Vec<_>>();
                peers.sort_by_key(|p| p.penalty);
                peers
            }
            Err(e) => {
                error!(target: "service", "Unable to lookup available peers in address book: {e}");
                Vec::new()
            }
        }
    }

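    /// Fetch repositories that we are seeding but that are missing from
    /// local storage, from any connected seeds.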
    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
        // Collect the policies up-front, since fetching requires a mutable
        // borrow of `self`.
        let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
        for policy in policies {
            let rid = policy.rid;

            if !policy.is_allow() {
                continue;
            }
            if self.storage.contains(&rid)? {
                continue;
            }
            match self.seeds(&rid) {
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
                            self.fetch(rid, seed.nid, FETCH_TIMEOUT, None);
                        }
                    } else {
                        // We can't fetch without connected seeds for this
                        // repository.
                        debug!(target: "service", "No connected seeds found for {rid}..");
                    }
                }
                Err(e) => {
                    error!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
                }
            }
        }
        Ok(())
    }

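    /// Update session idle state, recording stable connections in the
    /// address book.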
    fn idle_connections(&mut self) {
        for (_, sess) in self.sessions.iter_mut() {
            sess.idle(self.clock);

            // Once a connection is deemed stable, record it as a successful
            // connection in the address book.
            if sess.is_stable() {
                if let Err(e) =
                    self.db
                        .addresses_mut()
                        .connected(&sess.id, &sess.addr, self.clock.into())
                {
                    error!(target: "service", "Error updating address book with connection: {e}");
                }
            }
        }
    }

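    /// Maintain the target number of outbound connections, when configured
    /// for dynamic peering.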
    fn maintain_connections(&mut self) {
        // Only maintain connections if we're configured for dynamic peering.
        let PeerConfig::Dynamic = self.config.peers else {
            return;
        };
        trace!(target: "service", "Maintaining connections..");

        let target = TARGET_OUTBOUND_PEERS;
        let now = self.clock;
        let outbound = self
            .sessions
            .values()
            .filter(|s| s.link.is_outbound())
            .filter(|s| s.is_connected() || s.is_connecting())
            .count();
        let wanted = target.saturating_sub(outbound);

        // We're already at the target number of outbound connections.
        if wanted == 0 {
            return;
        }

        // Pick one address per available peer, preferring addresses whose
        // last attempt succeeded, or that haven't been attempted recently.
        let available = self
            .available_peers()
            .into_iter()
            .filter_map(|peer| {
                peer.addresses
                    .into_iter()
                    .find(|ka| match (ka.last_success, ka.last_attempt) {
                        // The last attempt succeeded, or enough time has
                        // passed since the last attempt to retry.
                        (Some(success), Some(attempt)) => {
                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
                        }
                        // The last attempt failed; retry only after the delta.
                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
                        // Never attempted; go ahead.
                        (_, None) => true,
                    })
                    .map(|ka| (peer.nid, ka))
            })
            .filter(|(_, ka)| match AddressType::from(&ka.addr) {
                // Only dial onion addresses if we have a proxy configured.
                AddressType::Onion => self.config.onion.is_some(),
                AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
            });

        let connect = available.take(wanted).collect::<Vec<_>>();
        if connect.len() < wanted {
            log::debug!(
                target: "service",
                "Not enough available peers to connect to (available={}, wanted={wanted})",
                connect.len()
            );
        }
        for (id, ka) in connect {
            self.connect(id, ka.addr.clone());
        }
    }

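    /// Reconnect to persistent peers that are disconnected and past their
    /// retry deadline.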
    fn maintain_persistent(&mut self) {
        trace!(target: "service", "Maintaining persistent peers..");

        let now = self.local_time();
        let mut reconnect = Vec::new();

        for (nid, session) in self.sessions.iter_mut() {
            if let Some(addr) = self.config.peer(nid) {
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
                    // The retry deadline for this disconnected peer has
                    // passed; queue a reconnection.
                    if now >= *retry_at {
                        reconnect.push((*nid, addr.clone(), session.attempts()));
                    }
                }
            }
        }

        for (nid, addr, attempts) in reconnect {
            if self.reconnect(nid, addr) {
                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
            }
        }
    }
}

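/// Exposes service state to components outside the service loop.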
pub trait ServiceState {
    /// Get the local node id.
    fn nid(&self) -> &NodeId;
    /// Get the current peer sessions.
    fn sessions(&self) -> &Sessions;
    /// Get the repositories that are currently being fetched.
    fn fetching(&self) -> &HashMap<RepoId, FetchState>;
    /// Get the service outbox.
    fn outbox(&self) -> &Outbox;
    /// Get the gossip rate limiter.
    fn limiter(&self) -> &RateLimiter;
    /// Get the event emitter.
    fn emitter(&self) -> &Emitter<Event>;
    /// Get a repository's identity document from storage, if found.
    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
    /// Get the service clock.
    fn clock(&self) -> &LocalTime;
    /// Get the service clock, mutably.
    fn clock_mut(&mut self) -> &mut LocalTime;
    /// Get the service configuration.
    fn config(&self) -> &Config;
    /// Get the service metrics.
    fn metrics(&self) -> &Metrics;
}

impl<D, S, G> ServiceState for Service<D, S, G>
where
    D: routing::Store,
    G: crypto::signature::Signer<crypto::Signature>,
    S: ReadStorage,
{
    fn nid(&self) -> &NodeId {
        self.signer.public_key()
    }

    fn sessions(&self) -> &Sessions {
        &self.sessions
    }

    fn fetching(&self) -> &HashMap<RepoId, FetchState> {
        &self.fetching
    }

    fn outbox(&self) -> &Outbox {
        &self.outbox
    }

    fn limiter(&self) -> &RateLimiter {
        &self.limiter
    }

    fn emitter(&self) -> &Emitter<Event> {
        &self.emitter
    }

    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
        self.storage.get(rid)
    }

    fn clock(&self) -> &LocalTime {
        &self.clock
    }

    fn clock_mut(&mut self) -> &mut LocalTime {
        &mut self.clock
    }

    fn config(&self) -> &Config {
        &self.config
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }
}

/// A reason for disconnecting a peer.
#[derive(Debug)]
pub enum DisconnectReason {
    /// Error while dialing the remote node.
    Dial(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with an underlying established connection.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
    /// Error during a fetch.
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
    /// The connection conflicts with an existing session for the same node.
    Conflict,
    /// We attempted to connect to ourselves.
    SelfConnection,
    /// The disconnect was requested via a command.
    Command,
}

impl DisconnectReason {
    pub fn is_dial_err(&self) -> bool {
        matches!(self, Self::Dial(_))
    }

    pub fn is_connection_err(&self) -> bool {
        matches!(self, Self::Connection(_))
    }

    pub fn connection() -> Self {
        DisconnectReason::Connection(Arc::new(std::io::Error::from(
            std::io::ErrorKind::ConnectionReset,
        )))
    }
}

impl fmt::Display for DisconnectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Dial(err) => write!(f, "{err}"),
            Self::Connection(err) => write!(f, "{err}"),
            Self::Command => write!(f, "command"),
            Self::SelfConnection => write!(f, "self-connection"),
            Self::Conflict => write!(f, "conflict"),
            Self::Session(err) => write!(f, "{err}"),
            Self::Fetch(err) => write!(f, "fetch: {err}"),
        }
    }
}

/// Result of a repository lookup.
#[derive(Debug)]
pub struct Lookup {
    /// The repository's identity document, if it was found in local storage.
    pub local: Option<Doc>,
    /// Remote peers that are known to have the repository.
    pub remote: Vec<NodeId>,
}

#[derive(thiserror::Error, Debug)]
pub enum LookupError {
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}

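/// Tracks peer sessions, keyed by node id.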
#[derive(Debug, Clone)]
pub struct Sessions(AddressBook<NodeId, Session>);

impl Sessions {
    /// Create a new, empty set of sessions.
    pub fn new(rng: Rng) -> Self {
        Self(AddressBook::new(rng))
    }

    /// Iterator over fully connected peers.
    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.0
            .iter()
            .filter_map(move |(id, sess)| match &sess.state {
                session::State::Connected { .. } => Some((id, sess)),
                _ => None,
            })
    }

    /// Iterator over connected inbound peers.
    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.connected().filter(|(_, s)| s.link.is_inbound())
    }

    /// Iterator over connected outbound peers.
    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.connected().filter(|(_, s)| s.link.is_outbound())
    }

    /// Iterator over mutable connected sessions.
    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
    }

    /// Iterator over mutable disconnected sessions.
    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
    }

    /// Whether we are connected to the given node.
    pub fn is_connected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
    }

    /// Whether the given node is disconnected. Nodes we have no session for
    /// are considered disconnected.
    pub fn is_disconnected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
    }
}


impl Deref for Sessions {
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}