1use std::collections::hash_map::Entry;
6use std::collections::VecDeque;
7use std::os::unix::io::{AsRawFd, RawFd};
8use std::sync::Arc;
9use std::{io, net, time};
10
11use amplify::Wrapper as _;
12use crossbeam_channel as chan;
13use cyphernet::addr::{HostName, InetHost, NetAddr};
14use cyphernet::encrypt::noise::{HandshakePattern, Keyset, NoiseState};
15use cyphernet::proxy::socks5;
16use cyphernet::{Digest, EcSk, Ecdh, Sha256};
17use localtime::LocalTime;
18use netservices::resource::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
19use netservices::session::{NoiseSession, ProtocolArtifact, Socks5Session};
20use netservices::{NetConnection, NetReader, NetWriter};
21use radicle::node::device::Device;
22use reactor::{ResourceId, ResourceType, Timestamp};
23
24use radicle::collections::RandomMap;
25use radicle::crypto;
26use radicle::node::config::AddressConfig;
27use radicle::node::NodeId;
28use radicle::storage::WriteStorage;
29
30use crate::prelude::Deserializer;
31use crate::service;
32use crate::service::io::Io;
33use crate::service::FETCH_TIMEOUT;
34use crate::service::{session, DisconnectReason, Metrics, Service};
35use crate::wire::frame;
36use crate::wire::frame::{Frame, FrameData, StreamId};
37use crate::wire::Encode;
38use crate::worker;
39use crate::worker::{ChannelEvent, ChannelsConfig, FetchRequest, FetchResult, Task, TaskResult};
40use crate::Link;
41
/// Noise handshake pattern used when initializing every wire session
/// (see `session`).
///
/// The initiator side uses the `Xmitted` pattern and the responder side uses
/// the `Known` pattern.
pub const NOISE_XK: HandshakePattern = HandshakePattern {
    initiator: cyphernet::encrypt::noise::InitiatorPattern::Xmitted,
    responder: cyphernet::encrypt::noise::OneWayPattern::Known,
};
47
/// Read and write timeout applied to the TCP stream of every session
/// (see `session`).
pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs(6);
50
/// Timeout passed to the non-blocking TCP connect when dialing a remote
/// (see `dial`).
pub const DEFAULT_DIAL_TIMEOUT: time::Duration = time::Duration::from_secs(6);
53
/// Maximum size of a peer's inbox, in bytes (2 MiB).
///
/// Used as the capacity parameter of each connected peer's frame
/// deserializer; a peer whose inbox overflows is disconnected.
pub const MAX_INBOX_SIZE: usize = 2 * 1024 * 1024;
56
/// Commands delivered to the wire handler through the reactor
/// (see `handle_command`).
// The variants differ considerably in size; the lint is silenced rather than
// boxing the larger payloads.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Control {
    /// A service command, forwarded as-is to the underlying service.
    User(service::Command),
    /// The result of a task run by a worker.
    Worker(TaskResult),
    /// Request to forward the pending worker data of `stream` to `remote`.
    Flush { remote: NodeId, stream: StreamId },
}
68
/// Noise session (keyed by `G`, hashing with SHA-256) layered over a SOCKS5
/// session on top of a TCP stream.
pub type WireSession<G> = NoiseSession<G, Sha256, Socks5Session<net::TcpStream>>;
/// Reader type over the SOCKS5-wrapped TCP stream.
pub type WireReader = NetReader<Socks5Session<net::TcpStream>>;
/// Writer type over the SOCKS5-wrapped TCP stream, parameterized by the Noise state.
pub type WireWriter<G> = NetWriter<NoiseState<G, Sha256>, Socks5Session<net::TcpStream>>;

