emissary_core/tunnel/transit/
participant.rs1use crate::{
20 events::EventHandle,
21 i2np::{tunnel::data::EncryptedTunnelData, Message, MessageBuilder, MessageType},
22 primitives::{RouterId, TunnelId},
23 runtime::Runtime,
24 tunnel::{
25 noise::TunnelKeys,
26 routing_table::RoutingTable,
27 transit::{TransitTunnel, TRANSIT_TUNNEL_EXPIRATION},
28 },
29};
30
31use bytes::{BufMut, BytesMut};
32use futures::FutureExt;
33use rand_core::RngCore;
34
35use alloc::vec::Vec;
36use core::{
37 future::Future,
38 pin::Pin,
39 task::{Context, Poll},
40 time::Duration,
41};
42use thingbuf::mpsc::Receiver;
43
44const LOG_TARGET: &str = "emissary::tunnel::transit::participant";
46
47pub struct Participant<R: Runtime> {
52 event_handle: EventHandle<R>,
54
55 expiration_timer: R::Timer,
57
58 bandwidth: usize,
60
61 message_rx: Receiver<Message>,
63
64 #[allow(unused)]
66 metrics_handle: R::MetricsHandle,
67
68 next_router: RouterId,
70
71 next_tunnel_id: TunnelId,
73
74 routing_table: RoutingTable,
76
77 tunnel_id: TunnelId,
79
80 tunnel_keys: TunnelKeys,
82}
83
84impl<R: Runtime> Participant<R> {
85 fn handle_tunnel_data(
90 &mut self,
91 tunnel_data: &EncryptedTunnelData,
92 ) -> crate::Result<(RouterId, Vec<u8>)> {
93 tracing::trace!(
94 target: LOG_TARGET,
95 tunnel_id = %self.tunnel_id,
96 "participant tunnel data",
97 );
98
99 let (ciphertext, iv) = self.tunnel_keys.decrypt_record(tunnel_data);
101
102 let mut out = BytesMut::with_capacity(4 + 16 + ciphertext.len());
104
105 out.put_u32(self.next_tunnel_id.into());
106 out.put_slice(&iv);
107 out.put_slice(&ciphertext);
108
109 let message = MessageBuilder::short()
110 .with_message_type(MessageType::TunnelData)
111 .with_message_id(R::rng().next_u32())
112 .with_expiration(R::time_since_epoch() + Duration::from_secs(8))
113 .with_payload(&out)
114 .build();
115
116 Ok((self.next_router.clone(), message))
117 }
118}
119
120impl<R: Runtime> TransitTunnel<R> for Participant<R> {
121 fn new(
122 tunnel_id: TunnelId,
123 next_tunnel_id: TunnelId,
124 next_router: RouterId,
125 tunnel_keys: TunnelKeys,
126 routing_table: RoutingTable,
127 metrics_handle: R::MetricsHandle,
128 message_rx: Receiver<Message>,
129 event_handle: EventHandle<R>,
130 ) -> Self {
131 Participant {
132 event_handle,
133 expiration_timer: R::timer(TRANSIT_TUNNEL_EXPIRATION),
134 bandwidth: 0usize,
135 message_rx,
136 metrics_handle,
137 next_router,
138 next_tunnel_id,
139 routing_table,
140 tunnel_id,
141 tunnel_keys,
142 }
143 }
144}
145
146impl<R: Runtime> Future for Participant<R> {
147 type Output = TunnelId;
148
149 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
150 while let Poll::Ready(event) = self.message_rx.poll_recv(cx) {
151 match event {
152 None => {
153 tracing::warn!(
154 target: LOG_TARGET,
155 tunnel_id = %self.tunnel_id,
156 "message channel closed",
157 );
158 return Poll::Ready(self.tunnel_id);
159 }
160 Some(message) => {
161 self.bandwidth += message.serialized_len_short();
162
163 match message.message_type {
164 MessageType::TunnelData => {
165 match EncryptedTunnelData::parse(&message.payload) {
166 Some(message) => match self.handle_tunnel_data(&message) {
167 Ok((router, message)) => {
168 self.bandwidth += message.len();
169
170 if let Err(error) =
171 self.routing_table.send_message(router, message)
172 {
173 tracing::error!(
174 target: LOG_TARGET,
175 tunnel_id = %self.tunnel_id,
176 ?error,
177 "failed to send message",
178 )
179 }
180 }
181 Err(error) => tracing::warn!(
182 target: LOG_TARGET,
183 tunnel_id = %self.tunnel_id,
184 ?error,
185 "failed to handle tunnel data",
186 ),
187 },
188 None => tracing::warn!(
189 target: LOG_TARGET,
190 "failed to parse message",
191 ),
192 }
193 }
194 message_type => tracing::warn!(
195 target: LOG_TARGET,
196 tunnel_id = %self.tunnel_id,
197 ?message_type,
198 "unsupported message",
199 ),
200 }
201 }
202 }
203 }
204
205 if self.event_handle.poll_unpin(cx).is_ready() {
206 self.event_handle.transit_tunnel_bandwidth(self.bandwidth);
207 self.bandwidth = 0;
208 }
209
210 if self.expiration_timer.poll_unpin(cx).is_ready() {
211 return Poll::Ready(self.tunnel_id);
212 }
213
214 Poll::Pending
215 }
216}