1use std::collections::hash_map::Entry;
6use std::collections::VecDeque;
7use std::os::unix::io::{AsRawFd, RawFd};
8use std::sync::Arc;
9use std::{io, net, time};
10
11use amplify::Wrapper as _;
12use crossbeam_channel as chan;
13use cyphernet::addr::{HostName, InetHost, NetAddr};
14use cyphernet::encrypt::noise::{HandshakePattern, Keyset, NoiseState};
15use cyphernet::proxy::socks5;
16use cyphernet::{Digest, EcSk, Ecdh, Sha256};
17use localtime::LocalTime;
18use netservices::resource::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
19use netservices::session::{NoiseSession, ProtocolArtifact, Socks5Session};
20use netservices::{NetConnection, NetReader, NetWriter};
21use radicle::node::device::Device;
22use reactor::{ResourceId, ResourceType, Timestamp};
23
24use radicle::collections::RandomMap;
25use radicle::crypto;
26use radicle::node::config::AddressConfig;
27use radicle::node::Link;
28use radicle::node::NodeId;
29use radicle::storage::WriteStorage;
30use radicle_protocol::deserializer::Deserializer;
31pub use radicle_protocol::wire::frame;
32pub use radicle_protocol::wire::frame::{Frame, FrameData, StreamId};
33pub use radicle_protocol::wire::*;
34use radicle_protocol::worker::{FetchRequest, FetchResult};
35
36use crate::service;
37use crate::service::io::Io;
38use crate::service::FETCH_TIMEOUT;
39use crate::service::{session, DisconnectReason, Metrics, Service};
40use crate::worker;
41use crate::worker::{ChannelEvent, ChannelsConfig};
42use crate::worker::{Task, TaskResult};
43
/// Noise handshake pattern used when initializing the noise state of every
/// peer session in this file.
///
/// The initiator side uses `InitiatorPattern::Xmitted` and the responder side
/// uses `OneWayPattern::Known`.
/// NOTE(review): the name suggests the Noise `XK` pattern — confirm against
/// the `cyphernet` documentation.
pub const NOISE_XK: HandshakePattern = HandshakePattern {
    initiator: cyphernet::encrypt::noise::InitiatorPattern::Xmitted,
    responder: cyphernet::encrypt::noise::OneWayPattern::Known,
};
49
/// Read and write timeout applied to a peer socket when its session is created.
pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs(6);

/// Timeout handed to the non-blocking TCP connect when dialing a peer.
pub const DEFAULT_DIAL_TIMEOUT: time::Duration = time::Duration::from_secs(6);

/// Size parameter, in bytes, of a connected peer's inbound deserializer (2 MiB).
pub const MAX_INBOX_SIZE: usize = 2 * 1024 * 1024;
58
/// Commands delivered to the wire handler; this is the `Command` type of its
/// `reactor::Handler` implementation.
// The size lint is silenced instead of boxing a variant.
// NOTE(review): which variant is the large one is not visible in this file.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Control {
    /// A service command; forwarded as-is to `Service::command`.
    User(service::Command),
    /// The result of a worker task; processed by `Wire::worker_result`.
    Worker(TaskResult),
    /// Request to drain the pending channel events of `stream` and send them
    /// to `remote`; processed by `Wire::flush`.
    Flush { remote: NodeId, stream: StreamId },
}
70
/// Peer session type: a noise session (SHA-256) layered over a SOCKS5 session
/// on top of a TCP stream.
pub type WireSession<G> = NoiseSession<G, Sha256, Socks5Session<net::TcpStream>>;
/// Reader type matching [`WireSession`]'s underlying SOCKS5/TCP session.
pub type WireReader = NetReader<Socks5Session<net::TcpStream>>;
/// Writer type matching [`WireSession`]: noise state over the SOCKS5/TCP session.
pub type WireWriter<G> = NetWriter<NoiseState<G, Sha256>, Socks5Session<net::TcpStream>>;

/// Reactor action specialized to this file's listener and transport types.
type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>;
80
81struct Stream {
83 channels: worker::Channels,
85 sent_bytes: usize,
87 received_bytes: usize,
89}
90
91impl Stream {
92 fn new(channels: worker::Channels) -> Self {
93 Self {
94 channels,
95 sent_bytes: 0,
96 received_bytes: 0,
97 }
98 }
99}
100
101struct Streams {
103 streams: RandomMap<StreamId, Stream>,
107 link: Link,
109 seq: u64,
111}
112
113impl Streams {
114 fn new(link: Link) -> Self {
116 Self {
117 streams: RandomMap::default(),
118 link,
119 seq: 0,
120 }
121 }
122
123 fn get(&self, stream: &StreamId) -> Option<&Stream> {
125 self.streams.get(stream)
126 }
127
128 fn get_mut(&mut self, stream: &StreamId) -> Option<&mut Stream> {
130 self.streams.get_mut(stream)
131 }
132
133 fn open(&mut self, config: ChannelsConfig) -> (StreamId, worker::Channels) {
135 self.seq += 1;
136
137 let id = StreamId::git(self.link)
138 .nth(self.seq)
139 .expect("Streams::open: too many streams");
140 let channels = self
141 .register(id, config)
142 .expect("Streams::open: stream was already open");
143
144 (id, channels)
145 }
146
147 fn register(&mut self, stream: StreamId, config: ChannelsConfig) -> Option<worker::Channels> {
149 let (wire, worker) = worker::Channels::pair(config)
150 .expect("Streams::register: fatal: unable to create channels");
151
152 match self.streams.entry(stream) {
153 Entry::Vacant(e) => {
154 e.insert(Stream::new(worker));
155 Some(wire)
156 }
157 Entry::Occupied(_) => None,
158 }
159 }
160
161 fn unregister(&mut self, stream: &StreamId) -> Option<Stream> {
163 self.streams.remove(stream)
164 }
165
166 fn shutdown(&mut self) {
168 for (sid, stream) in self.streams.drain() {
169 log::debug!(target: "wire", "Closing worker stream {sid}");
170 stream.channels.close().ok();
171 }
172 }
173}
174
/// An outbound connection whose session has not been established yet.
#[derive(Debug)]
struct Outbound {
    /// Reactor resource id; `None` until the transport has been registered.
    id: Option<ResourceId>,
    /// Address that was dialed.
    addr: NetAddr<HostName>,
    /// Node id we are trying to connect to.
    nid: NodeId,
}

