#![allow(clippy::too_many_arguments)]
#![allow(clippy::collapsible_match)]
#![allow(clippy::collapsible_if)]
#![warn(clippy::unwrap_used)]
pub mod command;
pub use command::{Command, QueryState};

pub mod filter;
pub mod gossip;
pub mod io;
pub mod limiter;
pub mod message;
pub mod session;

use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::net::IpAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, net, time};

use crossbeam_channel as chan;
use fastrand::Rng;
use localtime::{LocalDuration, LocalTime};
use log::*;
use nonempty::NonEmpty;

use radicle::identity::Doc;
use radicle::node;
use radicle::node::address;
use radicle::node::address::Store as _;
use radicle::node::address::{AddressBook, AddressType, KnownAddress};
use radicle::node::config::{PeerConfig, RateLimit};
use radicle::node::device::Device;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{Penalty, Severity};
use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::RepositoryError;
use radicle_fetch::policy::SeedingPolicy;

use crate::fetcher;
use crate::fetcher::service::FetcherService;
use crate::fetcher::FetcherState;
use crate::fetcher::RefsToFetch;
use crate::service::gossip::Store as _;
use crate::service::message::{
    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
};
use crate::service::policy::{store::Write, Scope};
use radicle::identity::RepoId;
use radicle::node::events::Emitter;
use radicle::node::routing;
use radicle::node::routing::InsertResult;
use radicle::node::{Address, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt};
use radicle::prelude::*;
use radicle::storage;
use radicle::storage::{refs::RefsAt, Namespaces, ReadStorage};
use radicle::crypto;
use radicle::node::Link;
use radicle::node::PROTOCOL_VERSION;

use crate::bounded::BoundedVec;
use crate::service::filter::Filter;
pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::{QueuedFetch, Session};
use crate::worker::FetchError;
use radicle::node::events::{Event, Events};
use radicle::node::{Config, NodeId};

use radicle::node::policy::config as policy;

use self::io::Outbox;
use self::limiter::RateLimiter;
use self::message::InventoryAnnouncement;
use self::policy::NamespacesError;

/// Interval at which the "idle" maintenance task runs.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// Interval at which stored announcements are relayed.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// Interval at which we announce our inventory.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// Interval at which we try to fetch repositories missing from storage.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// Interval at which routing and gossip entries are pruned.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// How long a connection can stay inactive before it is considered stale and dropped.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// How long a session can stay inactive before we ping it to keep it alive.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency samples retained per session.
pub const MAX_LATENCIES: usize = 16;
/// Maximum tolerated difference between an announcement timestamp and our clock.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum number of attempts to connect to a peer.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back to request gossip messages on first start.
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
/// How far back to request gossip messages, relative to the time we were last online.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Minimum delay before reconnecting to a persistent peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum delay before reconnecting to a persistent peer.
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// How long to wait before retrying a connection.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// Default timeout for fetches initiated by the service.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3);
/// Target number of outbound connections to maintain.
pub const TARGET_OUTBOUND_PEERS: usize = 8;

/// Maximum number of addresses allowed in a node announcement.
pub use message::ADDRESS_LIMIT;
/// Maximum number of repository ids allowed in an inventory announcement.
pub use message::INVENTORY_LIMIT;
/// Maximum number of remotes allowed in a refs announcement.
pub use message::REF_REMOTE_LIMIT;

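/// Runtime metrics for the node, updated on every tick.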
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    pub peers: HashMap<NodeId, PeerMetrics>,
    pub worker_queue_size: usize,
    pub open_channels: usize,
}

impl Metrics {
    pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
        self.peers.entry(nid).or_default()
    }
}

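/// Per-peer traffic, gossip and connection counters.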
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerMetrics {
    pub received_git_bytes: usize,
    pub received_fetch_requests: usize,
    pub received_bytes: usize,
    pub received_gossip_messages: usize,
    pub sent_bytes: usize,
    pub sent_fetch_requests: usize,
    pub sent_git_bytes: usize,
    pub sent_gossip_messages: usize,
    pub streams_opened: usize,
    pub inbound_connection_attempts: usize,
    pub outbound_connection_attempts: usize,
    pub disconnects: usize,
}

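/// Result of reconciling our routing table against a peer's inventory.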
#[derive(Default)]
struct SyncedRouting {
    added: Vec<RepoId>,
    removed: Vec<RepoId>,
    updated: Vec<RepoId>,
}

impl SyncedRouting {
    fn is_empty(&self) -> bool {
        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
    }
}

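/// A candidate peer from the address book, with its known addresses and accumulated penalty.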
#[derive(Debug, Clone)]
struct Peer {
    nid: NodeId,
    addresses: Vec<KnownAddress>,
    penalty: Penalty,
}

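/// A general error in service operation.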
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error(transparent)]
    Git(#[from] radicle::git::raw::Error),
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Gossip(#[from] gossip::Error),
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Address(#[from] address::Error),
    #[error(transparent)]
    Database(#[from] node::db::Error),
    #[error(transparent)]
    Seeds(#[from] seed::Error),
    #[error(transparent)]
    Policy(#[from] policy::Error),
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
    #[error("namespaces error: {0}")]
    Namespaces(Box<NamespacesError>),
}

impl From<NamespacesError> for Error {
    fn from(e: NamespacesError) -> Self {
        Self::Namespaces(Box::new(e))
    }
}

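/// An error returned when attempting an outbound connection.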
#[derive(thiserror::Error, Debug)]
pub enum ConnectError {
    #[error("attempted connection to peer {nid} which already has a session")]
    SessionExists { nid: NodeId },
    #[error("attempted connection to self")]
    SelfConnection,
    #[error("outbound connection limit reached when attempting {nid} ({addr})")]
    LimitReached { nid: NodeId, addr: Address },
    #[error("attempted connection to {nid} via {addr}, but addresses of this kind are not supported")]
    UnsupportedAddress { nid: NodeId, addr: Address },
    #[error("attempted connection with blocked peer {nid}")]
    Blocked { nid: NodeId },
}

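/// Aggregation of all node database stores the service relies on.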
pub trait Store:
    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
{
}

impl Store for radicle::node::Database {}

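/// State of an in-flight fetch: the peer we're fetching from, the refs
/// requested, and the channels awaiting the result.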
#[derive(Debug)]
pub struct FetchState {
    pub from: NodeId,
    pub refs_at: Vec<RefsAt>,
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}

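/// Wrapper around the node database, exposing a typed accessor for each store.
///
/// A minimal usage sketch; the `db`, `nid` and `cutoff` bindings below are
/// hypothetical, for illustration only:
///
/// ```ignore
/// let mut stores = Stores::from(db);
/// let inventory = stores.routing().get_inventory(&nid)?; // read-only store access
/// stores.gossip_mut().prune(cutoff)?; // mutable store access
/// ```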
#[derive(Debug)]
pub struct Stores<D>(D);

impl<D> Stores<D>
where
    D: Store,
{
    pub fn routing(&self) -> &impl routing::Store {
        &self.0
    }

    pub fn routing_mut(&mut self) -> &mut impl routing::Store {
        &mut self.0
    }

    pub fn addresses(&self) -> &impl address::Store {
        &self.0
    }

    pub fn addresses_mut(&mut self) -> &mut impl address::Store {
        &mut self.0
    }

    pub fn gossip(&self) -> &impl gossip::Store {
        &self.0
    }

    pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
        &mut self.0
    }

    pub fn seeds(&self) -> &impl seed::Store {
        &self.0
    }

    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
        &mut self.0
    }

    pub fn refs(&self) -> &impl node::refs::Store {
        &self.0
    }

    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
        &mut self.0
    }
}

impl<D> AsMut<D> for Stores<D> {
    fn as_mut(&mut self) -> &mut D {
        &mut self.0
    }
}

impl<D> From<D> for Stores<D> {
    fn from(db: D) -> Self {
        Self(db)
    }
}

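/// The node service state machine. It reacts to peer messages, local commands
/// and timers, and emits I/O through its [`Outbox`].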
#[derive(Debug)]
pub struct Service<D, S, G> {
    config: Config,
    signer: Device<G>,
    storage: S,
    db: Stores<D>,
    policies: policy::Config<Write>,
    sessions: Sessions,
    clock: LocalTime,
    /// Nodes that have already relayed a given announcement to us.
    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
    outbox: Outbox,
    node: NodeAnnouncement,
    inventory: InventoryAnnouncement,
    rng: Rng,
    fetcher: FetcherService<command::Responder<FetchResult>>,
    limiter: RateLimiter,
    filter: Filter,
    last_idle: LocalTime,
    last_gossip: LocalTime,
    last_sync: LocalTime,
    last_prune: LocalTime,
    last_announce: LocalTime,
    last_inventory: LocalTime,
    /// Last timestamp issued; kept monotonically increasing.
    last_timestamp: Timestamp,
    /// When the service was started, if it was.
    started_at: Option<LocalTime>,
    /// When we were last online, according to the latest stored gossip message.
    last_online_at: Option<LocalTime>,
    emitter: Emitter<Event>,
    listening: Vec<net::SocketAddr>,
    metrics: Metrics,
}

impl<D, S, G> Service<D, S, G> {
    pub fn node_id(&self) -> NodeId {
        *self.signer.public_key()
    }

    pub fn local_time(&self) -> LocalTime {
        self.clock
    }

    pub fn emitter(&self) -> Emitter<Event> {
        self.emitter.clone()
    }
}

impl<D, S, G> Service<D, S, G>
where
    D: Store,
    S: ReadStorage + 'static,
    G: crypto::signature::Signer<crypto::Signature>,
{
    pub fn new(
        config: Config,
        db: Stores<D>,
        storage: S,
        policies: policy::Config<Write>,
        signer: Device<G>,
        rng: Rng,
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());
        let limiter = RateLimiter::new(config.peers());
        let last_timestamp = node.timestamp;
        let clock = LocalTime::default();
        let inventory = gossip::inventory(clock.into(), []);
        let fetcher = {
            let config = fetcher::Config::new()
                .with_max_concurrency(
                    std::num::NonZeroUsize::new(config.limits.fetch_concurrency.into())
                        .expect("fetch concurrency must be at least 1"),
                )
                .with_max_capacity(fetcher::MaxQueueSize::default());
            FetcherService::new(config)
        };
        Self {
            config,
            storage,
            policies,
            signer,
            rng,
            inventory,
            node,
            clock,
            db,
            outbox: Outbox::default(),
            limiter,
            sessions,
            fetcher,
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
            last_gossip: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_timestamp,
            last_announce: LocalTime::default(),
            last_inventory: LocalTime::default(),
            started_at: None,
            last_online_at: None,
            emitter,
            listening: vec![],
            metrics: Metrics::default(),
        }
    }

    pub fn started(&self) -> Option<LocalTime> {
        self.started_at
    }

    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<io::Io> {
        self.outbox.next()
    }

    pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
        let updated = self.policies.seed(id, scope)?;
        self.filter.insert(id);

        Ok(updated)
    }

    pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
        let updated = self.policies.unseed(id)?;

        if updated {
            self.filter = Filter::allowed_by(self.policies.seed_policies()?);

            if let Err(e) = self.remove_inventory(id) {
                warn!(target: "service", "Failed to update inventory after unseed: {e}");
            }
        }
        Ok(updated)
    }

    #[allow(unused)]
    pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
        todo!()
    }

    pub fn database(&self) -> &Stores<D> {
        &self.db
    }

    pub fn database_mut(&mut self) -> &mut Stores<D> {
        &mut self.db
    }

    pub fn storage(&self) -> &S {
        &self.storage
    }

    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }

    pub fn policies(&self) -> &policy::Config<Write> {
        &self.policies
    }

    pub fn signer(&self) -> &Device<G> {
        &self.signer
    }

    pub fn events(&mut self) -> Events {
        Events::from(self.emitter.subscribe())
    }

    pub fn fetcher(&self) -> &FetchState {
        self.fetcher.state()
    }

    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
    }

    pub fn config(&self) -> &Config {
        &self.config
    }

    pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
        let this = self.nid();
        let local = self.storage.get(rid)?;
        let remote = self
            .db
            .routing()
            .get(&rid)?
            .iter()
            .filter(|nid| nid != &this)
            .cloned()
            .collect();

        Ok(Lookup { local, remote })
    }

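    /// Initialize the service: restore state from the database and storage,
    /// connect to configured peers and schedule the initial wake-ups. Must be
    /// called before any other input is processed.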
    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());
        assert_ne!(time, LocalTime::default());

        let nid = self.node_id();

        self.clock = time;
        self.started_at = Some(time);
        self.last_online_at = match self.db.gossip().last() {
            Ok(Some(last)) => Some(last.to_local_time()),
            Ok(None) => None,
            Err(e) => {
                warn!(target: "service", "Failed to get the latest gossip message from db: {e}");
                None
            }
        };

        match self.db.refs().count() {
            Ok(0) => {
                info!(target: "service", "Empty refs database, populating from storage..");
                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
                    warn!(target: "service", "Failed to populate refs database: {e}");
                }
            }
            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
            Err(e) => {
                warn!(target: "service", "Failed to retrieve count of refs from database: {e}")
            }
        }

        let announced = self
            .db
            .seeds()
            .seeded_by(&nid)?
            .collect::<Result<HashMap<_, _>, _>>()?;
        let mut inventory = BTreeSet::new();
        let mut private = BTreeSet::new();

        for repo in self.storage.repositories()? {
            let rid = repo.rid;

            if !self.policies.is_seeding(&rid)? {
                debug!(target: "service", "Local repository {rid} is not seeded");
                continue;
            }
            if repo.doc.is_public() {
                inventory.insert(rid);
            } else {
                private.insert(rid);
            }
            let Some(updated_at) = repo.synced_at else {
                continue;
            };
            if let Some(announced) = announced.get(&rid) {
                if updated_at.oid == announced.oid {
                    continue;
                }
            }
            if self.db.seeds_mut().synced(
                &rid,
                &nid,
                updated_at.oid,
                updated_at.timestamp.into(),
            )? {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
                self.db.gossip_mut().announced(&nid, &ann)?;
            }
        }

        self.db
            .routing_mut()
            .add_inventory(inventory.iter(), nid, time.into())?;
        self.inventory = gossip::inventory(self.timestamp(), inventory);

        self.db
            .routing_mut()
            .remove_inventories(private.iter(), &nid)?;

        self.filter = Filter::allowed_by(self.policies.seed_policies()?);

        let addrs = self.config.connect.clone();
        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
            if let Err(e) = self.connect(id, addr) {
                debug!(target: "service", "Service::initialization connection error: {e}");
            }
        }
        self.maintain_connections();

        self.outbox.wakeup(IDLE_INTERVAL);
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }

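    /// Advance the service clock to `now` and record the latest metrics.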
    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
        trace!(
            target: "service",
            "Tick +{}",
            now - self.started_at.expect("Service::tick: service must be initialized")
        );
        if now >= self.clock {
            self.clock = now;
        } else {
            #[cfg(not(test))]
            warn!(
                target: "service",
                "System clock is not monotonic: {now} is not greater than or equal to {}", self.clock
            );
        }
        self.metrics = metrics.clone();
    }

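    /// Run all periodic tasks that are due: idle maintenance, gossip relay,
    /// sync of missing repositories, inventory announcement and pruning.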
    pub fn wake(&mut self) {
        let now = self.clock;

        trace!(
            target: "service",
            "Wake +{}",
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

        if now - self.last_idle >= IDLE_INTERVAL {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_gossip >= GOSSIP_INTERVAL {
            trace!(target: "service", "Running 'gossip' task...");

            if let Err(e) = self.relay_announcements() {
                warn!(target: "service", "Failed to relay stored announcements: {e}");
            }
            self.outbox.wakeup(GOSSIP_INTERVAL);
            self.last_gossip = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_repositories() {
                warn!(target: "service", "Failed to fetch missing inventory: {e}");
            }
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

            self.announce_inventory();
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                warn!(target: "service", "Failed to prune routing entries: {err}");
            }
            if let Err(err) = self
                .db
                .gossip_mut()
                .prune((now - LocalDuration::from(self.config.limits.gossip_max_age)).into())
            {
                warn!(target: "service", "Failed to prune gossip entries: {err}");
            }

            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

        self.maintain_persistent();
    }

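    /// Process a command issued by the local node client.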
    pub fn command(&mut self, cmd: Command) {
        info!(target: "service", "Received command {cmd:?}");

        match cmd {
            Command::Connect(nid, addr, opts) => {
                if opts.persistent {
                    self.config.connect.insert((nid, addr.clone()).into());
                }
                if let Err(e) = self.connect(nid, addr) {
                    match e {
                        ConnectError::SessionExists { nid } => {
                            self.emitter.emit(Event::PeerConnected { nid });
                        }
                        e => {
                            self.emitter.emit(Event::PeerDisconnected {
                                nid,
                                reason: e.to_string(),
                            });
                        }
                    }
                }
            }
            Command::Disconnect(nid) => {
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Config(resp) => {
                resp.ok(self.config.clone()).ok();
            }
            Command::ListenAddrs(resp) => {
                resp.ok(self.listening.clone()).ok();
            }
            Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
                    debug!(
                        target: "service",
                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
                        connected.len(), disconnected.len(), rid
                    );
                    resp.ok(seeds).ok();
                }
                Err(e) => {
                    warn!(target: "service", "Failed to get seeds for {rid}: {e}");
                    resp.err(e).ok();
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
                self.fetch(rid, seed, vec![], timeout, Some(resp));
            }
            Command::Seed(rid, scope, resp) => {
                let seeded = self
                    .seed(&rid, scope)
                    .expect("Service::command: error seeding repository");
                resp.ok(seeded).ok();

                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
            Command::Unseed(id, resp) => {
                let updated = self
                    .unseed(&id)
                    .expect("Service::command: error unseeding repository");
                resp.ok(updated).ok();
            }
            Command::Follow(id, alias, resp) => {
                let seeded = self
                    .policies
                    .follow(&id, alias.as_ref())
                    .expect("Service::command: error following node");
                resp.ok(seeded).ok();
            }
            Command::Unfollow(id, resp) => {
                let updated = self
                    .policies
                    .unfollow(&id)
                    .expect("Service::command: error unfollowing node");
                resp.ok(updated).ok();
            }
            Command::Block(id, resp) => {
                let updated = self
                    .policies
                    .set_follow_policy(&id, policy::Policy::Block)
                    .expect("Service::command: error blocking node");
                if updated {
                    self.outbox.disconnect(id, DisconnectReason::Policy);
                }
                resp.send(updated).ok();
            }
            Command::AnnounceRefs(id, namespaces, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
                        warn!(target: "service", "Failed to announce refs: repository {id} not found");
                        resp.err(command::Error::custom(format!("repository {id} not found")))
                            .ok();
                        return;
                    }
                    Err(e) => {
                        warn!(target: "service", "Failed to announce refs: doc error: {e}");
                        resp.err(e).ok();
                        return;
                    }
                };

                match self.announce_own_refs(id, doc, namespaces) {
                    Ok((refs, _timestamp)) => {
                        if let Some(refs) = refs.first() {
                            resp.ok(*refs).ok();
                        } else {
                            resp.err(command::Error::custom(format!(
                                "no refs were announced for {id}"
                            )))
                            .ok();
                        }
                    }
                    Err(err) => {
                        warn!(target: "service", "Failed to announce refs: {err}");
                        resp.err(err).ok();
                    }
                }
            }
            Command::AnnounceInventory => {
                self.announce_inventory();
            }
            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
                Ok(updated) => {
                    resp.ok(updated).ok();
                }
                Err(e) => {
                    warn!(target: "service", "Failed to add {rid} to inventory: {e}");
                    resp.err(e).ok();
                }
            },
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
            }
        }
    }

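    /// Initiate a fetch for the given refs, skipping refs that are already in
    /// storage or outside the seeding scope. Returns `true` if a fetch was
    /// actually initiated.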
    fn fetch_refs_at(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs: NonEmpty<RefsAt>,
        scope: Scope,
        timeout: time::Duration,
    ) -> bool {
        match self.refs_status_of(rid, refs, &scope) {
            Ok(status) => {
                if status.want.is_empty() {
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
                    self.fetch(rid, from, status.want, timeout, None);
                    return true;
                }
            }
            Err(e) => {
                warn!(target: "service", "Failed to get the refs status of {rid}: {e}");
            }
        }
        false
    }

    fn fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<command::Responder<FetchResult>>,
    ) {
        let session = {
            let reason = format!("peer {from} is not connected; cannot initiate fetch");
            let Some(session) = self.sessions.get_mut(&from) else {
                if let Some(c) = channel {
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            };
            if !session.is_connected() {
                if let Some(c) = channel {
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            }
            session
        };

        let cmd = fetcher::state::command::Fetch {
            from,
            rid,
            refs: refs_at.into(),
            timeout,
        };
        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);

        if let Some(c) = rejected {
            c.ok(FetchResult::Failed {
                reason: "fetch queue at capacity".to_string(),
            })
            .ok();
        }

        match event {
            fetcher::state::event::Fetch::Started {
                rid,
                from,
                refs: refs_at,
                timeout,
            } => {
                debug!(target: "service", "Starting fetch for {rid} from {from}");
                self.outbox.fetch(
                    session,
                    rid,
                    refs_at.into(),
                    timeout,
                    self.config.limits.fetch_pack_receive,
                );
            }
            fetcher::state::event::Fetch::Queued { rid, from } => {
                debug!(target: "service", "Queued fetch for {rid} from {from}");
            }
            fetcher::state::event::Fetch::AlreadyFetching { rid, from } => {
                debug!(target: "service", "Already fetching {rid} from {from}");
            }
            fetcher::state::event::Fetch::QueueAtCapacity { rid, from, .. } => {
                debug!(target: "service", "Queue at capacity for {from}, rejected {rid}");
            }
        }
    }

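    /// Handle the result of a completed fetch: notify subscribers, update
    /// routing and sync state, and announce updated refs where applicable.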
    pub fn fetched(
        &mut self,
        rid: RepoId,
        from: NodeId,
        result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
    ) {
        let cmd = fetcher::state::command::Fetched { from, rid };
        let fetcher::service::FetchCompleted { event, subscribers } = self.fetcher.fetched(cmd);

        self.dequeue_fetches();

        match event {
            fetcher::state::event::Fetched::NotFound { from, rid } => {
                debug!(target: "service", "Unexpected fetch result for {rid} from {from}");
            }
            fetcher::state::event::Fetched::Completed { from, rid, refs: _ } => {
                let fetch_result = match &result {
                    Ok(success) => FetchResult::Success {
                        updated: success.updated.clone(),
                        namespaces: success.namespaces.clone(),
                        clone: success.clone,
                    },
                    Err(e) => FetchResult::Failed {
                        reason: e.to_string(),
                    },
                };
                for responder in subscribers {
                    responder.ok(fetch_result.clone()).ok();
                }
                match result {
                    Ok(crate::worker::fetch::FetchResult {
                        updated,
                        canonical,
                        namespaces,
                        clone,
                        doc,
                    }) => {
                        info!(target: "service", "Fetched {rid} from {from} successfully");

                        self.seed_discovered(rid, from, self.clock.into());

                        for update in &updated {
                            if update.is_skipped() {
                                trace!(target: "service", "Ref skipped: {update} for {rid}");
                            } else {
                                debug!(target: "service", "Ref updated: {update} for {rid}");
                            }
                        }
                        self.emitter.emit(Event::RefsFetched {
                            remote: from,
                            rid,
                            updated: updated.clone(),
                        });
                        self.emitter
                            .emit_all(canonical.into_iter().map(|(refname, target)| {
                                Event::CanonicalRefUpdated {
                                    rid,
                                    refname,
                                    target,
                                }
                            }));

                        if clone && doc.is_public() {
                            debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");

                            if let Err(e) = self.add_inventory(rid) {
                                warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
                            }
                        }

                        if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
                            debug!(target: "service", "Nothing to announce, no refs were updated..");
                        } else {
                            if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
                                warn!(target: "service", "Failed to announce new refs: {e}");
                            }
                        }
                    }
                    Err(err) => {
                        warn!(target: "service", "Fetch failed for {rid} from {from}: {err}");

                        if err.is_timeout() {
                            self.outbox.disconnect(from, DisconnectReason::Fetch(err));
                        }
                    }
                }
            }
        }
    }

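    /// Pop queued fetches for connected sessions, in random order, and
    /// initiate them if the repository is still seeded.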
    pub fn dequeue_fetches(&mut self) {
        let sessions = self
            .sessions
            .shuffled()
            .map(|(k, _)| *k)
            .collect::<Vec<_>>();

        for nid in sessions {
            #[allow(clippy::unwrap_used)]
            let sess = self.sessions.get_mut(&nid).unwrap();
            if !sess.is_connected() {
                continue;
            }

            let Some(fetcher::QueuedFetch {
                rid,
                refs: refs_at,
                timeout,
            }) = self.fetcher.dequeue(&nid)
            else {
                continue;
            };

            let repo_entry = self
                .policies
                .seed_policy(&rid)
                .expect("error accessing repo seeding configuration");

            let SeedingPolicy::Allow { scope } = repo_entry.policy else {
                debug!(target: "service", "Repository {} no longer seeded, skipping", rid);
                continue;
            };

            debug!(target: "service", "Dequeued fetch for {} from {}", rid, nid);

            match refs_at {
                RefsToFetch::Refs(refs) => {
                    self.fetch_refs_at(rid, nid, refs, scope, timeout);
                }
                RefsToFetch::All => {
                    self.fetch(rid, nid, vec![], timeout, None);
                }
            }
        }
    }

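    /// Decide whether an inbound connection from the given IP should be
    /// accepted, based on connection limits, bans and rate limits.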
    pub fn accepted(&mut self, ip: IpAddr) -> bool {
        if ip.is_loopback() || ip.is_unspecified() {
            return true;
        }
        if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
            return false;
        }
        match self.db.addresses().is_ip_banned(ip) {
            Ok(banned) => {
                if banned {
                    debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
                    return false;
                }
            }
            Err(e) => warn!(target: "service", "Failed to query ban status for {ip}: {e}"),
        }
        let host: HostName = ip.into();
        let tokens = self.config.limits.rate.inbound;

        if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
            trace!(target: "service", "Rate limiting inbound connection from {host}..");
            return false;
        }
        true
    }

    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
        debug!(target: "service", "Attempted connection to {nid} ({addr})");

        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_attempted();
        } else {
            #[cfg(debug_assertions)]
            panic!("Service::attempted: unknown session {nid}@{addr}");
        }
    }

    pub fn listening(&mut self, local_addr: net::SocketAddr) {
        info!(target: "node", "Listening on {local_addr}..");

        self.listening.push(local_addr);
    }

    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        if let Ok(true) = self.policies.is_blocked(&remote) {
            self.emitter.emit(Event::PeerDisconnected {
                nid: remote,
                reason: format!("{remote} is blocked"),
            });
            info!(target: "service", "Disconnecting blocked inbound peer {remote}");
            self.outbox.disconnect(remote, DisconnectReason::Policy);
            return;
        }

        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);

        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);
            }
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(mut e) => {
                    let peer = e.get_mut();
                    debug!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({peer})"
                    );
                    peer.link = link;
                    peer.to_connected(self.clock);
                    self.outbox.write_all(peer, msgs);
                }
                Entry::Vacant(e) => {
                    if let HostName::Ip(ip) = addr.host {
                        if !address::is_local(&ip) {
                            if let Err(e) =
                                self.db
                                    .addresses_mut()
                                    .record_ip(&remote, ip, self.clock.into())
                            {
                                log::debug!(target: "service", "Failed to record IP address for {remote}: {e}");
                            }
                        }
                    }
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                    ));
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
    }

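    /// Handle a peer disconnection: cancel any fetches from that peer,
    /// schedule a reconnection for persistent peers, and otherwise drop the
    /// session and penalize the address according to the disconnect reason.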
    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
        let since = self.local_time();
        let Some(session) = self.sessions.get_mut(&remote) else {
            trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
            return;
        };
        if session.link != link {
            return;
        }

        info!(target: "service", "Disconnected from {remote} ({reason})");
        self.emitter.emit(Event::PeerDisconnected {
            nid: remote,
            reason: reason.to_string(),
        });

        let link = session.link;
        let addr = session.addr.clone();

        let cmd = fetcher::state::command::Cancel { from: remote };
        let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);

        match event {
            fetcher::state::event::Cancel::Unexpected { from } => {
                debug!(target: "service", "No fetches to cancel for {from}");
            }
            fetcher::state::event::Cancel::Canceled {
                from,
                active,
                queued,
            } => {
                debug!(target: "service", "Cancelled {} ongoing, {} queued for {from}", active.len(), queued.len());
            }
        }

        for (rid, responder) in orphaned {
            responder
                .ok(FetchResult::Failed {
                    reason: format!("fetch of {rid} failed, peer disconnected: {reason}"),
                })
                .ok();
        }

        if self.config.is_persistent(&remote) {
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

            session.to_disconnected(since, since + delay);

            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

            self.outbox.wakeup(delay);
        } else {
            debug!(target: "service", "Dropping peer {remote}..");
            self.sessions.remove(&remote);

            let severity = match reason {
                DisconnectReason::Dial(_)
                | DisconnectReason::Fetch(_)
                | DisconnectReason::Connection(_) => {
                    if self.is_online() {
                        Severity::Medium
                    } else {
                        Severity::Low
                    }
                }
                DisconnectReason::Session(e) => e.severity(),
                DisconnectReason::Command
                | DisconnectReason::Conflict
                | DisconnectReason::Policy
                | DisconnectReason::SelfConnection => Severity::Low,
            };

            if let Err(e) = self
                .db
                .addresses_mut()
                .disconnected(&remote, &addr, severity)
            {
                debug!(target: "service", "Failed to update address store: {e}");
            }
            if link.is_outbound() {
                self.maintain_connections();
            }
        }
        self.dequeue_fetches();
    }

    pub fn received_message(&mut self, remote: NodeId, message: Message) {
        if let Err(err) = self.handle_message(&remote, message) {
            self.outbox
                .disconnect(remote, DisconnectReason::Session(err));
        }
    }

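    /// Process a gossip announcement relayed to us by `relayer`. Verifies and
    /// stores the announcement, and returns its id if it should be relayed
    /// onwards to other peers.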
    pub fn handle_announcement(
        &mut self,
        relayer: &NodeId,
        relayer_addr: &Address,
        announcement: &Announcement,
    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
        if !announcement.verify() {
            return Err(session::Error::Misbehavior);
        }
        let Announcement {
            node: announcer,
            message,
            ..
        } = announcement;

        if announcer == self.nid() {
            return Ok(None);
        }
        let now = self.clock;
        let timestamp = message.timestamp();

        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
            return Err(session::Error::InvalidTimestamp(timestamp));
        }

        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
            match self.db.addresses().get(announcer) {
                Ok(node) => {
                    if node.is_none() {
                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
                        return Ok(None);
                    }
                }
                Err(e) => {
                    debug!(target: "service", "Failed to look up node in address book: {e}");
                    return Ok(None);
                }
            }
        }

        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
            Ok(Some(id)) => {
                log::debug!(
                    target: "service",
                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
                );
                self.relayed_by.entry(id).or_default().push(*relayer);

                let relay = message.is_node_announcement()
                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
                relay.then_some(id)
            }
            Ok(None) => {
                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
                return Ok(None);
            }
            Err(e) => {
                debug!(target: "service", "Failed to update gossip entry from {announcer}: {e}");
                return Ok(None);
            }
        };

        match message {
            AnnouncementMessage::Inventory(message) => {
                self.emitter.emit(Event::InventoryAnnounced {
                    nid: *announcer,
                    inventory: message.inventory.to_vec(),
                    timestamp: message.timestamp,
                });
                match self.sync_routing(
                    message.inventory.iter().cloned(),
                    *announcer,
                    message.timestamp,
                ) {
                    Ok(synced) => {
                        if synced.is_empty() {
                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
                            return Ok(None);
                        }
                    }
                    Err(e) => {
                        debug!(target: "service", "Failed to process inventory from {announcer}: {e}");
                        return Ok(None);
                    }
                }
                let mut missing = Vec::new();
                let nid = *self.nid();

                if let Some(sess) = self.sessions.get_mut(announcer) {
                    for id in message.inventory.as_slice() {
                        if let Some(sub) = &mut sess.subscribe {
                            sub.filter.insert(id);
                        }

                        if self.policies.is_seeding(id).expect(
                            "Service::handle_announcement: error accessing seeding configuration",
                        ) {
                            match self.db.routing().entry(id, &nid) {
                                Ok(entry) => {
                                    if entry.is_none() {
                                        missing.push(*id);
                                    }
                                }
                                Err(e) => debug!(
                                    target: "service",
                                    "Error checking local inventory for {id}: {e}"
                                ),
                            }
                        }
                    }
                }
                self.rng.shuffle(&mut missing);

                for rid in missing {
                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
                    self.fetch(rid, *announcer, vec![], FETCH_TIMEOUT, None);
                }
                return Ok(relay);
            }
            AnnouncementMessage::Refs(message) => {
                self.emitter.emit(Event::RefsAnnounced {
                    nid: *announcer,
                    rid: message.rid,
                    refs: message.refs.to_vec(),
                    timestamp: message.timestamp,
                });
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
                    return Ok(None);
                };

                self.seed_discovered(message.rid, *announcer, message.timestamp);

                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
                    debug!(
                        target: "service",
                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
                        message.rid, refs.at, message.timestamp
                    );
                    match self.db.seeds_mut().synced(
                        &message.rid,
                        announcer,
                        refs.at,
                        message.timestamp,
                    ) {
                        Ok(updated) => {
                            if updated {
                                debug!(
                                    target: "service",
                                    "Updating sync status of {announcer} for {} to {}",
                                    message.rid, refs.at
                                );
                                self.emitter.emit(Event::RefsSynced {
                                    rid: message.rid,
                                    remote: *announcer,
                                    at: refs.at,
                                });
                            } else {
                                debug!(
                                    target: "service",
                                    "Sync status of {announcer} was not updated for {}",
                                    message.rid,
                                );
                            }
                        }
                        Err(e) => {
                            debug!(target: "service", "Failed to update sync status for {}: {e}", message.rid);
                        }
                    }
                }
                let repo_entry = self.policies.seed_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo seeding configuration",
                );
                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
                    debug!(
                        target: "service",
                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
                        message.rid
                    );
                    return Ok(None);
                };
                let Some(remote) = self.sessions.get(announcer).cloned() else {
                    trace!(
                        target: "service",
                        "Skipping fetch of {}, no sessions connected to {announcer}",
                        message.rid
                    );
                    return Ok(relay);
                };
                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT);

                return Ok(relay);
            }
            AnnouncementMessage::Node(
                ann @ NodeAnnouncement {
                    features,
                    addresses,
                    ..
                },
            ) => {
                self.emitter.emit(Event::NodeAnnounced {
                    nid: *announcer,
                    alias: ann.alias.clone(),
                    timestamp: ann.timestamp,
                    features: *features,
                    addresses: addresses.to_vec(),
                });
                if !features.has(Features::SEED) {
                    return Ok(relay);
                }

                match self.db.addresses_mut().insert(
                    announcer,
                    ann.version,
                    ann.features,
                    &ann.alias,
                    ann.work(),
                    &ann.agent,
                    timestamp,
                    addresses
                        .iter()
                        .filter(|a| a.is_routable() || relayer_addr.is_local())
                        .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
                ) {
                    Ok(updated) => {
                        if updated {
                            debug!(
                                target: "service",
                                "Address store entry for node {announcer} updated at {timestamp}"
                            );
                            return Ok(relay);
                        }
                    }
                    Err(err) => {
                        warn!(target: "service", "Failed to process node announcement from {announcer}: {err}");
                    }
                }
            }
        }
        Ok(None)
    }

    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
        match info {
            Info::RefsAlreadySynced { rid, at } => {
                debug!(target: "service", "Refs already synced for {rid} by {remote}");
                self.emitter.emit(Event::RefsSynced {
                    rid: *rid,
                    remote,
                    at: *at,
                });
            }
        }

        Ok(())
    }

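    /// Process a message received from a connected peer, applying per-peer
    /// rate limits before dispatching on the message type.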
    pub fn handle_message(
        &mut self,
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
        let local = self.node_id();
        let relay = self.config.is_relay();
        let Some(peer) = self.sessions.get_mut(remote) else {
            debug!(target: "service", "Session not found for {remote}");
            return Ok(());
        };
        peer.last_active = self.clock;

        let limit: RateLimit = match peer.link {
            Link::Outbound => self.config.limits.rate.outbound.into(),
            Link::Inbound => self.config.limits.rate.inbound.into(),
        };
        if self
            .limiter
            .limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
        {
            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
        }
        message.log(log::Level::Debug, remote, Link::Inbound);

        let connected = match &mut peer.state {
            session::State::Disconnected { .. } => {
                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
                return Ok(());
            }
            session::State::Attempted | session::State::Initial => {
                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);

                peer.to_connected(self.clock);

                None
            }
            session::State::Connected {
                ping, latencies, ..
            } => Some((ping, latencies)),
        };

        trace!(target: "service", "Received message {message:?} from {remote}");

        match message {
            Message::Announcement(ann) => {
                let relayer = remote;
                let relayer_addr = peer.addr.clone();

                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
                        if let AnnouncementMessage::Inventory(_) = ann.message {
                            if let Err(e) = self
                                .database_mut()
                                .gossip_mut()
                                .set_relay(id, gossip::RelayStatus::Relay)
                            {
                                warn!(target: "service", "Failed to set relay flag for message: {e}");
                                return Ok(());
                            }
                        } else {
                            self.relay(id, ann);
                        }
                    }
                }
            }
            Message::Subscribe(subscribe) => {
                match self
                    .db
                    .gossip()
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
                {
                    Ok(anns) => {
                        for ann in anns {
                            let ann = match ann {
                                Ok(a) => a,
                                Err(e) => {
                                    debug!(target: "service", "Failed to read gossip message from store: {e}");
                                    continue;
                                }
                            };
                            if ann.node == *remote {
                                continue;
                            }
                            if relay || ann.node == local {
                                self.outbox.write(peer, ann.into());
                            }
                        }
                    }
                    Err(e) => {
                        warn!(target: "service", "Failed to query gossip messages from store: {e}");
                    }
                }
                peer.subscribe = Some(subscribe);
            }
            Message::Info(info) => {
                self.handle_info(*remote, &info)?;
            }
            Message::Ping(Ping { ponglen, .. }) => {
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
                }
                self.outbox.write(
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
            Message::Pong { zeroes } => {
                if let Some((ping, latencies)) = connected {
                    if let session::PingState::AwaitingResponse {
                        len: ponglen,
                        since,
                    } = *ping
                    {
                        if (ponglen as usize) == zeroes.len() {
                            *ping = session::PingState::Ok;
                            latencies.push_back(self.clock - since);

                            if latencies.len() > MAX_LATENCIES {
                                latencies.pop_front();
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }

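    /// Compute which of the announced refs we want to fetch: refs we don't
    /// have yet, restricted to followed namespaces when the scope requires
    /// it, and never our own remote.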
    fn refs_status_of(
        &self,
        rid: RepoId,
        refs: NonEmpty<RefsAt>,
        scope: &policy::Scope,
    ) -> Result<RefsStatus, Error> {
        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
        if refs.want.is_empty() {
            return Ok(refs);
        }
        let mut refs = match scope {
            policy::Scope::All => refs,
            policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
                Ok(Namespaces::All) => refs,
                Ok(Namespaces::Followed(followed)) => {
                    refs.want.retain(|r| followed.contains(&r.remote));
                    refs
                }
                Err(e) => return Err(e.into()),
            },
        };
        refs.want.retain(|r| r.remote != self.node_id());

        Ok(refs)
    }

    fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
        if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
            if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
                self.emitter.emit(Event::SeedDiscovered { rid, nid });
                debug!(target: "service", "Routing table updated for {rid} with seed {nid}");
            }
        }
    }

    fn initial(&mut self, _link: Link) -> Vec<Message> {
        let now = self.clock();
        let filter = self.filter();

        let since = if let Some(last) = self.last_online_at {
            Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
        } else {
            (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
        };
        debug!(target: "service", "Subscribing to messages since timestamp {since}..");

        vec![
            Message::node(self.node.clone(), &self.signer),
            Message::inventory(self.inventory.clone(), &self.signer),
            Message::subscribe(filter, since, Timestamp::MAX),
        ]
    }

    fn is_online(&self) -> bool {
        self.sessions
            .connected()
            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
            .count()
            > 0
    }

    fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
        let node = self.node_id();
        let now = self.timestamp();

        let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
        if removed {
            self.refresh_and_announce_inventory(now)?;
        }
        Ok(removed)
    }

    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
        let node = self.node_id();
        let now = self.timestamp();

        if !self.storage.contains(&rid)? {
            debug!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
            return Ok(false);
        }
        let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
        let updated = !updates.is_empty();

        if updated {
            self.refresh_and_announce_inventory(now)?;
        }
        Ok(updated)
    }

    fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
        let inventory = self.inventory()?;

        self.inventory = gossip::inventory(time, inventory);
        self.announce_inventory();

        Ok(())
    }

    fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
        self.db
            .routing()
            .get_inventory(self.nid())
            .map_err(Error::from)
    }

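    /// Reconcile a peer's announced inventory with our routing table, adding
    /// new entries and dropping ones no longer announced, emitting events for
    /// both.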
    fn sync_routing(
        &mut self,
        inventory: impl IntoIterator<Item = RepoId>,
        from: NodeId,
        timestamp: Timestamp,
    ) -> Result<SyncedRouting, Error> {
        let mut synced = SyncedRouting::default();
        let included = inventory.into_iter().collect::<BTreeSet<_>>();
        let mut events = Vec::new();

        for (rid, result) in
            self.db
                .routing_mut()
                .add_inventory(included.iter(), from, timestamp)?
        {
            match result {
                InsertResult::SeedAdded => {
                    debug!(target: "service", "Routing table updated for {rid} with seed {from}");
                    events.push(Event::SeedDiscovered { rid, nid: from });

                    if self
                        .policies
                        .is_seeding(&rid)
                        .expect("Service::process_inventory: error accessing seeding configuration")
                    {
                    }
                    synced.added.push(rid);
                }
                InsertResult::TimeUpdated => {
                    synced.updated.push(rid);
                }
                InsertResult::NotUpdated => {}
            }
        }

        synced.removed.extend(
            self.db
                .routing()
                .get_inventory(&from)?
                .into_iter()
                .filter(|rid| !included.contains(rid)),
        );
        self.db
            .routing_mut()
            .remove_inventories(&synced.removed, &from)?;
        events.extend(
            synced
                .removed
                .iter()
                .map(|&rid| Event::SeedDropped { rid, nid: from }),
        );

        self.emitter.emit_all(events);

        Ok(synced)
    }

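    /// Build a signed refs announcement for the given repository and remotes,
    /// bounded by [`REF_REMOTE_LIMIT`].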
    fn refs_announcement_for(
        &mut self,
        rid: RepoId,
        remotes: impl IntoIterator<Item = NodeId>,
    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
        let repo = self.storage.repository(rid)?;
        let timestamp = self.timestamp();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        for remote_id in remotes.into_iter() {
            let refs_at = RefsAt::new(&repo, remote_id)?;

            if refs.push(refs_at).is_err() {
                warn!(
                    target: "service",
                    "refs announcement limit ({REF_REMOTE_LIMIT}) exceeded, peers will see only some of your repository references",
                );
                break;
            }
        }

        let msg = AnnouncementMessage::from(RefsAnnouncement {
            rid,
            refs: refs.clone(),
            timestamp,
        });
        Ok((msg.signed(&self.signer), refs.into()))
    }

    fn announce_own_refs(
        &mut self,
        rid: RepoId,
        doc: Doc,
        namespaces: impl IntoIterator<Item = NodeId>,
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (refs, timestamp) = self.announce_refs(rid, doc, namespaces, true)?;

        for r in refs.iter() {
            self.emitter.emit(Event::LocalRefsAnnounced {
                rid,
                refs: *r,
                timestamp,
            });
            if let Err(e) = self.database_mut().refs_mut().set(
                &rid,
                &r.remote,
                &SIGREFS_BRANCH,
                r.at,
                timestamp.to_local_time(),
            ) {
                warn!(
                    target: "service",
                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
                    r.remote
                );
            }
        }
        Ok((refs, timestamp))
    }

    fn announce_refs(
        &mut self,
        rid: RepoId,
        doc: Doc,
        remotes: impl IntoIterator<Item = NodeId>,
        own: bool,
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
        let timestamp = ann.timestamp();
        let peers = self.sessions.connected().map(|(_, p)| p);

        for r in refs.iter().filter(|r| own || r.remote == ann.node) {
            info!(
                target: "service",
                "Announcing refs {rid}/{r} to peers (t={timestamp})..",
            );
            if let Err(e) = self.db.seeds_mut().synced(&rid, &ann.node, r.at, timestamp) {
                warn!(target: "service", "Failed to update sync status for local node: {e}");
            } else {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
        }

        self.outbox.announce(
            ann,
            peers.filter(|p| doc.is_visible_to(&p.id.into())),
            self.db.gossip_mut(),
        );
        Ok((refs, timestamp))
    }

    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_initial();
            self.outbox.connect(nid, addr);

            return true;
        }
        false
    }

    fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
        debug!(target: "service", "Connecting to {nid} ({addr})..");

        if nid == self.node_id() {
            return Err(ConnectError::SelfConnection);
        }
        if let Ok(true) = self.policies.is_blocked(&nid) {
            return Err(ConnectError::Blocked { nid });
        }
        if !self.is_supported_address(&addr) {
            return Err(ConnectError::UnsupportedAddress { nid, addr });
        }
        if self.sessions.contains_key(&nid) {
            return Err(ConnectError::SessionExists { nid });
        }
        if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
            return Err(ConnectError::LimitReached { nid, addr });
        }
        let persistent = self.config.is_persistent(&nid);
        let timestamp: Timestamp = self.clock.into();

        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
            warn!(target: "service", "Failed to update address book with connection attempt: {e}");
        }
        self.sessions.insert(
            nid,
            Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
        );
        self.outbox.connect(nid, addr);

        Ok(())
    }

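    /// Return the known seeds for a repository, combining sync status derived
    /// from the seed store with plain routing table entries.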
    fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
        let mut seeds = Seeds::new(self.rng.clone());

        if let Ok(repo) = self.storage.repository(*rid) {
            for namespace in namespaces.iter() {
                let Ok(local) = RefsAt::new(&repo, *namespace) else {
                    continue;
                };

                for seed in self.db.seeds().seeds_for(rid)? {
                    let seed = seed?;
                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
                    let synced = if local.at == seed.synced_at.oid {
                        SyncStatus::Synced { at: seed.synced_at }
                    } else {
                        let local = SyncedAt::new(local.at, &repo)?;

                        SyncStatus::OutOfSync {
                            local,
                            remote: seed.synced_at,
                        }
                    };
                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
                }
            }
        }

        for nid in self.db.routing().get(rid)? {
            if namespaces.contains(&nid) {
                continue;
            }
            if seeds.contains(&nid) {
                continue;
            }
            let addrs = self.db.addresses().addresses_of(&nid)?;
            let state = self.sessions.get(&nid).map(|s| s.state.clone());

            seeds.insert(Seed::new(nid, addrs, state, None));
        }
        Ok(seeds)
    }

    fn filter(&self) -> Filter {
        if self.config.seeding_policy.is_allow() {
            Filter::default()
        } else {
            self.filter.clone()
        }
    }

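    /// Return a timestamp that is guaranteed to be monotonically increasing:
    /// the current clock if it has advanced past the last issued timestamp,
    /// otherwise the last timestamp incremented by one.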
    fn timestamp(&mut self) -> Timestamp {
        let now = Timestamp::from(self.clock);
        if *now > *self.last_timestamp {
            self.last_timestamp = now;
        } else {
            self.last_timestamp = self.last_timestamp + 1;
        }
        self.last_timestamp
    }

    fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
        let announcer = ann.node;
        let relayed_by = self.relayed_by.get(&id);
        let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
            Some(rid)
        } else {
            None
        };
        let relay_to = self
            .sessions
            .connected()
            .filter(|(id, _)| {
                relayed_by
                    .map(|relayers| !relayers.contains(id))
                    .unwrap_or(true)
            })
            .filter(|(id, _)| **id != announcer)
            .filter(|(id, _)| {
                if let Some(rid) = rid {
                    self.storage
                        .get(rid)
                        .ok()
                        .flatten()
                        .map(|doc| doc.is_visible_to(&(*id).into()))
                        .unwrap_or(false)
                } else {
                    true
                }
            })
            .map(|(_, p)| p);

        self.outbox.relay(ann, relay_to);
    }

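    /// Relay stored announcements that are due for broadcast, skipping our own.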
    fn relay_announcements(&mut self) -> Result<(), Error> {
        let now = self.clock.into();
        let rows = self.database_mut().gossip_mut().relays(now)?;
        let local = self.node_id();

        for (id, msg) in rows {
            let announcer = msg.node;
            if announcer == local {
                continue;
            }
            self.relay(id, msg);
        }
        Ok(())
    }

    fn announce_inventory(&mut self) {
        let timestamp = self.inventory.timestamp.to_local_time();

        if self.last_inventory == timestamp {
            debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
            return;
        }
        let msg = AnnouncementMessage::from(self.inventory.clone());

        self.outbox.announce(
            msg.signed(&self.signer),
            self.sessions.connected().map(|(_, p)| p),
            self.db.gossip_mut(),
        );
        self.last_inventory = timestamp;
    }

    /// Prune the routing table when it exceeds the configured maximum size,
    /// removing entries older than the maximum routing age.
    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
        let count = self.db.routing().len()?;
        if count <= self.config.limits.routing_max_size.into() {
            return Ok(());
        }

        let delta = count - usize::from(self.config.limits.routing_max_size);
        let nid = self.node_id();
        self.db.routing_mut().prune(
            (*now - LocalDuration::from(self.config.limits.routing_max_age)).into(),
            Some(delta),
            &nid,
        )?;
        Ok(())
    }

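    // Worked example of the pruning arithmetic above (illustrative only):
    //
    //     let (count, max) = (1_250_usize, 1_000_usize);
    //     let delta = count - max; // 250 of the oldest entries become prunable
    //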
    /// Disconnect peers that have been inactive for longer than
    /// [`STALE_CONNECTION_TIMEOUT`].
    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
        let stale = self
            .sessions
            .connected()
            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);

        for (_, session) in stale {
            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);

            self.outbox.disconnect(
                session.id,
                DisconnectReason::Session(session::Error::Timeout),
            );
        }
    }

    /// Keep connections alive by pinging sessions that have been inactive
    /// for at least [`KEEP_ALIVE_DELTA`].
    fn keep_alive(&mut self, now: &LocalTime) {
        let inactive_sessions = self
            .sessions
            .connected_mut()
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
            .map(|(_, session)| session);
        for session in inactive_sessions {
            session.ping(self.clock, &mut self.outbox).ok();
        }
    }

    /// Peers available to connect to: peers with a compatible protocol
    /// version that aren't banned, blocked, ourselves, or already in our
    /// session table, and that haven't reached their penalty threshold.
    /// Addresses are grouped per node, and nodes are sorted by penalty,
    /// lowest first.
    fn available_peers(&mut self) -> Vec<Peer> {
        match self.db.addresses().entries() {
            Ok(entries) => {
                let mut peers = entries
                    .filter(|entry| entry.version == PROTOCOL_VERSION)
                    .filter(|entry| !entry.address.banned)
                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
                    .filter(|entry| !self.sessions.contains_key(&entry.node))
                    .filter(|entry| !self.policies.is_blocked(&entry.node).unwrap_or(false))
                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
                    .filter(|entry| &entry.node != self.nid())
                    .filter(|entry| self.is_supported_address(&entry.address.addr))
                    .fold(HashMap::new(), |mut acc, entry| {
                        // Group all known addresses of a node under a single
                        // peer record.
                        acc.entry(entry.node)
                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
                            .or_insert_with(|| Peer {
                                nid: entry.node,
                                addresses: vec![entry.address],
                                penalty: entry.penalty,
                            });
                        acc
                    })
                    .into_values()
                    .collect::<Vec<_>>();
                peers.sort_by_key(|p| p.penalty);
                peers
            }
            Err(e) => {
                warn!(target: "service", "Unable to lookup available peers in address book: {e}");
                Vec::new()
            }
        }
    }

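    // A standalone sketch of the grouping step above: fold `(node, addr)`
    // entries into one record per node (hypothetical simplified types):
    //
    //     use std::collections::HashMap;
    //
    //     let entries = [("alice", "tcp-1"), ("bob", "tcp-2"), ("alice", "tor-1")];
    //     let grouped = entries.into_iter().fold(
    //         HashMap::<&str, Vec<&str>>::new(),
    //         |mut acc, (node, addr)| {
    //             acc.entry(node).or_default().push(addr);
    //             acc
    //         },
    //     );
    //     assert_eq!(grouped["alice"], vec!["tcp-1", "tor-1"]);
    //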
    /// Fetch repositories that we are seeding, but that are missing from
    /// local storage, from any of their connected seeds.
    fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
        let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
        for policy in policies {
            let policy = match policy {
                Ok(policy) => policy,
                Err(err) => {
                    debug!(target: "protocol::filter", "Failed to read seed policy: {err}");
                    continue;
                }
            };

            let rid = policy.rid;

            if !policy.is_allow() {
                continue;
            }
            match self.storage.contains(&rid) {
                Ok(exists) => {
                    if exists {
                        continue;
                    }
                }
                Err(err) => {
                    log::debug!(target: "protocol::filter", "Failed to check if {rid} exists: {err}");
                    continue;
                }
            }
            match self.seeds(&rid, [self.node_id()].into()) {
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
                            self.fetch(rid, seed.nid, vec![], FETCH_TIMEOUT, None);
                        }
                    } else {
                        debug!(target: "service", "No connected seeds found for {rid}..");
                    }
                }
                Err(e) => {
                    debug!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
                }
            }
        }
        Ok(())
    }

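    // Sketch of the `NonEmpty` guard above (illustrative only): `from_vec`
    // returns `None` for an empty vector, which maps onto the "no connected
    // seeds" branch.
    //
    //     use nonempty::NonEmpty;
    //
    //     assert!(NonEmpty::<u8>::from_vec(vec![]).is_none());
    //     assert!(NonEmpty::from_vec(vec![1]).is_some());
    //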
    /// Run the idle logic on all sessions, recording stable connections in
    /// the address book.
    fn idle_connections(&mut self) {
        for (_, sess) in self.sessions.iter_mut() {
            sess.idle(self.clock);

            if sess.is_stable() {
                if let Err(e) =
                    self.db
                        .addresses_mut()
                        .connected(&sess.id, &sess.addr, self.clock.into())
                {
                    warn!(target: "service", "Failed to update address book with connection: {e}");
                }
            }
        }
    }

    /// Establish new outbound connections if we're below the target number
    /// of connected outbound peers. Only runs under the dynamic peer
    /// configuration.
    fn maintain_connections(&mut self) {
        let PeerConfig::Dynamic = self.config.peers else {
            return;
        };
        trace!(target: "service", "Maintaining connections..");

        let target = TARGET_OUTBOUND_PEERS;
        let now = self.clock;
        let outbound = self
            .sessions
            .values()
            .filter(|s| s.link.is_outbound())
            .filter(|s| s.is_connected() || s.is_connecting())
            .count();
        let wanted = target.saturating_sub(outbound);

        // Nothing to do if we already have enough outbound peers.
        if wanted == 0 {
            return;
        }

        // For each candidate peer, pick the first address that is eligible
        // for a connection attempt.
        let available = self
            .available_peers()
            .into_iter()
            .filter_map(|peer| {
                peer.addresses
                    .into_iter()
                    .find(|ka| match (ka.last_success, ka.last_attempt) {
                        // The last attempt succeeded, or enough time has
                        // passed since it to try again.
                        (Some(success), Some(attempt)) => {
                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
                        }
                        // The last attempt never succeeded; wait out the
                        // retry delta before trying again.
                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
                        // Addresses we never attempted can always be tried.
                        (_, None) => true,
                    })
                    .map(|ka| (peer.nid, ka))
            })
            .filter(|(_, ka)| self.is_supported_address(&ka.addr));

        let connect = available.take(wanted).collect::<Vec<_>>();
        if connect.len() < wanted {
            log::debug!(
                target: "service",
                "Not enough available peers to connect to (available={}, wanted={wanted})",
                connect.len()
            );
        }
        for (id, ka) in connect {
            if let Err(e) = self.connect(id, ka.addr.clone()) {
                warn!(target: "service", "Service::maintain_connections connection error: {e}");
            }
        }
    }

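    // Standalone sketch of the retry predicate above (illustrative only),
    // with times as plain seconds and a hypothetical `RETRY` delta:
    //
    //     fn can_attempt(last_success: Option<u64>, last_attempt: Option<u64>, now: u64) -> bool {
    //         const RETRY: u64 = 60;
    //         match (last_success, last_attempt) {
    //             (Some(s), Some(a)) => s >= a || now.saturating_sub(a) >= RETRY,
    //             (None, Some(a)) => now.saturating_sub(a) >= RETRY,
    //             (_, None) => true,
    //         }
    //     }
    //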
    /// Maintain persistent peer connections, reconnecting to disconnected
    /// persistent peers once their retry time has been reached.
    fn maintain_persistent(&mut self) {
        trace!(target: "service", "Maintaining persistent peers..");

        let now = self.local_time();
        let mut reconnect = Vec::new();

        for (nid, session) in self.sessions.iter_mut() {
            if self.config.is_persistent(nid) {
                // Never reconnect to blocked peers.
                if self.policies.is_blocked(nid).unwrap_or(false) {
                    continue;
                }
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
                    // Only attempt a reconnection once the session's
                    // scheduled retry time is due.
                    if now >= *retry_at {
                        reconnect.push((*nid, session.addr.clone(), session.attempts()));
                    }
                }
            }
        }

        for (nid, addr, attempts) in reconnect {
            if self.reconnect(nid, addr) {
                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
            }
        }
    }

    /// Whether we support connecting to the given address. Onion addresses
    /// are only supported when onion connectivity is configured; DNS, IPv4
    /// and IPv6 addresses are always supported.
    fn is_supported_address(&self, address: &Address) -> bool {
        match AddressType::from(address) {
            AddressType::Onion => self.config.onion.is_some(),
            AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
        }
    }
}

pub trait ServiceState {
    /// Get the local node id.
    fn nid(&self) -> &NodeId;
    /// Get the current sessions.
    fn sessions(&self) -> &Sessions;
    /// Get the current fetcher state.
    fn fetching(&self) -> &FetcherState;
    /// Get the outbox.
    fn outbox(&self) -> &Outbox;
    /// Get the rate limiter.
    fn limiter(&self) -> &RateLimiter;
    /// Get the event emitter.
    fn emitter(&self) -> &Emitter<Event>;
    /// Get a repository's identity document from storage, if it exists.
    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
    /// Get the local clock.
    fn clock(&self) -> &LocalTime;
    /// Get the local clock, mutably.
    fn clock_mut(&mut self) -> &mut LocalTime;
    /// Get the service configuration.
    fn config(&self) -> &Config;
    /// Get the service metrics.
    fn metrics(&self) -> &Metrics;
}

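// A sketch of using `ServiceState` as a generic bound (illustrative only;
// `log_connected` is a hypothetical helper, not part of this module):
//
//     fn log_connected<S: ServiceState>(s: &S) {
//         for (nid, _) in s.sessions().connected() {
//             log::debug!(target: "service", "connected to {nid}");
//         }
//     }
//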
impl<D, S, G> ServiceState for Service<D, S, G>
where
    D: routing::Store,
    G: crypto::signature::Signer<crypto::Signature>,
    S: ReadStorage,
{
    fn nid(&self) -> &NodeId {
        self.signer.public_key()
    }

    fn sessions(&self) -> &Sessions {
        &self.sessions
    }

    fn fetching(&self) -> &FetcherState {
        self.fetcher.state()
    }

    fn outbox(&self) -> &Outbox {
        &self.outbox
    }

    fn limiter(&self) -> &RateLimiter {
        &self.limiter
    }

    fn emitter(&self) -> &Emitter<Event> {
        &self.emitter
    }

    fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
        self.storage.get(rid)
    }

    fn clock(&self) -> &LocalTime {
        &self.clock
    }

    fn clock_mut(&mut self) -> &mut LocalTime {
        &mut self.clock
    }

    fn config(&self) -> &Config {
        &self.config
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }
}

/// A reason for disconnecting from a peer.
#[derive(Debug)]
pub enum DisconnectReason {
    /// Error while dialing the remote. This occurs before a connection is
    /// even established; errors of this kind are usually not transient.
    Dial(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with an underlying, established connection. Sometimes,
    /// reconnecting after such an error is possible.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
    /// Error with a fetch.
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
    /// Conflicting connection to the same peer.
    Conflict,
    /// Connection to ourselves.
    SelfConnection,
    /// The peer is blocked by our policy.
    Policy,
    /// Disconnect requested via a command.
    Command,
}

impl DisconnectReason {
    /// Whether this is a dial error.
    pub fn is_dial_err(&self) -> bool {
        matches!(self, Self::Dial(_))
    }

    /// Whether this is a connection error.
    pub fn is_connection_err(&self) -> bool {
        matches!(self, Self::Connection(_))
    }

    /// A generic connection error, constructed from a "connection reset"
    /// I/O error.
    pub fn connection() -> Self {
        DisconnectReason::Connection(Arc::new(std::io::Error::from(
            std::io::ErrorKind::ConnectionReset,
        )))
    }
}

impl fmt::Display for DisconnectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Dial(err) => write!(f, "{err}"),
            Self::Connection(err) => write!(f, "{err}"),
            Self::Command => write!(f, "command"),
            Self::SelfConnection => write!(f, "self-connection"),
            Self::Conflict => write!(f, "conflict"),
            Self::Policy => write!(f, "policy"),
            Self::Session(err) => write!(f, "{err}"),
            Self::Fetch(err) => write!(f, "fetch: {err}"),
        }
    }
}

/// The result of a repository lookup.
#[derive(Debug)]
pub struct Lookup {
    /// The repository's identity document, if it was found in local storage.
    pub local: Option<Doc>,
    /// Nodes that are known to seed the repository.
    pub remote: Vec<NodeId>,
}

#[derive(thiserror::Error, Debug)]
pub enum LookupError {
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}

/// Keeps track of peer sessions, indexed by node id.
#[derive(Debug, Clone)]
pub struct Sessions(AddressBook<NodeId, Session>);

impl Sessions {
    pub fn new(rng: Rng) -> Self {
        Self(AddressBook::new(rng))
    }

    /// Iterator over fully connected peers.
    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.0
            .iter()
            .filter_map(move |(id, sess)| match &sess.state {
                session::State::Connected { .. } => Some((id, sess)),
                _ => None,
            })
    }

    /// Iterator over connected inbound peers.
    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.connected().filter(|(_, s)| s.link.is_inbound())
    }

    /// Iterator over connected outbound peers.
    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.connected().filter(|(_, s)| s.link.is_outbound())
    }

    /// Iterator over mutable sessions of connected peers.
    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
    }

    /// Iterator over mutable sessions of disconnected peers.
    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
    }

    /// Whether the given node currently has a connected session.
    pub fn is_connected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
    }

    /// Whether the given node is disconnected, or unknown to us.
    pub fn is_disconnected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
    }
}

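// Usage sketch for `Sessions` (illustrative only; `rng` and `session` are
// hypothetical values, and this assumes the underlying address book exposes
// a `HashMap`-like `insert` through the `Deref`/`DerefMut` impls below):
//
//     let mut sessions = Sessions::new(rng);
//     sessions.insert(session.id, session);
//     let outbound = sessions.outbound().count();
//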
impl Deref for Sessions {
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
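
#[cfg(test)]
mod example_tests {
    use super::*;

    // Illustrative example (an assumption, not part of the original test
    // suite): a synthesized connection-reset reason classifies as a
    // connection error, not a dial error.
    #[test]
    fn disconnect_reason_classification() {
        let reason = DisconnectReason::connection();
        assert!(reason.is_connection_err());
        assert!(!reason.is_dial_err());
    }
}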