/// Reactor action specialized to the wire listener and transport types.
type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>;
78
/// A single multiplexed stream on a peer connection, backed by worker channels.
struct Stream {
    /// Channels used to exchange events with the worker serving this stream.
    channels: worker::Channels,
    /// Byte counter for data sent on this stream; reported in the log when the
    /// stream is closed.
    sent_bytes: usize,
    /// Byte counter for data received on this stream; reported in the log when
    /// the stream is closed.
    received_bytes: usize,
}
88
89impl Stream {
90 fn new(channels: worker::Channels) -> Self {
91 Self {
92 channels,
93 sent_bytes: 0,
94 received_bytes: 0,
95 }
96 }
97}
98
/// The set of open streams of a single peer connection.
struct Streams {
    /// Open streams, keyed by stream identifier.
    streams: RandomMap<StreamId, Stream>,
    /// Direction of the connection these streams belong to; used to derive
    /// the identifiers of locally opened streams.
    link: Link,
    /// Sequence number used to compute the identifier of the next locally
    /// opened stream.
    seq: u64,
}
110
111impl Streams {
112 fn new(link: Link) -> Self {
114 Self {
115 streams: RandomMap::default(),
116 link,
117 seq: 0,
118 }
119 }
120
121 fn get(&self, stream: &StreamId) -> Option<&Stream> {
123 self.streams.get(stream)
124 }
125
126 fn get_mut(&mut self, stream: &StreamId) -> Option<&mut Stream> {
128 self.streams.get_mut(stream)
129 }
130
131 fn open(&mut self, config: ChannelsConfig) -> (StreamId, worker::Channels) {
133 self.seq += 1;
134
135 let id = StreamId::git(self.link)
136 .nth(self.seq)
137 .expect("Streams::open: too many streams");
138 let channels = self
139 .register(id, config)
140 .expect("Streams::open: stream was already open");
141
142 (id, channels)
143 }
144
145 fn register(&mut self, stream: StreamId, config: ChannelsConfig) -> Option<worker::Channels> {
147 let (wire, worker) = worker::Channels::pair(config)
148 .expect("Streams::register: fatal: unable to create channels");
149
150 match self.streams.entry(stream) {
151 Entry::Vacant(e) => {
152 e.insert(Stream::new(worker));
153 Some(wire)
154 }
155 Entry::Occupied(_) => None,
156 }
157 }
158
159 fn unregister(&mut self, stream: &StreamId) -> Option<Stream> {
161 self.streams.remove(stream)
162 }
163
164 fn shutdown(&mut self) {
166 for (sid, stream) in self.streams.drain() {
167 log::debug!(target: "wire", "Closing worker stream {sid}");
168 stream.channels.close().ok();
169 }
170 }
171}
172
/// An outbound connection attempt for which no session is established yet.
#[derive(Debug)]
struct Outbound {
    /// Reactor resource id; `None` until the transport has been registered.
    id: Option<ResourceId>,
    /// Address that was dialed.
    addr: NetAddr<HostName>,
    /// Node id we are connecting to.
    nid: NodeId,
}
183
/// An accepted inbound connection for which no session is established yet.
#[derive(Debug)]
struct Inbound {
    /// Reactor resource id; `None` until the transport has been registered.
    id: Option<ResourceId>,
    /// Remote address of the connection.
    addr: NetAddr<HostName>,
}
192
/// State of a peer whose session has been established.
enum Peer {
    /// The session is established and the peer is known to the service.
    Connected {
        // Stored at construction; not read anywhere in this file.
        #[allow(dead_code)]
        addr: NetAddr<HostName>,
        /// Direction of the connection.
        link: Link,
        /// Remote node id.
        nid: NodeId,
        /// Buffer of received bytes from which frames are decoded.
        inbox: Deserializer<MAX_INBOX_SIZE, Frame>,
        /// Streams currently open with this peer.
        streams: Streams,
    },
    /// The peer has been scheduled for disconnection; the entry is removed
    /// once the reactor hands the transport over.
    Disconnecting {
        /// Direction of the connection.
        link: Link,
        /// Remote node id, if known.
        nid: Option<NodeId>,
        /// Why the peer is being disconnected.
        reason: DisconnectReason,
    },
}
213
214impl std::fmt::Debug for Peer {
215 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216 match self {
217 Self::Connected { link, nid, .. } => write!(f, "Connected({link:?}, {nid})"),
218 Self::Disconnecting { .. } => write!(f, "Disconnecting"),
219 }
220 }
221}
222
223impl Peer {
224 fn id(&self) -> Option<&NodeId> {
226 match self {
227 Peer::Connected { nid, .. } | Peer::Disconnecting { nid: Some(nid), .. } => Some(nid),
228 Peer::Disconnecting { nid: None, .. } => None,
229 }
230 }
231
232 fn link(&self) -> Link {
233 match self {
234 Peer::Connected { link, .. } => *link,
235 Peer::Disconnecting { link, .. } => *link,
236 }
237 }
238
239 fn connected(nid: NodeId, addr: NetAddr<HostName>, link: Link) -> Self {
241 Self::Connected {
242 link,
243 addr,
244 nid,
245 inbox: Deserializer::default(),
246 streams: Streams::new(link),
247 }
248 }
249}
250
/// Peers with an established session, keyed by reactor resource id.
struct Peers(RandomMap<ResourceId, Peer>);
253
254impl Peers {
255 fn get_mut(&mut self, id: &ResourceId) -> Option<&mut Peer> {
256 self.0.get_mut(id)
257 }
258
259 fn entry(&mut self, id: ResourceId) -> Entry<ResourceId, Peer> {
260 self.0.entry(id)
261 }
262
263 fn insert(&mut self, id: ResourceId, peer: Peer) {
264 if self.0.insert(id, peer).is_some() {
265 log::warn!(target: "wire", "Replacing existing peer id={id}");
266 }
267 }
268
269 fn remove(&mut self, id: &ResourceId) -> Option<Peer> {
270 self.0.remove(id)
271 }
272
273 fn lookup(&self, node_id: &NodeId) -> Option<(ResourceId, &Peer)> {
274 self.0
275 .iter()
276 .find(|(_, peer)| peer.id() == Some(node_id))
277 .map(|(fd, peer)| (*fd, peer))
278 }
279
280 fn lookup_mut(&mut self, node_id: &NodeId) -> Option<(ResourceId, &mut Peer)> {
281 self.0
282 .iter_mut()
283 .find(|(_, peer)| peer.id() == Some(node_id))
284 .map(|(fd, peer)| (*fd, peer))
285 }
286
287 fn active(&self) -> impl Iterator<Item = (ResourceId, &NodeId, Link)> {
288 self.0.iter().filter_map(|(id, peer)| match peer {
289 Peer::Connected { nid, link, .. } => Some((*id, nid, *link)),
290 Peer::Disconnecting { .. } => None,
291 })
292 }
293
294 fn connected(&self) -> impl Iterator<Item = (ResourceId, &NodeId)> {
295 self.0.iter().filter_map(|(id, peer)| {
296 if let Peer::Connected { nid, .. } = peer {
297 Some((*id, nid))
298 } else {
299 None
300 }
301 })
302 }
303
304 fn iter(&self) -> impl Iterator<Item = &Peer> {
305 self.0.values()
306 }
307}
308
/// Wire protocol handler: bridges the reactor (listeners and transports) with
/// the node service and the worker pool.
pub struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + Ecdh> {
    /// The service that is driven by network events.
    service: Service<D, S, G>,
    /// Sender used to submit tasks to the worker pool.
    worker: chan::Sender<Task>,
    /// Local signing device; supplies the key used for sessions.
    signer: Device<G>,
    /// Metrics collected by this handler and passed to the service on tick.
    metrics: service::Metrics,
    /// Queue of actions to be returned to the reactor.
    actions: VecDeque<Action<G>>,
    /// Outbound connection attempts without an established session, by file descriptor.
    outbound: RandomMap<RawFd, Outbound>,
    /// Inbound connections without an established session, by file descriptor.
    inbound: RandomMap<RawFd, Inbound>,
    /// Listeners that have not been registered with the reactor yet, by file descriptor.
    listening: RandomMap<RawFd, net::SocketAddr>,
    /// Peers with an established session.
    peers: Peers,
}
330
331impl<D, S, G> Wire<D, S, G>
332where
333 D: service::Store,
334 S: WriteStorage + 'static,
335 G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId>,
336{
337 pub fn new(service: Service<D, S, G>, worker: chan::Sender<Task>, signer: Device<G>) -> Self {
338 assert!(service.started().is_some(), "Service must be initialized");
339
340 Self {
341 service,
342 worker,
343 signer,
344 metrics: Metrics::default(),
345 actions: VecDeque::new(),
346 inbound: RandomMap::default(),
347 outbound: RandomMap::default(),
348 listening: RandomMap::default(),
349 peers: Peers(RandomMap::default()),
350 }
351 }
352
353 pub fn listen(&mut self, socket: NetAccept<WireSession<G>>) {
354 self.listening
355 .insert(socket.as_raw_fd(), socket.local_addr());
356 self.actions.push_back(Action::RegisterListener(socket));
357 }
358
359 fn disconnect(&mut self, id: ResourceId, reason: DisconnectReason) -> Option<(NodeId, Link)> {
360 match self.peers.entry(id) {
361 Entry::Vacant(_) => {
362 log::debug!(target: "wire", "Disconnecting pending peer with id={id}: {reason}");
364 self.actions.push_back(Action::UnregisterTransport(id));
365
366 self.outbound
369 .values()
370 .find(|o| o.id == Some(id))
371 .map(|o| (o.nid, Link::Outbound))
372 }
373 Entry::Occupied(mut e) => match e.get_mut() {
374 Peer::Disconnecting { nid, link, .. } => {
375 log::error!(target: "wire", "Peer with id={id} is already disconnecting");
376
377 nid.map(|n| (n, *link))
378 }
379 Peer::Connected {
380 nid, streams, link, ..
381 } => {
382 log::debug!(target: "wire", "Disconnecting peer with id={id}: {reason}");
383 let nid = *nid;
384 let link = *link;
385
386 streams.shutdown();
387 e.insert(Peer::Disconnecting {
388 nid: Some(nid),
389 link,
390 reason,
391 });
392 self.actions.push_back(Action::UnregisterTransport(id));
393
394 Some((nid, link))
395 }
396 },
397 }
398 }
399
400 fn worker_result(&mut self, task: TaskResult) {
401 log::debug!(
402 target: "wire",
403 "Received fetch result from worker for stream {}, remote {}: {:?}",
404 task.stream, task.remote, task.result
405 );
406
407 let nid = task.remote;
408 let Some((fd, peer)) = self.peers.lookup_mut(&nid) else {
409 log::warn!(target: "wire", "Peer {nid} not found; ignoring fetch result");
410 return;
411 };
412
413 if let Peer::Connected { link, streams, .. } = peer {
414 if let Some(s) = streams.unregister(&task.stream) {
418 log::debug!(
419 target: "wire", "Stream {} of {} closing with {} byte(s) sent and {} byte(s) received",
420 task.stream, task.remote, s.sent_bytes, s.received_bytes
421 );
422 let frame = Frame::<service::Message>::control(
423 *link,
424 frame::Control::Close {
425 stream: task.stream,
426 },
427 );
428 self.actions.push_back(Action::Send(fd, frame.to_bytes()));
429 }
430 } else {
431 log::warn!(target: "wire", "Peer {nid} is not connected; ignoring fetch result");
434 return;
435 };
436
437 match task.result {
439 FetchResult::Initiator { rid, result } => {
440 self.service.fetched(rid, nid, result);
441 }
442 FetchResult::Responder { rid, result } => {
443 if let Some(rid) = rid {
444 if let Some(err) = result.err() {
445 log::info!(target: "wire", "Peer {nid} failed to fetch {rid} from us: {err}");
446 } else {
447 log::info!(target: "wire", "Peer {nid} fetched {rid} from us successfully");
448 }
449 }
450 }
451 }
452 }
453
454 fn flush(&mut self, remote: NodeId, stream: StreamId) {
455 let Some((fd, peer)) = self.peers.lookup_mut(&remote) else {
456 log::warn!(target: "wire", "Peer {remote} is not known; ignoring flush");
457 return;
458 };
459 let Peer::Connected { streams, link, .. } = peer else {
460 log::warn!(target: "wire", "Peer {remote} is not connected; ignoring flush");
461 return;
462 };
463 let Some(s) = streams.get_mut(&stream) else {
464 log::debug!(target: "wire", "Stream {stream} cannot be found; ignoring flush");
465 return;
466 };
467 let metrics = self.metrics.peer(remote);
468
469 for data in s.channels.try_iter() {
470 let frame = match data {
471 ChannelEvent::Data(data) => {
472 metrics.sent_git_bytes += data.len();
473 metrics.sent_bytes += data.len();
474 Frame::<service::Message>::git(stream, data)
475 }
476 ChannelEvent::Close => Frame::control(*link, frame::Control::Close { stream }),
477 ChannelEvent::Eof => Frame::control(*link, frame::Control::Eof { stream }),
478 };
479 self.actions
480 .push_back(reactor::Action::Send(fd, frame.to_bytes()));
481 }
482 }
483
484 fn cleanup(&mut self, id: ResourceId, fd: RawFd) {
485 if self.inbound.remove(&fd).is_some() {
486 log::debug!(target: "wire", "Cleaning up inbound peer state with id={id} (fd={fd})");
487 } else if let Some(outbound) = self.outbound.remove(&fd) {
488 log::debug!(target: "wire", "Cleaning up outbound peer state with id={id} (fd={fd})");
489 self.service.disconnected(
490 outbound.nid,
491 Link::Outbound,
492 &DisconnectReason::connection(),
493 );
494 } else {
495 log::debug!(target: "wire", "Tried to cleanup unknown peer with id={id} (fd={fd})");
496 }
497 }
498}
499
500impl<D, S, G> reactor::Handler for Wire<D, S, G>
501where
502 D: service::Store + Send,
503 S: WriteStorage + Send + 'static,
504 G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + Send,
505{
506 type Listener = NetAccept<WireSession<G>>;
507 type Transport = NetTransport<WireSession<G>>;
508 type Command = Control;
509
510 fn tick(&mut self, time: Timestamp) {
511 self.metrics.open_channels = self
512 .peers
513 .iter()
514 .filter_map(|p| {
515 if let Peer::Connected { streams, .. } = p {
516 Some(streams.streams.len())
517 } else {
518 None
519 }
520 })
521 .sum();
522 self.metrics.worker_queue_size = self.worker.len();
523 self.service.tick(
524 LocalTime::from_millis(time.as_millis() as u128),
525 &self.metrics,
526 );
527 }
528
    /// Called when the reactor timer fires; wakes the service.
    fn handle_timer(&mut self) {
        self.service.wake();
    }
532
533 fn handle_listener_event(
534 &mut self,
535 _: ResourceId, event: ListenerEvent<WireSession<G>>,
537 _: Timestamp,
538 ) {
539 match event {
540 ListenerEvent::Accepted(connection) => {
541 let Ok(remote) = connection.remote_addr() else {
542 log::warn!(target: "wire", "Accepted connection doesn't have remote address; dropping..");
543 drop(connection);
544
545 return;
546 };
547 let InetHost::Ip(ip) = remote.host else {
548 log::error!(target: "wire", "Unexpected host type for inbound connection {remote}; dropping..");
549 drop(connection);
550
551 return;
552 };
553 let fd = connection.as_raw_fd();
554 log::debug!(target: "wire", "Inbound connection from {remote} (fd={fd})..");
555
556 if !self.service.accepted(ip) {
559 log::debug!(target: "wire", "Rejecting inbound connection from {ip} (fd={fd})..");
560 drop(connection);
561
562 return;
563 }
564
565 let session = match accept::<G>(
566 remote.clone().into(),
567 connection,
568 self.signer.clone().into_inner(),
569 ) {
570 Ok(s) => s,
571 Err(e) => {
572 log::error!(target: "wire", "Error creating session for {ip}: {e}");
573 return;
574 }
575 };
576 let transport = match NetTransport::with_session(session, Link::Inbound) {
577 Ok(transport) => transport,
578 Err(err) => {
579 log::error!(target: "wire", "Failed to create transport for accepted connection: {err}");
580 return;
581 }
582 };
583 log::debug!(target: "wire", "Accepted inbound connection from {remote} (fd={fd})..");
584
585 self.inbound.insert(
586 fd,
587 Inbound {
588 id: None,
589 addr: remote.into(),
590 },
591 );
592 self.actions
593 .push_back(reactor::Action::RegisterTransport(transport))
594 }
595 ListenerEvent::Failure(err) => {
596 log::error!(target: "wire", "Error listening for inbound connections: {err}");
597 }
598 }
599 }
600
601 fn handle_registered(&mut self, fd: RawFd, id: ResourceId, typ: ResourceType) {
602 match typ {
603 ResourceType::Listener => {
604 if let Some(local_addr) = self.listening.remove(&fd) {
605 self.service.listening(local_addr);
606 }
607 }
608 ResourceType::Transport => {
609 if let Some(outbound) = self.outbound.get_mut(&fd) {
610 log::debug!(target: "wire", "Outbound peer resource registered for {} with id={id} (fd={fd})", outbound.nid);
611 outbound.id = Some(id);
612 } else if let Some(inbound) = self.inbound.get_mut(&fd) {
613 log::debug!(target: "wire", "Inbound peer resource registered with id={id} (fd={fd})");
614 inbound.id = Some(id);
615 } else {
616 log::warn!(target: "wire", "Unknown peer registered with fd={fd} and id={id}");
617 }
618 }
619 }
620 }
621
622 fn handle_transport_event(
623 &mut self,
624 id: ResourceId,
625 event: SessionEvent<WireSession<G>>,
626 _: Timestamp,
627 ) {
628 match event {
629 SessionEvent::Established(fd, ProtocolArtifact { state, .. }) => {
630 let nid: NodeId = state.remote_static_key.unwrap();
632 if &nid == self.signer.public_key() {
634 log::error!(target: "wire", "Self-connection detected, disconnecting..");
635 self.disconnect(id, DisconnectReason::SelfConnection);
636
637 return;
638 }
639 let (addr, link) = if let Some(peer) = self.inbound.remove(&fd) {
640 self.metrics.peer(nid).inbound_connection_attempts += 1;
641 (peer.addr, Link::Inbound)
642 } else if let Some(peer) = self.outbound.remove(&fd) {
643 assert_eq!(nid, peer.nid);
644 (peer.addr, Link::Outbound)
645 } else {
646 log::error!(target: "wire", "Session for {nid} (id={id}) not found");
647 return;
648 };
649 log::debug!(
650 target: "wire",
651 "Session established with {nid} (id={id}) (fd={fd}) ({})",
652 if link.is_inbound() { "inbound" } else { "outbound" }
653 );
654
655 let mut disconnect = Vec::new();
657
658 {
664 let precedence = *self.signer.public_key() > nid;
668
669 let mut conflicting = Vec::new();
673
674 conflicting.extend(
676 self.peers
677 .active()
678 .filter(|(c_id, d, _)| **d == nid && *c_id != id)
679 .map(|(c_id, _, link)| (c_id, link)),
680 );
681
682 conflicting.extend(self.outbound.iter().filter_map(|(c_fd, other)| {
685 if other.nid == nid && *c_fd != fd {
686 other.id.map(|c_id| (c_id, Link::Outbound))
687 } else {
688 None
689 }
690 }));
691
692 for (c_id, c_link) in conflicting {
693 let close = match (link, c_link) {
698 (Link::Inbound, Link::Outbound) => {
699 if precedence {
700 id
701 } else {
702 c_id
703 }
704 }
705 (Link::Outbound, Link::Inbound) => {
706 if precedence {
707 c_id
708 } else {
709 id
710 }
711 }
712 (Link::Inbound, Link::Inbound) => id.max(c_id),
713 (Link::Outbound, Link::Outbound) => id.max(c_id),
714 };
715
716 log::warn!(
717 target: "wire", "Established session (id={id}) conflicts with existing session for {nid} (id={c_id})"
718 );
719 disconnect.push(close);
720 }
721 }
722 for id in &disconnect {
723 log::warn!(
724 target: "wire", "Closing conflicting session (id={id}) with {nid}.."
725 );
726 if let Some((nid, link)) = self.disconnect(*id, DisconnectReason::Conflict) {
728 self.service
732 .disconnected(nid, link, &DisconnectReason::Conflict);
733 }
734 }
735 if !disconnect.contains(&id) {
736 self.peers
737 .insert(id, Peer::connected(nid, addr.clone(), link));
738 self.service.connected(nid, addr.into(), link);
739 }
740 }
741 SessionEvent::Data(data) => {
742 if let Some(Peer::Connected {
743 nid,
744 inbox,
745 streams,
746 ..
747 }) = self.peers.get_mut(&id)
748 {
749 let metrics = self.metrics.peer(*nid);
750 metrics.received_bytes += data.len();
751
752 if inbox.input(&data).is_err() {
753 log::error!(target: "wire", "Maximum inbox size ({MAX_INBOX_SIZE}) reached for peer {nid}");
754 log::error!(target: "wire", "Unable to process messages fast enough for peer {nid}; disconnecting..");
755 self.disconnect(id, DisconnectReason::Session(session::Error::Misbehavior));
756
757 return;
758 }
759
760 loop {
761 match inbox.deserialize_next() {
762 Ok(Some(Frame {
763 data: FrameData::Control(frame::Control::Open { stream }),
764 ..
765 })) => {
766 log::debug!(target: "wire", "Received `open` command for stream {stream} from {nid}");
767 metrics.streams_opened += 1;
768 metrics.received_fetch_requests += 1;
769 let reader_limit = self.service.config().limits.fetch_pack_receive;
770 let Some(channels) = streams.register(
771 stream,
772 ChannelsConfig::new(FETCH_TIMEOUT)
773 .with_reader_limit(reader_limit),
774 ) else {
775 log::warn!(target: "wire", "Peer attempted to open already-open stream stream {stream}");
776 continue;
777 };
778
779 let task = Task {
780 fetch: FetchRequest::Responder {
781 remote: *nid,
782 emitter: self.service.emitter(),
783 },
784 stream,
785 channels,
786 };
787 if let Err(e) = self.worker.try_send(task) {
788 log::error!(
789 target: "wire",
790 "Worker pool failed to accept incoming fetch request: {e}"
791 );
792 }
793 }
794 Ok(Some(Frame {
795 data: FrameData::Control(frame::Control::Eof { stream }),
796 ..
797 })) => {
798 if let Some(s) = streams.get(&stream) {
799 log::debug!(target: "wire", "Received `end-of-file` on stream {stream} from {nid}");
800
801 if s.channels.send(ChannelEvent::Eof).is_err() {
802 log::error!(target: "wire", "Worker is disconnected; cannot send `EOF`");
803 }
804 } else {
805 log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
806 }
807 }
808 Ok(Some(Frame {
809 data: FrameData::Control(frame::Control::Close { stream }),
810 ..
811 })) => {
812 log::debug!(target: "wire", "Received `close` command for stream {stream} from {nid}");
813
814 if let Some(s) = streams.unregister(&stream) {
815 log::debug!(
816 target: "wire",
817 "Stream {stream} of {nid} closed with {} byte(s) sent and {} byte(s) received",
818 s.sent_bytes, s.received_bytes
819 );
820 s.channels.close().ok();
821 }
822 }
823 Ok(Some(Frame {
824 data: FrameData::Gossip(msg),
825 ..
826 })) => {
827 metrics.received_gossip_messages += 1;
828 self.service.received_message(*nid, msg);
829 }
830 Ok(Some(Frame {
831 stream,
832 data: FrameData::Git(data),
833 ..
834 })) => {
835 if let Some(s) = streams.get_mut(&stream) {
836 metrics.received_git_bytes += data.len();
837
838 if s.channels.send(ChannelEvent::Data(data)).is_err() {
839 log::error!(target: "wire", "Worker is disconnected; cannot send data");
840 }
841 } else {
842 log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
843 }
844 }
845 Ok(None) => {
846 break;
848 }
849 Err(e) => {
850 log::error!(target: "wire", "Invalid gossip message from {nid}: {e}");
851
852 if !inbox.is_empty() {
853 log::debug!(target: "wire", "Dropping read buffer for {nid} with {} bytes", inbox.len());
854 }
855 self.disconnect(
856 id,
857 DisconnectReason::Session(session::Error::Misbehavior),
858 );
859 break;
860 }
861 }
862 }
863 } else {
864 log::warn!(target: "wire", "Dropping message from unconnected peer (id={id})");
865 }
866 }
867 SessionEvent::Terminated(err) => {
868 self.disconnect(id, DisconnectReason::Connection(Arc::new(err)));
869 }
870 }
871 }
872
    /// Dispatch a control command: user commands go to the service, worker
    /// results and flush requests are handled by the wire layer itself.
    fn handle_command(&mut self, cmd: Self::Command) {
        match cmd {
            Control::User(cmd) => self.service.command(cmd),
            Control::Worker(result) => self.worker_result(result),
            Control::Flush { remote, stream } => self.flush(remote, stream),
        }
    }
880
881 fn handle_error(
882 &mut self,
883 err: reactor::Error<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>,
884 ) {
885 match err {
886 reactor::Error::Poll(err) => {
887 log::error!(target: "wire", "Can't poll connections: {err}");
889 }
890 reactor::Error::ListenerDisconnect(id, _) => {
891 log::error!(target: "wire", "Listener {id} disconnected");
893 }
894 reactor::Error::TransportDisconnect(id, transport) => {
895 let fd = transport.as_raw_fd();
896 log::error!(target: "wire", "Peer id={id} (fd={fd}) disconnected");
897
898 drop(transport);
900
901 match self.peers.remove(&id) {
905 Some(mut peer) => {
906 if let Peer::Connected { streams, .. } = &mut peer {
907 streams.shutdown();
908 }
909
910 if let Some(id) = peer.id() {
911 self.service.disconnected(
912 *id,
913 peer.link(),
914 &DisconnectReason::connection(),
915 );
916 } else {
917 log::debug!(target: "wire", "Inbound disconnection before handshake; ignoring..")
918 }
919 }
920 None => self.cleanup(id, fd),
921 }
922 }
923 }
924 }
925
    /// Listener handover is not supported: the event is logged and the
    /// listener is dropped when this function returns.
    fn handover_listener(&mut self, id: ResourceId, _listener: Self::Listener) {
        log::error!(target: "wire", "Listener handover is not supported (id={id})");
    }
929
930 fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport) {
931 let fd = transport.as_raw_fd();
932
933 match self.peers.entry(id) {
934 Entry::Occupied(e) => {
935 match e.get() {
936 Peer::Disconnecting {
937 nid, reason, link, ..
938 } => {
939 log::debug!(target: "wire", "Transport handover for disconnecting peer with id={id} (fd={fd})");
940
941 drop(transport);
943
944 if let Some(nid) = nid {
946 self.service.disconnected(*nid, *link, reason);
953 }
954 e.remove();
955 }
956 Peer::Connected { nid, .. } => {
957 panic!("Wire::handover_transport: Unexpected handover of connected peer {} with id={id} (fd={fd})", nid);
958 }
959 }
960 }
961 Entry::Vacant(_) => self.cleanup(id, fd),
962 }
963 }
964}
965
966impl<D, S, G> Iterator for Wire<D, S, G>
967where
968 D: service::Store,
969 S: WriteStorage + 'static,
970 G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone,
971{
972 type Item = Action<G>;
973
974 fn next(&mut self) -> Option<Self::Item> {
975 while let Some(ev) = self.service.next() {
976 match ev {
977 Io::Write(node_id, msgs) => {
978 let (fd, link) = match self.peers.lookup(&node_id) {
979 Some((fd, Peer::Connected { link, .. })) => (fd, *link),
980 Some((_, peer)) => {
981 log::debug!(target: "wire", "Dropping {} message(s) to {node_id} ({peer:?})", msgs.len());
984 continue;
985 }
986 None => {
987 log::error!(target: "wire", "Dropping {} message(s) to {node_id}: unknown peer", msgs.len());
988 continue;
989 }
990 };
991 log::trace!(
992 target: "wire", "Writing {} message(s) to {}", msgs.len(), node_id
993 );
994 let mut data = Vec::new();
995 let metrics = self.metrics.peer(node_id);
996 metrics.sent_gossip_messages += msgs.len();
997
998 for msg in msgs {
999 Frame::gossip(link, msg)
1000 .encode(&mut data)
1001 .expect("in-memory writes never fail");
1002 }
1003 metrics.sent_bytes += data.len();
1004
1005 self.actions.push_back(reactor::Action::Send(fd, data));
1006 }
1007 Io::Connect(node_id, addr) => {
1008 if self.peers.connected().any(|(_, id)| id == &node_id) {
1009 log::error!(
1010 target: "wire",
1011 "Attempt to connect to already connected peer {node_id}"
1012 );
1013 continue;
1016 }
1017 self.service.attempted(node_id, addr.clone());
1018 self.metrics.peer(node_id).outbound_connection_attempts += 1;
1019
1020 match dial::<G>(
1021 addr.to_inner(),
1022 node_id,
1023 self.signer.clone().into_inner(),
1024 self.service.config(),
1025 )
1026 .and_then(|session| {
1027 NetTransport::<WireSession<G>>::with_session(session, Link::Outbound)
1028 }) {
1029 Ok(transport) => {
1030 self.outbound.insert(
1031 transport.as_raw_fd(),
1032 Outbound {
1033 id: None,
1034 nid: node_id,
1035 addr: addr.to_inner(),
1036 },
1037 );
1038 log::debug!(
1039 target: "wire",
1040 "Registering outbound transport for {node_id} (fd={})..",
1041 transport.as_raw_fd()
1042 );
1043 self.actions
1044 .push_back(reactor::Action::RegisterTransport(transport));
1045 }
1046 Err(err) => {
1047 log::error!(target: "wire", "Error establishing connection to {addr}: {err}");
1048
1049 self.service.disconnected(
1050 node_id,
1051 Link::Outbound,
1052 &DisconnectReason::Dial(Arc::new(err)),
1053 );
1054 }
1055 }
1056 }
1057 Io::Disconnect(nid, reason) => {
1058 if let Some((id, Peer::Connected { .. })) = self.peers.lookup(&nid) {
1059 if let Some((nid, _)) = self.disconnect(id, reason) {
1060 self.metrics.peer(nid).disconnects += 1;
1061 }
1062 } else {
1063 log::warn!(target: "wire", "Peer {nid} is not connected: ignoring disconnect");
1064 }
1065 }
1066 Io::Wakeup(d) => {
1067 self.actions.push_back(reactor::Action::SetTimer(d.into()));
1068 }
1069 Io::Fetch {
1070 rid,
1071 remote,
1072 timeout,
1073 reader_limit,
1074 refs_at,
1075 } => {
1076 log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");
1077
1078 let Some((fd, Peer::Connected { link, streams, .. })) =
1079 self.peers.lookup_mut(&remote)
1080 else {
1081 log::error!(target: "wire", "Peer {remote} is not connected: dropping fetch");
1086 continue;
1087 };
1088 let (stream, channels) =
1089 streams.open(ChannelsConfig::new(timeout).with_reader_limit(reader_limit));
1090
1091 log::debug!(target: "wire", "Opened new stream with id {stream} for {rid} and remote {remote}");
1092
1093 let link = *link;
1094 let task = Task {
1095 fetch: FetchRequest::Initiator {
1096 rid,
1097 remote,
1098 refs_at,
1099 },
1100 stream,
1101 channels,
1102 };
1103
1104 if !self.worker.is_empty() {
1105 log::warn!(
1106 target: "wire",
1107 "Worker pool is busy: {} tasks pending, fetch requests may be delayed", self.worker.len()
1108 );
1109 }
1110 if let Err(e) = self.worker.try_send(task) {
1111 log::error!(
1112 target: "wire",
1113 "Worker pool failed to accept outgoing fetch request: {e}"
1114 );
1115 }
1116 let metrics = self.metrics.peer(remote);
1117 metrics.streams_opened += 1;
1118 metrics.sent_fetch_requests += 1;
1119
1120 self.actions.push_back(Action::Send(
1121 fd,
1122 Frame::<service::Message>::control(link, frame::Control::Open { stream })
1123 .to_bytes(),
1124 ));
1125 }
1126 }
1127 }
1128 self.actions.pop_front()
1129 }
1130}
1131
1132pub fn dial<G: Ecdh<Pk = NodeId>>(
1134 remote_addr: NetAddr<HostName>,
1135 remote_id: <G as EcSk>::Pk,
1136 signer: G,
1137 config: &service::Config,
1138) -> io::Result<WireSession<G>> {
1139 let inet_addr: NetAddr<InetHost> = match (&remote_addr.host, config.proxy) {
1142 (HostName::Ip(_), Some(proxy)) => proxy.into(),
1144 (HostName::Ip(ip), None) => NetAddr::new(InetHost::Ip(*ip), remote_addr.port),
1145 (HostName::Dns(_), Some(proxy)) => proxy.into(),
1146 (HostName::Dns(dns), None) => NetAddr::new(InetHost::Dns(dns.clone()), remote_addr.port),
1147 (HostName::Tor(onion), proxy) => match config.onion {
1149 Some(AddressConfig::Proxy { address }) => address.into(),
1152 Some(AddressConfig::Forward) => {
1155 if let Some(proxy) = proxy {
1156 proxy.into()
1157 } else {
1158 NetAddr::new(InetHost::Dns(onion.to_string()), remote_addr.port)
1159 }
1160 }
1161 None => {
1163 return Err(io::Error::new(
1164 io::ErrorKind::Unsupported,
1165 "no configuration found for .onion addresses",
1166 ));
1167 }
1168 },
1169 _ => {
1170 return Err(io::Error::new(
1171 io::ErrorKind::Unsupported,
1172 "unsupported remote address type",
1173 ));
1174 }
1175 };
1176 let connection = net::TcpStream::connect_nonblocking(inet_addr, DEFAULT_DIAL_TIMEOUT)?;
1179 let force_proxy = config.proxy.is_some();
1181
1182 session::<G>(
1183 remote_addr,
1184 Some(remote_id),
1185 connection,
1186 force_proxy,
1187 signer,
1188 )
1189}
1190
/// Create a session for an accepted inbound connection.
///
/// The remote node id is not known at this point, so none is passed to
/// `session`, and the proxy is never forced.
pub fn accept<G: Ecdh<Pk = NodeId>>(
    remote_addr: NetAddr<HostName>,
    connection: net::TcpStream,
    signer: G,
) -> io::Result<WireSession<G>> {
    session::<G>(remote_addr, None, connection, false, signer)
}
1199
1200fn session<G: Ecdh<Pk = NodeId>>(
1202 remote_addr: NetAddr<HostName>,
1203 remote_id: Option<NodeId>,
1204 connection: net::TcpStream,
1205 force_proxy: bool,
1206 signer: G,
1207) -> io::Result<WireSession<G>> {
1208 if let Err(e) = connection.set_nodelay(true) {
1210 log::warn!(target: "wire", "Unable to set TCP_NODELAY on fd {}: {e}", connection.as_raw_fd());
1211 }
1212 connection.set_read_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
1213 connection.set_write_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
1214
1215 let sock = socket2::Socket::from(connection);
1216 let ka = socket2::TcpKeepalive::new()
1217 .with_time(time::Duration::from_secs(30))
1218 .with_interval(time::Duration::from_secs(10))
1219 .with_retries(3);
1220 if let Err(e) = sock.set_tcp_keepalive(&ka) {
1221 log::warn!(target: "wire", "Unable to set TCP_KEEPALIVE on fd {}: {e}", sock.as_raw_fd());
1222 }
1223
1224 let socks5 = socks5::Socks5::with(remote_addr, force_proxy);
1225 let proxy = Socks5Session::with(sock.into(), socks5);
1226 let pair = G::generate_keypair();
1227 let keyset = Keyset {
1228 e: pair.0,
1229 s: Some(signer),
1230 re: None,
1231 rs: remote_id,
1232 };
1233 let noise = NoiseState::initialize::<{ Sha256::OUTPUT_LEN }>(
1234 NOISE_XK,
1235 remote_id.is_some(),
1236 &[],
1237 keyset,
1238 );
1239 Ok(WireSession::with(proxy, noise))
1240}
1241
#[cfg(test)]
mod test {
    use super::*;
    use crate::service::{Message, ZeroBytes};
    use crate::wire;
    use crate::wire::varint;