/// An inbound connection whose session has not been established yet.
#[derive(Debug)]
struct Inbound {
    /// Reactor resource id; `None` until the transport has been registered.
    id: Option<ResourceId>,
    /// Remote address of the accepted connection.
    addr: NetAddr<HostName>,
}
194
195enum Peer {
197 Connected {
200 #[allow(dead_code)]
201 addr: NetAddr<HostName>,
202 link: Link,
203 nid: NodeId,
204 inbox: Deserializer<MAX_INBOX_SIZE, Frame>,
205 streams: Streams,
206 },
207 Disconnecting {
210 link: Link,
211 nid: Option<NodeId>,
212 reason: DisconnectReason,
213 },
214}
215
216impl std::fmt::Debug for Peer {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 match self {
219 Self::Connected { link, nid, .. } => write!(f, "Connected({link:?}, {nid})"),
220 Self::Disconnecting { .. } => write!(f, "Disconnecting"),
221 }
222 }
223}
224
225impl Peer {
226 fn id(&self) -> Option<&NodeId> {
228 match self {
229 Peer::Connected { nid, .. } | Peer::Disconnecting { nid: Some(nid), .. } => Some(nid),
230 Peer::Disconnecting { nid: None, .. } => None,
231 }
232 }
233
234 fn link(&self) -> Link {
235 match self {
236 Peer::Connected { link, .. } => *link,
237 Peer::Disconnecting { link, .. } => *link,
238 }
239 }
240
241 fn connected(nid: NodeId, addr: NetAddr<HostName>, link: Link) -> Self {
243 Self::Connected {
244 link,
245 addr,
246 nid,
247 inbox: Deserializer::default(),
248 streams: Streams::new(link),
249 }
250 }
251}
252
253struct Peers(RandomMap<ResourceId, Peer>);
255
256impl Peers {
257 fn get_mut(&mut self, id: &ResourceId) -> Option<&mut Peer> {
258 self.0.get_mut(id)
259 }
260
261 fn entry(&mut self, id: ResourceId) -> Entry<ResourceId, Peer> {
262 self.0.entry(id)
263 }
264
265 fn insert(&mut self, id: ResourceId, peer: Peer) {
266 if self.0.insert(id, peer).is_some() {
267 log::warn!(target: "wire", "Replacing existing peer id={id}");
268 }
269 }
270
271 fn remove(&mut self, id: &ResourceId) -> Option<Peer> {
272 self.0.remove(id)
273 }
274
275 fn lookup(&self, node_id: &NodeId) -> Option<(ResourceId, &Peer)> {
276 self.0
277 .iter()
278 .find(|(_, peer)| peer.id() == Some(node_id))
279 .map(|(fd, peer)| (*fd, peer))
280 }
281
282 fn lookup_mut(&mut self, node_id: &NodeId) -> Option<(ResourceId, &mut Peer)> {
283 self.0
284 .iter_mut()
285 .find(|(_, peer)| peer.id() == Some(node_id))
286 .map(|(fd, peer)| (*fd, peer))
287 }
288
289 fn active(&self) -> impl Iterator<Item = (ResourceId, &NodeId, Link)> {
290 self.0.iter().filter_map(|(id, peer)| match peer {
291 Peer::Connected { nid, link, .. } => Some((*id, nid, *link)),
292 Peer::Disconnecting { .. } => None,
293 })
294 }
295
296 fn connected(&self) -> impl Iterator<Item = (ResourceId, &NodeId)> {
297 self.0.iter().filter_map(|(id, peer)| {
298 if let Peer::Connected { nid, .. } = peer {
299 Some((*id, nid))
300 } else {
301 None
302 }
303 })
304 }
305
306 fn iter(&self) -> impl Iterator<Item = &Peer> {
307 self.0.values()
308 }
309}
310
/// Reactor handler connecting the node service to the network: it tracks
/// connections and peers, and queues reactor actions.
pub struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + Ecdh> {
    /// The service that is notified of connection events and whose I/O
    /// requests are turned into reactor actions.
    service: Service<D, S, G>,
    /// Sending half of the channel on which tasks are handed to workers.
    worker: chan::Sender<Task>,
    /// Local signing device; provides our public key and the key material
    /// passed to new sessions.
    signer: Device<G>,
    /// Metrics updated by this handler and passed to the service on each tick.
    metrics: service::Metrics,
    /// Queue of pending reactor actions, drained by the `Iterator` impl.
    actions: VecDeque<Action<G>>,
    /// Outbound connections without an established session, keyed by fd.
    outbound: RandomMap<RawFd, Outbound>,
    /// Inbound connections without an established session, keyed by fd.
    inbound: RandomMap<RawFd, Inbound>,
    /// Local addresses of listeners awaiting registration, keyed by fd.
    listening: RandomMap<RawFd, net::SocketAddr>,
    /// Peers whose session was established, keyed by resource id.
    peers: Peers,
}
332
333impl<D, S, G> Wire<D, S, G>
334where
335 D: service::Store,
336 S: WriteStorage + 'static,
337 G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId>,
338{
339 pub fn new(service: Service<D, S, G>, worker: chan::Sender<Task>, signer: Device<G>) -> Self {
340 assert!(service.started().is_some(), "Service must be initialized");
341
342 Self {
343 service,
344 worker,
345 signer,
346 metrics: Metrics::default(),
347 actions: VecDeque::new(),
348 inbound: RandomMap::default(),
349 outbound: RandomMap::default(),
350 listening: RandomMap::default(),
351 peers: Peers(RandomMap::default()),
352 }
353 }
354
355 pub fn listen(&mut self, socket: NetAccept<WireSession<G>>) {
356 self.listening
357 .insert(socket.as_raw_fd(), socket.local_addr());
358 self.actions.push_back(Action::RegisterListener(socket));
359 }
360
361 fn disconnect(&mut self, id: ResourceId, reason: DisconnectReason) -> Option<(NodeId, Link)> {
362 match self.peers.entry(id) {
363 Entry::Vacant(_) => {
364 log::debug!(target: "wire", "Disconnecting pending peer with id={id}: {reason}");
366 self.actions.push_back(Action::UnregisterTransport(id));
367
368 self.outbound
371 .values()
372 .find(|o| o.id == Some(id))
373 .map(|o| (o.nid, Link::Outbound))
374 }
375 Entry::Occupied(mut e) => match e.get_mut() {
376 Peer::Disconnecting { nid, link, .. } => {
377 log::error!(target: "wire", "Peer with id={id} is already disconnecting");
378
379 nid.map(|n| (n, *link))
380 }
381 Peer::Connected {
382 nid, streams, link, ..
383 } => {
384 log::debug!(target: "wire", "Disconnecting peer with id={id}: {reason}");
385 let nid = *nid;
386 let link = *link;
387
388 streams.shutdown();
389 e.insert(Peer::Disconnecting {
390 nid: Some(nid),
391 link,
392 reason,
393 });
394 self.actions.push_back(Action::UnregisterTransport(id));
395
396 Some((nid, link))
397 }
398 },
399 }
400 }
401
402 fn worker_result(&mut self, task: TaskResult) {
403 log::debug!(
404 target: "wire",
405 "Received fetch result from worker for stream {}, remote {}: {:?}",
406 task.stream, task.remote, task.result
407 );
408
409 let nid = task.remote;
410 let Some((fd, peer)) = self.peers.lookup_mut(&nid) else {
411 log::warn!(target: "wire", "Peer {nid} not found; ignoring fetch result");
412 return;
413 };
414
415 if let Peer::Connected { link, streams, .. } = peer {
416 if let Some(s) = streams.unregister(&task.stream) {
420 log::debug!(
421 target: "wire", "Stream {} of {} closing with {} byte(s) sent and {} byte(s) received",
422 task.stream, task.remote, s.sent_bytes, s.received_bytes
423 );
424 let frame = Frame::<service::Message>::control(
425 *link,
426 frame::Control::Close {
427 stream: task.stream,
428 },
429 );
430 self.actions
431 .push_back(Action::Send(fd, frame.encode_to_vec()));
432 }
433 } else {
434 log::warn!(target: "wire", "Peer {nid} is not connected; ignoring fetch result");
437 return;
438 };
439
440 match task.result {
442 FetchResult::Initiator { rid, result } => {
443 self.service.fetched(rid, nid, result);
444 }
445 FetchResult::Responder { rid, result } => {
446 if let Some(rid) = rid {
447 if let Some(err) = result.err() {
448 log::info!(target: "wire", "Peer {nid} failed to fetch {rid} from us: {err}");
449 } else {
450 log::info!(target: "wire", "Peer {nid} fetched {rid} from us successfully");
451 }
452 }
453 }
454 }
455 }
456
457 fn flush(&mut self, remote: NodeId, stream: StreamId) {
458 let Some((fd, peer)) = self.peers.lookup_mut(&remote) else {
459 log::warn!(target: "wire", "Peer {remote} is not known; ignoring flush");
460 return;
461 };
462 let Peer::Connected { streams, link, .. } = peer else {
463 log::warn!(target: "wire", "Peer {remote} is not connected; ignoring flush");
464 return;
465 };
466 let Some(s) = streams.get_mut(&stream) else {
467 log::debug!(target: "wire", "Stream {stream} cannot be found; ignoring flush");
468 return;
469 };
470 let metrics = self.metrics.peer(remote);
471
472 for data in s.channels.try_iter() {
473 let frame = match data {
474 ChannelEvent::Data(data) => {
475 metrics.sent_git_bytes += data.len();
476 metrics.sent_bytes += data.len();
477 Frame::<service::Message>::git(stream, data)
478 }
479 ChannelEvent::Close => Frame::control(*link, frame::Control::Close { stream }),
480 ChannelEvent::Eof => Frame::control(*link, frame::Control::Eof { stream }),
481 };
482 self.actions
483 .push_back(reactor::Action::Send(fd, frame.encode_to_vec()));
484 }
485 }
486
487 fn cleanup(&mut self, id: ResourceId, fd: RawFd) {
488 if self.inbound.remove(&fd).is_some() {
489 log::debug!(target: "wire", "Cleaning up inbound peer state with id={id} (fd={fd})");
490 } else if let Some(outbound) = self.outbound.remove(&fd) {
491 log::debug!(target: "wire", "Cleaning up outbound peer state with id={id} (fd={fd})");
492 self.service.disconnected(
493 outbound.nid,
494 Link::Outbound,
495 &DisconnectReason::connection(),
496 );
497 } else {
498 log::debug!(target: "wire", "Tried to cleanup unknown peer with id={id} (fd={fd})");
499 }
500 }
501}
502
503impl<D, S, G> reactor::Handler for Wire<D, S, G>
504where
505 D: service::Store + Send,
506 S: WriteStorage + Send + 'static,
507 G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + Send,
508{
509 type Listener = NetAccept<WireSession<G>>;
510 type Transport = NetTransport<WireSession<G>>;
511 type Command = Control;
512
513 fn tick(&mut self, time: Timestamp) {
514 self.metrics.open_channels = self
515 .peers
516 .iter()
517 .filter_map(|p| {
518 if let Peer::Connected { streams, .. } = p {
519 Some(streams.streams.len())
520 } else {
521 None
522 }
523 })
524 .sum();
525 self.metrics.worker_queue_size = self.worker.len();
526 self.service.tick(
527 LocalTime::from_millis(time.as_millis() as u128),
528 &self.metrics,
529 );
530 }
531
    /// Called when the reactor timer fires; wakes the service.
    fn handle_timer(&mut self) {
        self.service.wake();
    }
535
536 fn handle_listener_event(
537 &mut self,
538 _: ResourceId, event: ListenerEvent<WireSession<G>>,
540 _: Timestamp,
541 ) {
542 match event {
543 ListenerEvent::Accepted(connection) => {
544 let Ok(remote) = connection.remote_addr() else {
545 log::warn!(target: "wire", "Accepted connection doesn't have remote address; dropping..");
546 drop(connection);
547
548 return;
549 };
550 let InetHost::Ip(ip) = remote.host else {
551 log::error!(target: "wire", "Unexpected host type for inbound connection {remote}; dropping..");
552 drop(connection);
553
554 return;
555 };
556 let fd = connection.as_raw_fd();
557 log::debug!(target: "wire", "Inbound connection from {remote} (fd={fd})..");
558
559 if !self.service.accepted(ip) {
562 log::debug!(target: "wire", "Rejecting inbound connection from {ip} (fd={fd})..");
563 drop(connection);
564
565 return;
566 }
567
568 let session = match accept::<G>(
569 remote.clone().into(),
570 connection,
571 self.signer.clone().into_inner(),
572 ) {
573 Ok(s) => s,
574 Err(e) => {
575 log::error!(target: "wire", "Error creating session for {ip}: {e}");
576 return;
577 }
578 };
579 let transport = match NetTransport::with_session(
580 session,
581 netservices::Direction::Inbound,
582 ) {
583 Ok(transport) => transport,
584 Err(err) => {
585 log::error!(target: "wire", "Failed to create transport for accepted connection: {err}");
586 return;
587 }
588 };
589 log::debug!(target: "wire", "Accepted inbound connection from {remote} (fd={fd})..");
590
591 self.inbound.insert(
592 fd,
593 Inbound {
594 id: None,
595 addr: remote.into(),
596 },
597 );
598 self.actions
599 .push_back(reactor::Action::RegisterTransport(transport))
600 }
601 ListenerEvent::Failure(err) => {
602 log::error!(target: "wire", "Error listening for inbound connections: {err}");
603 }
604 }
605 }
606
607 fn handle_registered(&mut self, fd: RawFd, id: ResourceId, typ: ResourceType) {
608 match typ {
609 ResourceType::Listener => {
610 if let Some(local_addr) = self.listening.remove(&fd) {
611 self.service.listening(local_addr);
612 }
613 }
614 ResourceType::Transport => {
615 if let Some(outbound) = self.outbound.get_mut(&fd) {
616 log::debug!(target: "wire", "Outbound peer resource registered for {} with id={id} (fd={fd})", outbound.nid);
617 outbound.id = Some(id);
618 } else if let Some(inbound) = self.inbound.get_mut(&fd) {
619 log::debug!(target: "wire", "Inbound peer resource registered with id={id} (fd={fd})");
620 inbound.id = Some(id);
621 } else {
622 log::warn!(target: "wire", "Unknown peer registered with fd={fd} and id={id}");
623 }
624 }
625 }
626 }
627
628 fn handle_transport_event(
629 &mut self,
630 id: ResourceId,
631 event: SessionEvent<WireSession<G>>,
632 _: Timestamp,
633 ) {
634 match event {
635 SessionEvent::Established(fd, ProtocolArtifact { state, .. }) => {
636 let nid: NodeId = state.remote_static_key.unwrap();
638 if &nid == self.signer.public_key() {
640 log::error!(target: "wire", "Self-connection detected, disconnecting..");
641 self.disconnect(id, DisconnectReason::SelfConnection);
642
643 return;
644 }
645 let (addr, link) = if let Some(peer) = self.inbound.remove(&fd) {
646 self.metrics.peer(nid).inbound_connection_attempts += 1;
647 (peer.addr, Link::Inbound)
648 } else if let Some(peer) = self.outbound.remove(&fd) {
649 assert_eq!(nid, peer.nid);
650 (peer.addr, Link::Outbound)
651 } else {
652 log::error!(target: "wire", "Session for {nid} (id={id}) not found");
653 return;
654 };
655 log::debug!(
656 target: "wire",
657 "Session established with {nid} (id={id}) (fd={fd}) ({})",
658 if link.is_inbound() { "inbound" } else { "outbound" }
659 );
660
661 let mut disconnect = Vec::new();
663
664 {
670 let precedence = *self.signer.public_key() > nid;
674
675 let mut conflicting = Vec::new();
679
680 conflicting.extend(
682 self.peers
683 .active()
684 .filter(|(c_id, d, _)| **d == nid && *c_id != id)
685 .map(|(c_id, _, link)| (c_id, link)),
686 );
687
688 conflicting.extend(self.outbound.iter().filter_map(|(c_fd, other)| {
691 if other.nid == nid && *c_fd != fd {
692 other.id.map(|c_id| (c_id, Link::Outbound))
693 } else {
694 None
695 }
696 }));
697
698 for (c_id, c_link) in conflicting {
699 let close = match (link, c_link) {
704 (Link::Inbound, Link::Outbound) => {
705 if precedence {
706 id
707 } else {
708 c_id
709 }
710 }
711 (Link::Outbound, Link::Inbound) => {
712 if precedence {
713 c_id
714 } else {
715 id
716 }
717 }
718 (Link::Inbound, Link::Inbound) => id.max(c_id),
719 (Link::Outbound, Link::Outbound) => id.max(c_id),
720 };
721
722 log::warn!(
723 target: "wire", "Established session (id={id}) conflicts with existing session for {nid} (id={c_id})"
724 );
725 disconnect.push(close);
726 }
727 }
728 for id in &disconnect {
729 log::warn!(
730 target: "wire", "Closing conflicting session (id={id}) with {nid}.."
731 );
732 if let Some((nid, link)) = self.disconnect(*id, DisconnectReason::Conflict) {
734 self.service
738 .disconnected(nid, link, &DisconnectReason::Conflict);
739 }
740 }
741 if !disconnect.contains(&id) {
742 self.peers
743 .insert(id, Peer::connected(nid, addr.clone(), link));
744 self.service.connected(nid, addr.into(), link);
745 }
746 }
747 SessionEvent::Data(data) => {
748 if let Some(Peer::Connected {
749 nid,
750 inbox,
751 streams,
752 ..
753 }) = self.peers.get_mut(&id)
754 {
755 let metrics = self.metrics.peer(*nid);
756 metrics.received_bytes += data.len();
757
758 if inbox.input(&data).is_err() {
759 log::error!(target: "wire", "Maximum inbox size ({MAX_INBOX_SIZE}) reached for peer {nid}");
760 log::error!(target: "wire", "Unable to process messages fast enough for peer {nid}; disconnecting..");
761 self.disconnect(id, DisconnectReason::Session(session::Error::Misbehavior));
762
763 return;
764 }
765
766 loop {
767 match inbox.deserialize_next() {
768 Ok(Some(Frame {
769 data: FrameData::Control(frame::Control::Open { stream }),
770 ..
771 })) => {
772 log::debug!(target: "wire", "Received `open` command for stream {stream} from {nid}");
773 metrics.streams_opened += 1;
774 metrics.received_fetch_requests += 1;
775 let reader_limit = self.service.config().limits.fetch_pack_receive;
776 let Some(channels) = streams.register(
777 stream,
778 ChannelsConfig::new(FETCH_TIMEOUT)
779 .with_reader_limit(reader_limit),
780 ) else {
781 log::warn!(target: "wire", "Peer attempted to open already-open stream stream {stream}");
782 continue;
783 };
784
785 let task = Task {
786 fetch: FetchRequest::Responder {
787 remote: *nid,
788 emitter: self.service.emitter(),
789 },
790 stream,
791 channels,
792 };
793 if let Err(e) = self.worker.try_send(task) {
794 log::error!(
795 target: "wire",
796 "Worker pool failed to accept incoming fetch request: {e}"
797 );
798 }
799 }
800 Ok(Some(Frame {
801 data: FrameData::Control(frame::Control::Eof { stream }),
802 ..
803 })) => {
804 if let Some(s) = streams.get(&stream) {
805 log::debug!(target: "wire", "Received `end-of-file` on stream {stream} from {nid}");
806
807 if s.channels.send(ChannelEvent::Eof).is_err() {
808 log::error!(target: "wire", "Worker is disconnected; cannot send `EOF`");
809 }
810 } else {
811 log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
812 }
813 }
814 Ok(Some(Frame {
815 data: FrameData::Control(frame::Control::Close { stream }),
816 ..
817 })) => {
818 log::debug!(target: "wire", "Received `close` command for stream {stream} from {nid}");
819
820 if let Some(s) = streams.unregister(&stream) {
821 log::debug!(
822 target: "wire",
823 "Stream {stream} of {nid} closed with {} byte(s) sent and {} byte(s) received",
824 s.sent_bytes, s.received_bytes
825 );
826 s.channels.close().ok();
827 }
828 }
829 Ok(Some(Frame {
830 data: FrameData::Gossip(msg),
831 ..
832 })) => {
833 metrics.received_gossip_messages += 1;
834 self.service.received_message(*nid, msg);
835 }
836 Ok(Some(Frame {
837 stream,
838 data: FrameData::Git(data),
839 ..
840 })) => {
841 if let Some(s) = streams.get_mut(&stream) {
842 metrics.received_git_bytes += data.len();
843
844 if s.channels.send(ChannelEvent::Data(data)).is_err() {
845 log::error!(target: "wire", "Worker is disconnected; cannot send data");
846 }
847 } else {
848 log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
849 }
850 }
851 Ok(None) => {
852 break;
854 }
855 Err(e) => {
856 log::error!(target: "wire", "Invalid gossip message from {nid}: {e}");
857
858 if !inbox.is_empty() {
859 log::debug!(target: "wire", "Dropping read buffer for {nid} with {} bytes", inbox.len());
860 }
861 self.disconnect(
862 id,
863 DisconnectReason::Session(session::Error::Misbehavior),
864 );
865 break;
866 }
867 }
868 }
869 } else {
870 log::warn!(target: "wire", "Dropping message from unconnected peer (id={id})");
871 }
872 }
873 SessionEvent::Terminated(err) => {
874 self.disconnect(id, DisconnectReason::Connection(Arc::new(err)));
875 }
876 }
877 }
878
    /// Dispatch a control command: user commands go to the service, worker
    /// results to `worker_result`, and flush requests to `flush`.
    fn handle_command(&mut self, cmd: Self::Command) {
        match cmd {
            Control::User(cmd) => self.service.command(cmd),
            Control::Worker(result) => self.worker_result(result),
            Control::Flush { remote, stream } => self.flush(remote, stream),
        }
    }
886
887 fn handle_error(
888 &mut self,
889 err: reactor::Error<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>,
890 ) {
891 match err {
892 reactor::Error::Poll(err) => {
893 log::error!(target: "wire", "Can't poll connections: {err}");
895 }
896 reactor::Error::ListenerDisconnect(id, _) => {
897 log::error!(target: "wire", "Listener {id} disconnected");
899 }
900 reactor::Error::TransportDisconnect(id, transport) => {
901 let fd = transport.as_raw_fd();
902 log::error!(target: "wire", "Peer id={id} (fd={fd}) disconnected");
903
904 drop(transport);
906
907 match self.peers.remove(&id) {
911 Some(mut peer) => {
912 if let Peer::Connected { streams, .. } = &mut peer {
913 streams.shutdown();
914 }
915
916 if let Some(id) = peer.id() {
917 self.service.disconnected(
918 *id,
919 peer.link(),
920 &DisconnectReason::connection(),
921 );
922 } else {
923 log::debug!(target: "wire", "Inbound disconnection before handshake; ignoring..")
924 }
925 }
926 None => self.cleanup(id, fd),
927 }
928 }
929 }
930 }
931
    /// Listener handover is not supported: the event is logged as an error
    /// and the listener is dropped when it goes out of scope.
    fn handover_listener(&mut self, id: ResourceId, _listener: Self::Listener) {
        log::error!(target: "wire", "Listener handover is not supported (id={id})");
    }
935
936 fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport) {
937 let fd = transport.as_raw_fd();
938
939 match self.peers.entry(id) {
940 Entry::Occupied(e) => {
941 match e.get() {
942 Peer::Disconnecting {
943 nid, reason, link, ..
944 } => {
945 log::debug!(target: "wire", "Transport handover for disconnecting peer with id={id} (fd={fd})");
946
947 drop(transport);
949
950 if let Some(nid) = nid {
952 self.service.disconnected(*nid, *link, reason);
959 }
960 e.remove();
961 }
962 Peer::Connected { nid, .. } => {
963 panic!("Wire::handover_transport: Unexpected handover of connected peer {nid} with id={id} (fd={fd})");
964 }
965 }
966 }
967 Entry::Vacant(_) => self.cleanup(id, fd),
968 }
969 }
970}
971
972impl<D, S, G> Iterator for Wire<D, S, G>
973where
974 D: service::Store,
975 S: WriteStorage + 'static,
976 G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone,
977{
978 type Item = Action<G>;
979
980 fn next(&mut self) -> Option<Self::Item> {
981 while let Some(ev) = self.service.next() {
982 match ev {
983 Io::Write(node_id, msgs) => {
984 let (fd, link) = match self.peers.lookup(&node_id) {
985 Some((fd, Peer::Connected { link, .. })) => (fd, *link),
986 Some((_, peer)) => {
987 log::debug!(target: "wire", "Dropping {} message(s) to {node_id} ({peer:?})", msgs.len());
990 continue;
991 }
992 None => {
993 log::error!(target: "wire", "Dropping {} message(s) to {node_id}: unknown peer", msgs.len());
994 continue;
995 }
996 };
997 log::trace!(
998 target: "wire", "Writing {} message(s) to {}", msgs.len(), node_id
999 );
1000 let mut data = Vec::new();
1001 let metrics = self.metrics.peer(node_id);
1002 metrics.sent_gossip_messages += msgs.len();
1003
1004 for msg in msgs {
1005 Frame::gossip(link, msg).encode(&mut data);
1006 }
1007 metrics.sent_bytes += data.len();
1008
1009 self.actions.push_back(reactor::Action::Send(fd, data));
1010 }
1011 Io::Connect(node_id, addr) => {
1012 if self.peers.connected().any(|(_, id)| id == &node_id) {
1013 log::error!(
1014 target: "wire",
1015 "Attempt to connect to already connected peer {node_id}"
1016 );
1017 continue;
1020 }
1021 self.service.attempted(node_id, addr.clone());
1022 self.metrics.peer(node_id).outbound_connection_attempts += 1;
1023
1024 match dial::<G>(
1025 addr.to_inner(),
1026 node_id,
1027 self.signer.clone().into_inner(),
1028 self.service.config(),
1029 )
1030 .and_then(|session| {
1031 NetTransport::<WireSession<G>>::with_session(
1032 session,
1033 netservices::Direction::Outbound,
1034 )
1035 }) {
1036 Ok(transport) => {
1037 self.outbound.insert(
1038 transport.as_raw_fd(),
1039 Outbound {
1040 id: None,
1041 nid: node_id,
1042 addr: addr.to_inner(),
1043 },
1044 );
1045 log::debug!(
1046 target: "wire",
1047 "Registering outbound transport for {node_id} (fd={})..",
1048 transport.as_raw_fd()
1049 );
1050 self.actions
1051 .push_back(reactor::Action::RegisterTransport(transport));
1052 }
1053 Err(err) => {
1054 log::error!(target: "wire", "Error establishing connection to {addr}: {err}");
1055
1056 self.service.disconnected(
1057 node_id,
1058 Link::Outbound,
1059 &DisconnectReason::Dial(Arc::new(err)),
1060 );
1061 }
1062 }
1063 }
1064 Io::Disconnect(nid, reason) => {
1065 if let Some((id, Peer::Connected { .. })) = self.peers.lookup(&nid) {
1066 if let Some((nid, _)) = self.disconnect(id, reason) {
1067 self.metrics.peer(nid).disconnects += 1;
1068 }
1069 } else {
1070 log::warn!(target: "wire", "Peer {nid} is not connected: ignoring disconnect");
1071 }
1072 }
1073 Io::Wakeup(d) => {
1074 self.actions.push_back(reactor::Action::SetTimer(d.into()));
1075 }
1076 Io::Fetch {
1077 rid,
1078 remote,
1079 timeout,
1080 reader_limit,
1081 refs_at,
1082 } => {
1083 log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");
1084
1085 let Some((fd, Peer::Connected { link, streams, .. })) =
1086 self.peers.lookup_mut(&remote)
1087 else {
1088 log::error!(target: "wire", "Peer {remote} is not connected: dropping fetch");
1093 continue;
1094 };
1095 let (stream, channels) =
1096 streams.open(ChannelsConfig::new(timeout).with_reader_limit(reader_limit));
1097
1098 log::debug!(target: "wire", "Opened new stream with id {stream} for {rid} and remote {remote}");
1099
1100 let link = *link;
1101 let task = Task {
1102 fetch: FetchRequest::Initiator {
1103 rid,
1104 remote,
1105 refs_at,
1106 },
1107 stream,
1108 channels,
1109 };
1110
1111 if !self.worker.is_empty() {
1112 log::warn!(
1113 target: "wire",
1114 "Worker pool is busy: {} tasks pending, fetch requests may be delayed", self.worker.len()
1115 );
1116 }
1117 if let Err(e) = self.worker.try_send(task) {
1118 log::error!(
1119 target: "wire",
1120 "Worker pool failed to accept outgoing fetch request: {e}"
1121 );
1122 }
1123 let metrics = self.metrics.peer(remote);
1124 metrics.streams_opened += 1;
1125 metrics.sent_fetch_requests += 1;
1126
1127 self.actions.push_back(Action::Send(
1128 fd,
1129 Frame::<service::Message>::control(link, frame::Control::Open { stream })
1130 .encode_to_vec(),
1131 ));
1132 }
1133 }
1134 }
1135 self.actions.pop_front()
1136 }
1137}
1138
1139pub fn dial<G: Ecdh<Pk = NodeId>>(
1141 remote_addr: NetAddr<HostName>,
1142 remote_id: <G as EcSk>::Pk,
1143 signer: G,
1144 config: &radicle::node::Config,
1145) -> io::Result<WireSession<G>> {
1146 let inet_addr: NetAddr<InetHost> = match (&remote_addr.host, config.proxy) {
1149 (HostName::Ip(_), Some(proxy)) => proxy.into(),
1151 (HostName::Ip(ip), None) => NetAddr::new(InetHost::Ip(*ip), remote_addr.port),
1152 (HostName::Dns(_), Some(proxy)) => proxy.into(),
1153 (HostName::Dns(dns), None) => NetAddr::new(InetHost::Dns(dns.clone()), remote_addr.port),
1154 (HostName::Tor(onion), proxy) => match config.onion {
1156 Some(AddressConfig::Proxy { address }) => address.into(),
1159 Some(AddressConfig::Forward) => {
1162 if let Some(proxy) = proxy {
1163 proxy.into()
1164 } else {
1165 NetAddr::new(InetHost::Dns(onion.to_string()), remote_addr.port)
1166 }
1167 }
1168 None => {
1170 return Err(io::Error::new(
1171 io::ErrorKind::Unsupported,
1172 "no configuration found for .onion addresses",
1173 ));
1174 }
1175 },
1176 _ => {
1177 return Err(io::Error::new(
1178 io::ErrorKind::Unsupported,
1179 "unsupported remote address type",
1180 ));
1181 }
1182 };
1183 let connection = net::TcpStream::connect_nonblocking(inet_addr, DEFAULT_DIAL_TIMEOUT)?;
1186 let force_proxy = config.proxy.is_some();
1188
1189 session::<G>(
1190 remote_addr,
1191 Some(remote_id),
1192 connection,
1193 force_proxy,
1194 signer,
1195 )
1196}
1197
1198pub fn accept<G: Ecdh<Pk = NodeId>>(
1200 remote_addr: NetAddr<HostName>,
1201 connection: net::TcpStream,
1202 signer: G,
1203) -> io::Result<WireSession<G>> {
1204 session::<G>(remote_addr, None, connection, false, signer)
1205}
1206
1207fn session<G: Ecdh<Pk = NodeId>>(
1209 remote_addr: NetAddr<HostName>,
1210 remote_id: Option<NodeId>,
1211 connection: net::TcpStream,
1212 force_proxy: bool,
1213 signer: G,
1214) -> io::Result<WireSession<G>> {
1215 if let Err(e) = connection.set_nodelay(true) {
1217 log::warn!(target: "wire", "Unable to set TCP_NODELAY on fd {}: {e}", connection.as_raw_fd());
1218 }
1219 connection.set_read_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
1220 connection.set_write_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
1221
1222 let sock = socket2::Socket::from(connection);
1223 let ka = socket2::TcpKeepalive::new()
1224 .with_time(time::Duration::from_secs(30))
1225 .with_interval(time::Duration::from_secs(10))
1226 .with_retries(3);
1227 if let Err(e) = sock.set_tcp_keepalive(&ka) {
1228 log::warn!(target: "wire", "Unable to set TCP_KEEPALIVE on fd {}: {e}", sock.as_raw_fd());
1229 }
1230
1231 let socks5 = socks5::Socks5::with(remote_addr, force_proxy);
1232 let proxy = Socks5Session::with(sock.into(), socks5);
1233 let pair = G::generate_keypair();
1234 let keyset = Keyset {
1235 e: pair.0,
1236 s: Some(signer),
1237 re: None,
1238 rs: remote_id,
1239 };
1240 let noise = NoiseState::initialize::<{ Sha256::OUTPUT_LEN }>(
1241 NOISE_XK,
1242 remote_id.is_some(),
1243 &[],
1244 keyset,
1245 );
1246 Ok(WireSession::with(proxy, noise))
1247}
1248
#[cfg(test)]
mod test {
    use super::*;
    use crate::service::{Message, ZeroBytes};
    use crate::wire;
    use crate::wire::varint;

