emissary_core/tunnel/transit/
participant.rs1use crate::{
20    events::EventHandle,
21    i2np::{tunnel::data::EncryptedTunnelData, Message, MessageBuilder, MessageType},
22    primitives::{RouterId, TunnelId},
23    runtime::Runtime,
24    tunnel::{
25        noise::TunnelKeys,
26        routing_table::RoutingTable,
27        transit::{TransitTunnel, TRANSIT_TUNNEL_EXPIRATION},
28    },
29};
30
31use bytes::{BufMut, BytesMut};
32use futures::FutureExt;
33use rand_core::RngCore;
34
35use alloc::vec::Vec;
36use core::{
37    future::Future,
38    pin::Pin,
39    task::{Context, Poll},
40    time::Duration,
41};
42use thingbuf::mpsc::Receiver;
43
44const LOG_TARGET: &str = "emissary::tunnel::transit::participant";
46
47pub struct Participant<R: Runtime> {
52    event_handle: EventHandle<R>,
54
55    expiration_timer: R::Timer,
57
58    bandwidth: usize,
60
61    message_rx: Receiver<Message>,
63
64    #[allow(unused)]
66    metrics_handle: R::MetricsHandle,
67
68    next_router: RouterId,
70
71    next_tunnel_id: TunnelId,
73
74    routing_table: RoutingTable,
76
77    tunnel_id: TunnelId,
79
80    tunnel_keys: TunnelKeys,
82}
83
84impl<R: Runtime> Participant<R> {
85    fn handle_tunnel_data(
90        &mut self,
91        tunnel_data: &EncryptedTunnelData,
92    ) -> crate::Result<(RouterId, Vec<u8>)> {
93        tracing::trace!(
94            target: LOG_TARGET,
95            tunnel_id = %self.tunnel_id,
96            "participant tunnel data",
97        );
98
99        let (ciphertext, iv) = self.tunnel_keys.decrypt_record(tunnel_data);
101
102        let mut out = BytesMut::with_capacity(4 + 16 + ciphertext.len());
104
105        out.put_u32(self.next_tunnel_id.into());
106        out.put_slice(&iv);
107        out.put_slice(&ciphertext);
108
109        let message = MessageBuilder::short()
110            .with_message_type(MessageType::TunnelData)
111            .with_message_id(R::rng().next_u32())
112            .with_expiration(R::time_since_epoch() + Duration::from_secs(8))
113            .with_payload(&out)
114            .build();
115
116        Ok((self.next_router.clone(), message))
117    }
118}
119
120impl<R: Runtime> TransitTunnel<R> for Participant<R> {
121    fn new(
122        tunnel_id: TunnelId,
123        next_tunnel_id: TunnelId,
124        next_router: RouterId,
125        tunnel_keys: TunnelKeys,
126        routing_table: RoutingTable,
127        metrics_handle: R::MetricsHandle,
128        message_rx: Receiver<Message>,
129        event_handle: EventHandle<R>,
130    ) -> Self {
131        Participant {
132            event_handle,
133            expiration_timer: R::timer(TRANSIT_TUNNEL_EXPIRATION),
134            bandwidth: 0usize,
135            message_rx,
136            metrics_handle,
137            next_router,
138            next_tunnel_id,
139            routing_table,
140            tunnel_id,
141            tunnel_keys,
142        }
143    }
144}
145
146impl<R: Runtime> Future for Participant<R> {
147    type Output = TunnelId;
148
149    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
150        while let Poll::Ready(event) = self.message_rx.poll_recv(cx) {
151            match event {
152                None => {
153                    tracing::warn!(
154                        target: LOG_TARGET,
155                        tunnel_id = %self.tunnel_id,
156                        "message channel closed",
157                    );
158                    return Poll::Ready(self.tunnel_id);
159                }
160                Some(message) => {
161                    self.bandwidth += message.serialized_len_short();
162
163                    match message.message_type {
164                        MessageType::TunnelData => {
165                            match EncryptedTunnelData::parse(&message.payload) {
166                                Some(message) => match self.handle_tunnel_data(&message) {
167                                    Ok((router, message)) => {
168                                        self.bandwidth += message.len();
169
170                                        if let Err(error) =
171                                            self.routing_table.send_message(router, message)
172                                        {
173                                            tracing::error!(
174                                                target: LOG_TARGET,
175                                                tunnel_id = %self.tunnel_id,
176                                                ?error,
177                                                "failed to send message",
178                                            )
179                                        }
180                                    }
181                                    Err(error) => tracing::warn!(
182                                        target: LOG_TARGET,
183                                        tunnel_id = %self.tunnel_id,
184                                        ?error,
185                                        "failed to handle tunnel data",
186                                    ),
187                                },
188                                None => tracing::warn!(
189                                    target: LOG_TARGET,
190                                    "failed to parse message",
191                                ),
192                            }
193                        }
194                        message_type => tracing::warn!(
195                            target: LOG_TARGET,
196                            tunnel_id = %self.tunnel_id,
197                            ?message_type,
198                            "unsupported message",
199                        ),
200                    }
201                }
202            }
203        }
204
205        if self.event_handle.poll_unpin(cx).is_ready() {
206            self.event_handle.transit_tunnel_bandwidth(self.bandwidth);
207            self.bandwidth = 0;
208        }
209
210        if self.expiration_timer.poll_unpin(cx).is_ready() {
211            return Poll::Ready(self.tunnel_id);
212        }
213
214        Poll::Pending
215    }
216}