    /// A gossip frame whose payload carries extra trailing bytes after the
    /// message must still decode to the message itself.
    #[test]
    fn test_pong_message_with_extension() {
        use crate::deserializer;

        let pong = Message::Pong {
            zeroes: ZeroBytes::new(42),
        };

        // Frame header: protocol version followed by the gossip stream id.
        let mut bytes = Vec::new();
        frame::PROTOCOL_VERSION_STRING.encode(&mut bytes).unwrap();
        frame::StreamId::gossip(Link::Outbound)
            .encode(&mut bytes)
            .unwrap();

        // Serialize the message, then append extension fields to the payload.
        let mut payload = wire::serialize(&pong);
        String::from("extra").encode(&mut payload).unwrap();
        48u8.encode(&mut payload).unwrap();

        // Write the extended payload into the frame with the varint payload encoding.
        varint::payload::encode(&payload, &mut bytes).unwrap();

        let mut decoder = deserializer::Deserializer::<1024, Frame>::new(1024);
        decoder.input(&bytes).unwrap();

        // The message decodes successfully despite the trailing data.
        assert_eq!(
            decoder.deserialize_next().unwrap().unwrap(),
            Frame::gossip(Link::Outbound, pong)
        );
        // Nothing is left over in the decoder.
        assert!(decoder.deserialize_next().unwrap().is_none());
        assert!(decoder.is_empty());
    }