    /// A gossip frame whose payload carries extra bytes after a `Pong`
    /// message must still deserialize to the plain `Pong` frame, leaving the
    /// deserializer empty.
    #[test]
    fn test_pong_message_with_extension() {
        use radicle_protocol::deserializer;

        let mut stream = Vec::new();
        let pong = Message::Pong {
            zeroes: ZeroBytes::new(42),
        };
        // Frame header, written by hand: protocol version, then stream id.
        frame::PROTOCOL_VERSION_STRING.encode(&mut stream);
        frame::StreamId::gossip(Link::Outbound).encode(&mut stream);

        // Payload: the message followed by data the decoder does not know.
        let mut gossip = pong.encode_to_vec();
        String::from("extra").encode(&mut gossip);
        48u8.encode(&mut gossip);

        // The payload, extension included, is written with the varint payload encoding.
        varint::payload::encode(&gossip, &mut stream);

        let mut de = deserializer::Deserializer::<1024, Frame>::new(1024);
        de.input(&stream).unwrap();

        // The extension bytes are ignored and consumed together with the frame.
        assert_eq!(
            de.deserialize_next().unwrap().unwrap(),
            Frame::gossip(Link::Outbound, pong)
        );
        assert!(de.deserialize_next().unwrap().is_none());
        assert!(de.is_empty());
    }

    /// A stream containing an extended announcement followed by a regular
    /// message must be readable both by a decoder that understands the
    /// extension and by one that does not.
    #[test]
    fn test_inventory_ann_with_extension() {
        use radicle_protocol::deserializer;

        /// A message followed by an extra string field.
        #[derive(Debug)]
        struct MessageWithExt {
            msg: Message,
            ext: String,
        }

        impl wire::Encode for MessageWithExt {
            fn encode(&self, writer: &mut impl bytes::BufMut) {
                self.msg.encode(writer);
                self.ext.encode(writer);
            }
        }

        impl wire::Decode for MessageWithExt {
            fn decode(reader: &mut impl bytes::Buf) -> Result<Self, wire::Error> {
                let msg = Message::decode(reader)?;
                // The extension is optional: fall back to an empty string.
                let ext = String::decode(reader).unwrap_or_default();

                Ok(MessageWithExt { msg, ext })
            }
        }

        let rid = radicle::test::arbitrary::gen(1);
        let pk = radicle::test::arbitrary::gen(1);
        let sig: [u8; 64] = radicle::test::arbitrary::gen(1);

        let mut stream = Vec::new();
        let ann = Message::announcement(
            pk,
            service::gossip::inventory(radicle::node::Timestamp::MAX, [rid]),
            radicle::crypto::Signature::from(sig),
        );
        let pong = Message::Pong {
            zeroes: ZeroBytes::new(42),
        };
        // First frame: the announcement with an extension.
        frame::Frame::gossip(
            Link::Outbound,
            MessageWithExt {
                msg: ann.clone(),
                ext: String::from("extra"),
            },
        )
        .encode(&mut stream);
        // Second frame: a plain message without extension.
        frame::Frame::gossip(Link::Outbound, pong.clone()).encode(&mut stream);

        // Decoder that understands the extension.
        {
            let mut de = deserializer::Deserializer::<1024, Frame<MessageWithExt>>::new(1024);
            de.input(&stream).unwrap();

            radicle::assert_matches!(
                de.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(MessageWithExt {
                    msg,
                    ext,
                }) if msg == ann && ext == *"extra"
            );
            radicle::assert_matches!(
                de.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(MessageWithExt {
                    msg,
                    ext,
                }) if msg == pong && ext.is_empty()
            );
            assert!(de.deserialize_next().unwrap().is_none());
            assert!(de.is_empty());
        }

        // Decoder that does not know about the extension.
        {
            let mut de = deserializer::Deserializer::<1024, Frame<Message>>::new(1024);
            de.input(&stream).unwrap();

            radicle::assert_matches!(
                de.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(msg)
                    if msg == ann
            );
            radicle::assert_matches!(
                de.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(msg)
                    if msg == pong
            );
            assert!(de.deserialize_next().unwrap().is_none());
            assert!(de.is_empty());
        }
    }
}