    /// A message extended with an extra field can be read both by a decoder
    /// that knows about the extension and by one that does not.
    #[test]
    fn test_inventory_ann_with_extension() {
        use crate::deserializer;

        /// A message followed by a string extension field.
        #[derive(Debug)]
        struct MessageWithExt {
            msg: Message,
            ext: String,
        }

        impl wire::Encode for MessageWithExt {
            fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
                let written = self.msg.encode(writer)? + self.ext.encode(writer)?;

                Ok(written)
            }
        }

        impl wire::Decode for MessageWithExt {
            fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
                let msg = Message::decode(reader)?;
                // The extension is optional: fall back to an empty string.
                let ext = String::decode(reader).unwrap_or_default();

                Ok(MessageWithExt { msg, ext })
            }
        }

        let rid = radicle::test::arbitrary::gen(1);
        let pk = radicle::test::arbitrary::gen(1);
        let sig: [u8; 64] = radicle::test::arbitrary::gen(1);

        let ann = Message::announcement(
            pk,
            service::gossip::inventory(radicle::node::Timestamp::MAX, [rid]),
            radicle::crypto::Signature::from(sig),
        );
        let pong = Message::Pong {
            zeroes: ZeroBytes::new(42),
        };

        // First frame: the announcement together with an extension field.
        let mut bytes = Vec::new();
        let extended = MessageWithExt {
            msg: ann.clone(),
            ext: String::from("extra"),
        };
        frame::Frame::gossip(Link::Outbound, extended)
            .encode(&mut bytes)
            .unwrap();
        // Second frame: a plain message without extension.
        frame::Frame::gossip(Link::Outbound, pong.clone())
            .encode(&mut bytes)
            .unwrap();

        // A decoder aware of the extension sees it on the first frame only.
        {
            let mut decoder =
                deserializer::Deserializer::<1024, Frame<MessageWithExt>>::new(1024);
            decoder.input(&bytes).unwrap();

            radicle::assert_matches!(
                decoder.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(MessageWithExt {
                    msg,
                    ext,
                }) if msg == ann && ext == *"extra"
            );
            radicle::assert_matches!(
                decoder.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(MessageWithExt {
                    msg,
                    ext,
                }) if msg == pong && ext.is_empty()
            );
            assert!(decoder.deserialize_next().unwrap().is_none());
            assert!(decoder.is_empty());
        }

        // A decoder unaware of the extension still reads both messages.
        {
            let mut decoder = deserializer::Deserializer::<1024, Frame<Message>>::new(1024);
            decoder.input(&bytes).unwrap();

            radicle::assert_matches!(
                decoder.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(msg)
                if msg == ann
            );
            radicle::assert_matches!(
                decoder.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(msg)
                if msg == pong
            );
            assert!(decoder.deserialize_next().unwrap().is_none());
            assert!(decoder.is_empty());
        }
    }
}