1use std::collections::{HashMap, VecDeque};
2use std::io;
3use std::net::SocketAddr;
4use std::sync::{
5 Arc,
6 atomic::{AtomicBool, AtomicUsize, Ordering},
7};
8use std::time::Duration;
9
10use bytes::Bytes;
11use thiserror::Error;
12use tokio::sync::mpsc;
13use tokio::sync::mpsc::error::TrySendError;
14use tokio::task::JoinHandle;
15use tokio::time::timeout;
16
17use crate::client::{
18 ClientDisconnectReason, ClientSendOptions, RaknetClient, RaknetClientConfig, RaknetClientError,
19 RaknetClientEvent, ReconnectPolicy,
20};
21use crate::error::ConfigValidationError;
22use crate::protocol::reliability::Reliability;
23use crate::server::{PeerDisconnectReason, PeerId, RaknetServer, RaknetServerEvent, SendOptions};
24
/// Direction in which a payload travels through the relay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelayDirection {
    /// Payload received from the downstream peer and headed to the upstream server.
    DownstreamToUpstream,
    /// Payload received from the upstream server and headed to the downstream peer.
    UpstreamToDownstream,
}
30
/// Verdict returned by a [`RelayPolicy`] for a single payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RelayDecision {
    /// Forward the contained bytes (which may differ from the inspected payload).
    Forward(Bytes),
    /// Silently discard the payload.
    Drop,
    /// Ask for the session to be terminated, with a static explanation.
    Disconnect { reason: &'static str },
}
37
/// Hook that inspects every relayed payload and decides what happens to it.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait RelayPolicy: Send + Sync + 'static {
    /// Decides the fate of `payload` travelling in `direction`.
    ///
    /// The default implementation ignores the direction and forwards a clone
    /// of the payload unchanged.
    fn decide(&self, direction: RelayDirection, payload: &Bytes) -> RelayDecision {
        // Explicitly mark the parameter as intentionally unused.
        let _ = direction;
        RelayDecision::Forward(payload.clone())
    }
}
44
/// Policy that relies entirely on the default [`RelayPolicy::decide`], i.e.
/// forwards every payload unchanged.
#[derive(Debug, Clone, Copy, Default)]
pub struct PassthroughRelayPolicy;

// No overrides: the trait's default `decide` is used as-is.
impl RelayPolicy for PassthroughRelayPolicy {}
49
/// Size constraints enforced by [`RelayContract`] on every payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RelayContractConfig {
    /// Largest payload length (in bytes) that passes validation.
    pub max_payload_bytes: usize,
    /// Whether zero-length payloads pass validation.
    pub allow_empty_payload: bool,
}
55
56impl Default for RelayContractConfig {
57 fn default() -> Self {
58 Self {
59 max_payload_bytes: 2 * 1024 * 1024,
60 allow_empty_payload: false,
61 }
62 }
63}
64
65impl RelayContractConfig {
66 pub fn validate(&self) -> Result<(), ConfigValidationError> {
67 if self.max_payload_bytes == 0 {
68 return Err(ConfigValidationError::new(
69 "RelayContractConfig",
70 "max_payload_bytes",
71 "must be >= 1",
72 ));
73 }
74 Ok(())
75 }
76}
77
/// Reasons a payload can be rejected by [`RelayContract::apply`].
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum RelayContractError {
    /// The payload was empty and the contract does not allow empty payloads.
    #[error("empty payload is not allowed by relay contract")]
    EmptyPayload,
    /// The payload length `actual` exceeded the configured maximum `max`.
    #[error("payload too large for relay contract: {actual} > {max}")]
    PayloadTooLarge { actual: usize, max: usize },
    /// The policy returned [`RelayDecision::Disconnect`].
    #[error("relay policy requested disconnect: {reason}")]
    PolicyDisconnect { reason: &'static str },
}
87
/// Combines payload size validation (`config`) with a pluggable policy.
///
/// The policy type defaults to [`PassthroughRelayPolicy`].
pub struct RelayContract<P = PassthroughRelayPolicy> {
    // Size limits applied before and after the policy runs.
    config: RelayContractConfig,
    // Policy consulted for every payload that passes the initial validation.
    policy: P,
}
92
93impl<P> RelayContract<P>
94where
95 P: RelayPolicy,
96{
97 pub fn new(config: RelayContractConfig, policy: P) -> Self {
98 Self { config, policy }
99 }
100
101 pub fn config(&self) -> RelayContractConfig {
102 self.config
103 }
104
105 pub fn apply(
106 &self,
107 direction: RelayDirection,
108 payload: Bytes,
109 ) -> Result<Option<Bytes>, RelayContractError> {
110 self.validate_payload(&payload)?;
111
112 match self.policy.decide(direction, &payload) {
113 RelayDecision::Forward(bytes) => {
114 self.validate_payload(&bytes)?;
115 Ok(Some(bytes))
116 }
117 RelayDecision::Drop => Ok(None),
118 RelayDecision::Disconnect { reason } => {
119 Err(RelayContractError::PolicyDisconnect { reason })
120 }
121 }
122 }
123
124 fn validate_payload(&self, payload: &Bytes) -> Result<(), RelayContractError> {
125 if payload.is_empty() && !self.config.allow_empty_payload {
126 return Err(RelayContractError::EmptyPayload);
127 }
128
129 if payload.len() > self.config.max_payload_bytes {
130 return Err(RelayContractError::PayloadTooLarge {
131 actual: payload.len(),
132 max: self.config.max_payload_bytes,
133 });
134 }
135
136 Ok(())
137 }
138}
139
/// Settings used by [`UpstreamConnector`] when opening an upstream connection.
#[derive(Debug, Clone, Default)]
pub struct UpstreamConnectorConfig {
    /// Client configuration handed to the upstream `RaknetClient`.
    pub client_config: RaknetClientConfig,
    /// Retry policy handed to the upstream connect call.
    pub reconnect_policy: ReconnectPolicy,
}
145
146impl UpstreamConnectorConfig {
147 pub fn validate(&self) -> Result<(), ConfigValidationError> {
148 self.client_config.validate()?;
149 self.reconnect_policy.validate()?;
150 Ok(())
151 }
152}
153
/// Factory for upstream client connections to a fixed address.
#[derive(Debug, Clone)]
pub struct UpstreamConnector {
    /// Address of the upstream server to connect to.
    pub upstream_addr: SocketAddr,
    /// Client configuration and retry policy used for each connection attempt.
    pub config: UpstreamConnectorConfig,
}
159
160impl UpstreamConnector {
161 pub fn new(upstream_addr: SocketAddr, config: UpstreamConnectorConfig) -> Self {
162 Self {
163 upstream_addr,
164 config,
165 }
166 }
167
168 pub async fn connect(&self) -> Result<RaknetClient, RaknetClientError> {
169 RaknetClient::connect_with_retry(
170 self.upstream_addr,
171 self.config.client_config.clone(),
172 self.config.reconnect_policy.clone(),
173 )
174 .await
175 }
176}
177
/// What the proxy does when a queue or budget limit is hit.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RelayOverflowPolicy {
    /// Discard the payload that did not fit and keep the session alive (default).
    #[default]
    DropNewest,
    /// Close the whole relay session.
    DisconnectSession,
}
184
/// Runtime tuning knobs for [`RaknetRelayProxy`].
#[derive(Debug, Clone, Copy)]
pub struct RelayRuntimeConfig {
    /// Capacity of each session's command channel (downstream payloads waiting
    /// to be handled by the session task).
    pub per_session_downstream_queue_capacity: usize,
    /// Capacity of the shared channel on which session tasks report events.
    pub session_event_queue_capacity: usize,
    /// Default channel and priority for downstream-to-upstream sends; the
    /// reliability is taken from the received packet.
    pub downstream_to_upstream_send: ClientSendOptions,
    /// Default channel and priority for upstream-to-downstream sends; the
    /// reliability is taken from the received packet.
    pub upstream_to_downstream_send: SendOptions,
    /// Applied when a session's command channel is full.
    pub downstream_overflow_policy: RelayOverflowPolicy,
    /// Applied when a pending-packet or pending-byte budget would be exceeded.
    pub budget_overflow_policy: RelayOverflowPolicy,
    /// Maximum pending downstream-to-upstream packets per session.
    pub downstream_max_pending_packets: usize,
    /// Maximum pending downstream-to-upstream bytes per session.
    pub downstream_max_pending_bytes: usize,
    /// Maximum pending upstream-to-downstream packets per session.
    pub upstream_max_pending_packets: usize,
    /// Maximum pending upstream-to-downstream bytes per session.
    pub upstream_max_pending_bytes: usize,
    /// Maximum pending bytes per session, both directions combined.
    pub session_total_max_pending_bytes: usize,
}
199
200impl Default for RelayRuntimeConfig {
201 fn default() -> Self {
202 Self {
203 per_session_downstream_queue_capacity: 256,
204 session_event_queue_capacity: 1024,
205 downstream_to_upstream_send: ClientSendOptions::default(),
206 upstream_to_downstream_send: SendOptions::default(),
207 downstream_overflow_policy: RelayOverflowPolicy::DropNewest,
208 budget_overflow_policy: RelayOverflowPolicy::DisconnectSession,
209 downstream_max_pending_packets: 256,
210 downstream_max_pending_bytes: 512 * 1024,
211 upstream_max_pending_packets: 512,
212 upstream_max_pending_bytes: 1024 * 1024,
213 session_total_max_pending_bytes: 1536 * 1024,
214 }
215 }
216}
217
218impl RelayRuntimeConfig {
219 pub fn validate(&self) -> Result<(), ConfigValidationError> {
220 if self.per_session_downstream_queue_capacity == 0 {
221 return Err(ConfigValidationError::new(
222 "RelayRuntimeConfig",
223 "per_session_downstream_queue_capacity",
224 "must be >= 1",
225 ));
226 }
227 if self.session_event_queue_capacity == 0 {
228 return Err(ConfigValidationError::new(
229 "RelayRuntimeConfig",
230 "session_event_queue_capacity",
231 "must be >= 1",
232 ));
233 }
234 if self.downstream_max_pending_packets == 0 {
235 return Err(ConfigValidationError::new(
236 "RelayRuntimeConfig",
237 "downstream_max_pending_packets",
238 "must be >= 1",
239 ));
240 }
241 if self.downstream_max_pending_bytes == 0 {
242 return Err(ConfigValidationError::new(
243 "RelayRuntimeConfig",
244 "downstream_max_pending_bytes",
245 "must be >= 1",
246 ));
247 }
248 if self.upstream_max_pending_packets == 0 {
249 return Err(ConfigValidationError::new(
250 "RelayRuntimeConfig",
251 "upstream_max_pending_packets",
252 "must be >= 1",
253 ));
254 }
255 if self.upstream_max_pending_bytes == 0 {
256 return Err(ConfigValidationError::new(
257 "RelayRuntimeConfig",
258 "upstream_max_pending_bytes",
259 "must be >= 1",
260 ));
261 }
262 if self.session_total_max_pending_bytes == 0 {
263 return Err(ConfigValidationError::new(
264 "RelayRuntimeConfig",
265 "session_total_max_pending_bytes",
266 "must be >= 1",
267 ));
268 }
269 let min_total = self
270 .downstream_max_pending_bytes
271 .max(self.upstream_max_pending_bytes);
272 if self.session_total_max_pending_bytes < min_total {
273 return Err(ConfigValidationError::new(
274 "RelayRuntimeConfig",
275 "session_total_max_pending_bytes",
276 format!(
277 "must be >= max(downstream_max_pending_bytes, upstream_max_pending_bytes) = {min_total}"
278 ),
279 ));
280 }
281 Ok(())
282 }
283}
284
/// Snapshot of the counters and limits at the moment a budget reservation failed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelayBudgetExceeded {
    /// Pending packets in the affected direction before the rejected payload.
    pub pending_packets: usize,
    /// Pending bytes in the affected direction before the rejected payload.
    pub pending_bytes: usize,
    /// Packet limit for the affected direction.
    pub packet_limit: usize,
    /// Byte limit for the affected direction.
    pub byte_limit: usize,
    /// Pending bytes across both directions before the rejected payload.
    pub total_pending_bytes: usize,
    /// Session-wide byte limit.
    pub total_limit: usize,
}
294
/// Why a single payload was not relayed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RelayDropReason {
    /// No usable session exists for the peer (none registered, or its command
    /// channel is closed).
    NoSession,
    /// The session's command channel was full.
    QueueOverflow,
    /// A pending-packet or pending-byte budget would have been exceeded.
    BudgetExceeded(RelayBudgetExceeded),
    /// The relay policy decided to drop the payload.
    PolicyDrop,
    /// The payload failed contract validation.
    ContractViolation(RelayContractError),
}
303
/// Why a relay session ended.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RelaySessionCloseReason {
    /// The downstream peer disconnected from the local server.
    DownstreamDisconnected {
        reason: PeerDisconnectReason,
    },
    /// The upstream connection was closed.
    UpstreamDisconnected {
        reason: ClientDisconnectReason,
    },
    /// The upstream connection could not be established for a new peer.
    UpstreamConnectFailed {
        message: String,
    },
    /// Sending a payload to the upstream server failed.
    UpstreamSendFailed {
        message: String,
    },
    /// Sending a payload to the downstream peer failed.
    DownstreamSendFailed {
        message: String,
    },
    /// A payload violated the relay contract.
    ContractViolation {
        direction: RelayDirection,
        error: RelayContractError,
    },
    /// The relay policy requested a disconnect.
    PolicyDisconnect {
        direction: RelayDirection,
        reason: &'static str,
    },
    /// A pending budget was exceeded and the policy is to disconnect.
    BudgetExceeded {
        direction: RelayDirection,
        details: RelayBudgetExceeded,
    },
    /// The session command channel was full and the policy is to disconnect.
    DownstreamQueueOverflow,
    /// The session's command channel was closed.
    CommandChannelClosed,
    /// The session was asked to stop (stop flag or explicit disconnect command).
    ProxyShutdown,
}
337
/// Events surfaced to the owner of a [`RaknetRelayProxy`] via `next_event`.
#[derive(Debug)]
pub enum RaknetRelayProxyEvent {
    /// A downstream peer connected and its upstream connection was established.
    SessionStarted {
        peer_id: PeerId,
        downstream_addr: SocketAddr,
        upstream_addr: SocketAddr,
    },
    /// A payload of `payload_len` bytes was relayed in `direction`.
    Forwarded {
        peer_id: PeerId,
        direction: RelayDirection,
        payload_len: usize,
    },
    /// A payload was not relayed; see `reason`.
    Dropped {
        peer_id: PeerId,
        direction: RelayDirection,
        reason: RelayDropReason,
    },
    /// A decode error reported by a session task.
    DecodeError {
        peer_id: PeerId,
        direction: RelayDirection,
        error: String,
    },
    /// A relay session ended.
    SessionClosed {
        peer_id: PeerId,
        reason: RelaySessionCloseReason,
    },
    /// Pass-through of the server's `PeerRateLimited` event.
    DownstreamRateLimited {
        addr: SocketAddr,
    },
    /// Pass-through of the server's `SessionLimitReached` event.
    DownstreamSessionLimitReached {
        addr: SocketAddr,
    },
    /// Pass-through of the server's `ProxyDropped` event.
    DownstreamProxyDropped {
        addr: SocketAddr,
    },
    /// Pass-through of the server's `DecodeError` event.
    DownstreamDecodeError {
        addr: SocketAddr,
        error: String,
    },
    /// Pass-through of the server's `WorkerError` event.
    DownstreamWorkerError {
        shard_id: usize,
        message: String,
    },
    /// Pass-through of the server's `WorkerStopped` event.
    DownstreamWorkerStopped {
        shard_id: usize,
    },
}
385
/// Proxy-side handle to one spawned relay session task.
struct RelaySessionHandle {
    // Sends commands (payloads to forward, disconnect requests) to the task.
    command_tx: mpsc::Sender<RelaySessionCommand>,
    // Stop flag; the task checks it at the top of its loop.
    stop: Arc<AtomicBool>,
    // Join handle of the spawned session task.
    join: JoinHandle<()>,
    // Pending-budget counters shared with the session task.
    downstream_pending_packets: Arc<AtomicUsize>,
    downstream_pending_bytes: Arc<AtomicUsize>,
    upstream_pending_packets: Arc<AtomicUsize>,
    upstream_pending_bytes: Arc<AtomicUsize>,
}
395
/// Result of waiting on both proxy inputs at once; `None` means that input ended.
enum RelayInput {
    /// Outcome of polling the downstream server for its next event.
    Server(Option<RaknetServerEvent>),
    /// Outcome of receiving from the session event channel.
    Session(Option<RelaySessionEvent>),
}
400
/// Commands sent from the proxy to a session task.
enum RelaySessionCommand {
    /// Relay a payload received from the downstream peer to the upstream server.
    ForwardDownstreamPayload {
        payload: Bytes,
        send_options: ClientSendOptions,
    },
    /// Disconnect from upstream and terminate the session task.
    Disconnect,
}
408
/// Events sent from session tasks back to the proxy.
enum RelaySessionEvent {
    /// Ask the proxy to send `payload` to the downstream peer.
    ForwardToDownstream {
        peer_id: PeerId,
        payload: Bytes,
        send_options: SendOptions,
    },
    /// A payload was forwarded by the session task.
    Forwarded {
        peer_id: PeerId,
        direction: RelayDirection,
        payload_len: usize,
    },
    /// A payload was dropped by the session task.
    Dropped {
        peer_id: PeerId,
        direction: RelayDirection,
        reason: RelayDropReason,
    },
    /// A decode error observed by the session task.
    DecodeError {
        peer_id: PeerId,
        direction: RelayDirection,
        error: String,
    },
    /// The session task is ending for `reason`.
    Terminated {
        peer_id: PeerId,
        reason: RelaySessionCloseReason,
    },
}
435
/// Everything a session task needs besides its peer id, upstream client,
/// contract and command receiver.
struct RelaySessionRuntimeContext {
    // Channel on which the task reports events to the proxy.
    event_tx: mpsc::Sender<RelaySessionEvent>,
    // Default send options for upstream-to-downstream payloads.
    upstream_to_downstream_send: SendOptions,
    // Copy of the proxy's runtime configuration.
    runtime_config: RelayRuntimeConfig,
    // Stop flag shared with the proxy-side handle.
    stop: Arc<AtomicBool>,
    // Pending-budget counters shared with the proxy-side handle.
    downstream_pending_packets: Arc<AtomicUsize>,
    downstream_pending_bytes: Arc<AtomicUsize>,
    upstream_pending_packets: Arc<AtomicUsize>,
    upstream_pending_bytes: Arc<AtomicUsize>,
}
446
/// Relay proxy: accepts downstream peers on a `RaknetServer` and gives each
/// one its own upstream connection and session task.
pub struct RaknetRelayProxy<P = PassthroughRelayPolicy> {
    // Downstream-facing server.
    server: RaknetServer,
    // Opens one upstream connection per downstream peer.
    upstream_connector: UpstreamConnector,
    // Contract shared (via `Arc`) with every session task.
    contract: Arc<RelayContract<P>>,
    // Queue sizes, budgets and overflow policies.
    runtime_config: RelayRuntimeConfig,
    // Active sessions keyed by downstream peer id.
    sessions: HashMap<PeerId, RelaySessionHandle>,
    // Sender cloned into every session task.
    session_event_tx: mpsc::Sender<RelaySessionEvent>,
    // Receiver for events reported by session tasks.
    session_event_rx: mpsc::Receiver<RelaySessionEvent>,
    // Events produced but not yet returned by `next_event`.
    pending_events: VecDeque<RaknetRelayProxyEvent>,
}
457
458impl<P> RaknetRelayProxy<P>
459where
460 P: RelayPolicy,
461{
462 pub fn try_new(
463 server: RaknetServer,
464 upstream_connector: UpstreamConnector,
465 contract: RelayContract<P>,
466 runtime_config: RelayRuntimeConfig,
467 ) -> io::Result<Self> {
468 upstream_connector
469 .config
470 .validate()
471 .map_err(invalid_config_io_error)?;
472 contract
473 .config()
474 .validate()
475 .map_err(invalid_config_io_error)?;
476 runtime_config.validate().map_err(invalid_config_io_error)?;
477 Ok(Self::new(
478 server,
479 upstream_connector,
480 contract,
481 runtime_config,
482 ))
483 }
484
485 pub fn new(
486 server: RaknetServer,
487 upstream_connector: UpstreamConnector,
488 contract: RelayContract<P>,
489 runtime_config: RelayRuntimeConfig,
490 ) -> Self {
491 let (session_event_tx, session_event_rx) =
492 mpsc::channel(runtime_config.session_event_queue_capacity.max(1));
493
494 Self {
495 server,
496 upstream_connector,
497 contract: Arc::new(contract),
498 runtime_config,
499 sessions: HashMap::new(),
500 session_event_tx,
501 session_event_rx,
502 pending_events: VecDeque::new(),
503 }
504 }
505
506 pub async fn bind(
507 downstream_bind_addr: SocketAddr,
508 upstream_connector: UpstreamConnector,
509 contract: RelayContract<P>,
510 runtime_config: RelayRuntimeConfig,
511 ) -> io::Result<Self> {
512 let server = RaknetServer::bind(downstream_bind_addr).await?;
513 Self::try_new(server, upstream_connector, contract, runtime_config)
514 }
515
    /// Returns the number of sessions currently registered with the proxy.
    pub fn session_count(&self) -> usize {
        self.sessions.len()
    }
519
    /// Waits for and returns the next proxy event.
    ///
    /// Events already queued in `pending_events` are returned first.
    /// Otherwise the method waits on both the downstream server and the
    /// session event channel, handles whichever input arrives, and repeats
    /// until handling an input has queued at least one event.
    ///
    /// Returns `None` when the server's event stream ends, or when the
    /// session event channel closes while no sessions are registered.
    pub async fn next_event(&mut self) -> Option<RaknetRelayProxyEvent> {
        if let Some(event) = self.pending_events.pop_front() {
            return Some(event);
        }

        loop {
            // Borrow the two inputs in a nested scope so the borrows end
            // before `self` is used mutably by the handlers below.
            let input = {
                let server = &mut self.server;
                let session_event_rx = &mut self.session_event_rx;
                tokio::select! {
                    event = server.next_event() => RelayInput::Server(event),
                    event = session_event_rx.recv() => RelayInput::Session(event),
                }
            };

            match input {
                RelayInput::Server(Some(event)) => self.handle_server_event(event).await,
                RelayInput::Session(Some(event)) => self.handle_session_event(event).await,
                RelayInput::Server(None) => return None,
                RelayInput::Session(None) => {
                    if self.sessions.is_empty() {
                        return None;
                    }
                }
            }

            // Handlers may queue zero or more events; keep waiting on zero.
            if let Some(event) = self.pending_events.pop_front() {
                return Some(event);
            }
        }
    }
551
    /// Stops every session, then shuts the downstream server down.
    ///
    /// Consumes the proxy. Returns the result of the server shutdown.
    pub async fn shutdown(mut self) -> io::Result<()> {
        // Sessions are finalized first, before the server is shut down.
        self.stop_all_sessions().await;
        self.server.shutdown().await
    }
556
557 async fn handle_server_event(&mut self, event: RaknetServerEvent) {
558 match event {
559 RaknetServerEvent::PeerConnected { peer_id, addr, .. } => {
560 if let Some(existing) = self.sessions.remove(&peer_id) {
561 self.finalize_session_handle(existing, true).await;
562 }
563
564 match self.upstream_connector.connect().await {
565 Ok(upstream_client) => {
566 let upstream_addr = upstream_client.server_addr();
567 self.spawn_session(peer_id, upstream_client);
568 self.pending_events
569 .push_back(RaknetRelayProxyEvent::SessionStarted {
570 peer_id,
571 downstream_addr: addr,
572 upstream_addr,
573 });
574 }
575 Err(error) => {
576 let reason = RelaySessionCloseReason::UpstreamConnectFailed {
577 message: error.to_string(),
578 };
579 let _ = self.server.disconnect(peer_id).await;
580 self.pending_events
581 .push_back(RaknetRelayProxyEvent::SessionClosed { peer_id, reason });
582 }
583 }
584 }
585 RaknetServerEvent::PeerDisconnected {
586 peer_id, reason, ..
587 } => {
588 let _ = self
589 .close_session(
590 peer_id,
591 RelaySessionCloseReason::DownstreamDisconnected { reason },
592 true,
593 false,
594 )
595 .await;
596 }
597 RaknetServerEvent::Packet {
598 peer_id,
599 payload,
600 reliability,
601 ordering_channel,
602 ..
603 } => {
604 self.forward_downstream_payload(peer_id, payload, reliability, ordering_channel)
605 .await;
606 }
607 RaknetServerEvent::PeerRateLimited { addr } => {
608 self.pending_events
609 .push_back(RaknetRelayProxyEvent::DownstreamRateLimited { addr });
610 }
611 RaknetServerEvent::SessionLimitReached { addr } => {
612 self.pending_events
613 .push_back(RaknetRelayProxyEvent::DownstreamSessionLimitReached { addr });
614 }
615 RaknetServerEvent::ProxyDropped { addr } => {
616 self.pending_events
617 .push_back(RaknetRelayProxyEvent::DownstreamProxyDropped { addr });
618 }
619 RaknetServerEvent::DecodeError { addr, error } => {
620 self.pending_events
621 .push_back(RaknetRelayProxyEvent::DownstreamDecodeError { addr, error });
622 }
623 RaknetServerEvent::WorkerError { shard_id, message } => {
624 self.pending_events
625 .push_back(RaknetRelayProxyEvent::DownstreamWorkerError { shard_id, message });
626 }
627 RaknetServerEvent::WorkerStopped { shard_id } => {
628 self.pending_events
629 .push_back(RaknetRelayProxyEvent::DownstreamWorkerStopped { shard_id });
630 }
631 RaknetServerEvent::OfflinePacket { .. } => {}
632 RaknetServerEvent::ReceiptAcked { .. } => {}
633 RaknetServerEvent::Metrics { .. } => {}
634 }
635 }
636
637 async fn forward_downstream_payload(
638 &mut self,
639 peer_id: PeerId,
640 payload: Bytes,
641 reliability: Reliability,
642 ordering_channel: Option<u8>,
643 ) {
644 let Some(session) = self.sessions.get(&peer_id) else {
645 self.pending_events
646 .push_back(RaknetRelayProxyEvent::Dropped {
647 peer_id,
648 direction: RelayDirection::DownstreamToUpstream,
649 reason: RelayDropReason::NoSession,
650 });
651 return;
652 };
653
654 let payload_len = payload.len();
655 let send_options = downstream_to_upstream_send_options(
656 self.runtime_config.downstream_to_upstream_send,
657 reliability,
658 ordering_channel,
659 );
660 if let Err(details) = try_reserve_budget(
661 RelayDirection::DownstreamToUpstream,
662 payload_len,
663 self.runtime_config,
664 &session.downstream_pending_packets,
665 &session.downstream_pending_bytes,
666 &session.upstream_pending_packets,
667 &session.upstream_pending_bytes,
668 ) {
669 match self.runtime_config.budget_overflow_policy {
670 RelayOverflowPolicy::DropNewest => {
671 self.pending_events
672 .push_back(RaknetRelayProxyEvent::Dropped {
673 peer_id,
674 direction: RelayDirection::DownstreamToUpstream,
675 reason: RelayDropReason::BudgetExceeded(details),
676 });
677 }
678 RelayOverflowPolicy::DisconnectSession => {
679 let _ = self
680 .close_session(
681 peer_id,
682 RelaySessionCloseReason::BudgetExceeded {
683 direction: RelayDirection::DownstreamToUpstream,
684 details,
685 },
686 true,
687 true,
688 )
689 .await;
690 }
691 }
692 return;
693 }
694
695 match session
696 .command_tx
697 .try_send(RelaySessionCommand::ForwardDownstreamPayload {
698 payload,
699 send_options,
700 }) {
701 Ok(()) => {}
702 Err(TrySendError::Full(_)) => match self.runtime_config.downstream_overflow_policy {
703 RelayOverflowPolicy::DropNewest => {
704 release_reserved_budget(
705 RelayDirection::DownstreamToUpstream,
706 payload_len,
707 &session.downstream_pending_packets,
708 &session.downstream_pending_bytes,
709 &session.upstream_pending_packets,
710 &session.upstream_pending_bytes,
711 );
712 self.pending_events
713 .push_back(RaknetRelayProxyEvent::Dropped {
714 peer_id,
715 direction: RelayDirection::DownstreamToUpstream,
716 reason: RelayDropReason::QueueOverflow,
717 });
718 }
719 RelayOverflowPolicy::DisconnectSession => {
720 release_reserved_budget(
721 RelayDirection::DownstreamToUpstream,
722 payload_len,
723 &session.downstream_pending_packets,
724 &session.downstream_pending_bytes,
725 &session.upstream_pending_packets,
726 &session.upstream_pending_bytes,
727 );
728 let _ = self
729 .close_session(
730 peer_id,
731 RelaySessionCloseReason::DownstreamQueueOverflow,
732 true,
733 true,
734 )
735 .await;
736 }
737 },
738 Err(TrySendError::Closed(_)) => {
739 release_reserved_budget(
740 RelayDirection::DownstreamToUpstream,
741 payload_len,
742 &session.downstream_pending_packets,
743 &session.downstream_pending_bytes,
744 &session.upstream_pending_packets,
745 &session.upstream_pending_bytes,
746 );
747 self.pending_events
748 .push_back(RaknetRelayProxyEvent::Dropped {
749 peer_id,
750 direction: RelayDirection::DownstreamToUpstream,
751 reason: RelayDropReason::NoSession,
752 });
753
754 let _ = self
755 .close_session(
756 peer_id,
757 RelaySessionCloseReason::CommandChannelClosed,
758 false,
759 true,
760 )
761 .await;
762 }
763 }
764 }
765
766 async fn handle_session_event(&mut self, event: RelaySessionEvent) {
767 match event {
768 RelaySessionEvent::ForwardToDownstream {
769 peer_id,
770 payload,
771 send_options,
772 } => {
773 let payload_len = payload.len();
774 if let Some(session) = self.sessions.get(&peer_id) {
775 release_reserved_budget(
776 RelayDirection::UpstreamToDownstream,
777 payload_len,
778 &session.downstream_pending_packets,
779 &session.downstream_pending_bytes,
780 &session.upstream_pending_packets,
781 &session.upstream_pending_bytes,
782 );
783 }
784
785 match self
786 .server
787 .send_with_options(peer_id, payload, send_options)
788 .await
789 {
790 Ok(()) => {
791 self.pending_events
792 .push_back(RaknetRelayProxyEvent::Forwarded {
793 peer_id,
794 direction: RelayDirection::UpstreamToDownstream,
795 payload_len,
796 });
797 }
798 Err(error) => {
799 let _ = self
800 .close_session(
801 peer_id,
802 RelaySessionCloseReason::DownstreamSendFailed {
803 message: error.to_string(),
804 },
805 true,
806 true,
807 )
808 .await;
809 }
810 }
811 }
812 RelaySessionEvent::Forwarded {
813 peer_id,
814 direction,
815 payload_len,
816 } => {
817 self.pending_events
818 .push_back(RaknetRelayProxyEvent::Forwarded {
819 peer_id,
820 direction,
821 payload_len,
822 });
823 }
824 RelaySessionEvent::Dropped {
825 peer_id,
826 direction,
827 reason,
828 } => {
829 self.pending_events
830 .push_back(RaknetRelayProxyEvent::Dropped {
831 peer_id,
832 direction,
833 reason,
834 });
835 }
836 RelaySessionEvent::DecodeError {
837 peer_id,
838 direction,
839 error,
840 } => {
841 self.pending_events
842 .push_back(RaknetRelayProxyEvent::DecodeError {
843 peer_id,
844 direction,
845 error,
846 });
847 }
848 RelaySessionEvent::Terminated { peer_id, reason } => {
849 let disconnect_downstream = should_disconnect_downstream(&reason);
850 let _ = self
851 .close_session(peer_id, reason, false, disconnect_downstream)
852 .await;
853 }
854 }
855 }
856
857 fn spawn_session(&mut self, peer_id: PeerId, upstream_client: RaknetClient) {
858 let (command_tx, command_rx) = mpsc::channel(
859 self.runtime_config
860 .per_session_downstream_queue_capacity
861 .max(1),
862 );
863 let stop = Arc::new(AtomicBool::new(false));
864 let downstream_pending_packets = Arc::new(AtomicUsize::new(0));
865 let downstream_pending_bytes = Arc::new(AtomicUsize::new(0));
866 let upstream_pending_packets = Arc::new(AtomicUsize::new(0));
867 let upstream_pending_bytes = Arc::new(AtomicUsize::new(0));
868
869 let contract = Arc::clone(&self.contract);
870 let session_context = RelaySessionRuntimeContext {
871 event_tx: self.session_event_tx.clone(),
872 upstream_to_downstream_send: self.runtime_config.upstream_to_downstream_send,
873 runtime_config: self.runtime_config,
874 stop: Arc::clone(&stop),
875 downstream_pending_packets: Arc::clone(&downstream_pending_packets),
876 downstream_pending_bytes: Arc::clone(&downstream_pending_bytes),
877 upstream_pending_packets: Arc::clone(&upstream_pending_packets),
878 upstream_pending_bytes: Arc::clone(&upstream_pending_bytes),
879 };
880
881 let join = tokio::spawn(async move {
882 run_relay_session(
883 peer_id,
884 upstream_client,
885 contract,
886 command_rx,
887 session_context,
888 )
889 .await;
890 });
891
892 self.sessions.insert(
893 peer_id,
894 RelaySessionHandle {
895 command_tx,
896 stop,
897 join,
898 downstream_pending_packets,
899 downstream_pending_bytes,
900 upstream_pending_packets,
901 upstream_pending_bytes,
902 },
903 );
904 }
905
906 async fn stop_all_sessions(&mut self) {
907 let sessions = self
908 .sessions
909 .drain()
910 .map(|(_, session)| session)
911 .collect::<Vec<_>>();
912
913 for session in sessions {
914 self.finalize_session_handle(session, true).await;
915 }
916 }
917
918 async fn finalize_session_handle(&self, session: RelaySessionHandle, request_disconnect: bool) {
919 let RelaySessionHandle {
920 command_tx,
921 stop,
922 mut join,
923 downstream_pending_packets: _,
924 downstream_pending_bytes: _,
925 upstream_pending_packets: _,
926 upstream_pending_bytes: _,
927 } = session;
928
929 if request_disconnect {
930 stop.store(true, Ordering::Relaxed);
931 let _ = command_tx.try_send(RelaySessionCommand::Disconnect);
932 }
933
934 if timeout(Duration::from_millis(200), &mut join)
935 .await
936 .is_err()
937 {
938 join.abort();
939 let _ = join.await;
940 }
941 }
942
943 async fn close_session(
944 &mut self,
945 peer_id: PeerId,
946 reason: RelaySessionCloseReason,
947 request_upstream_disconnect: bool,
948 request_downstream_disconnect: bool,
949 ) -> bool {
950 let Some(session) = self.sessions.remove(&peer_id) else {
951 return false;
952 };
953
954 self.finalize_session_handle(session, request_upstream_disconnect)
955 .await;
956 if request_downstream_disconnect {
957 let _ = self.server.disconnect(peer_id).await;
958 }
959 self.pending_events
960 .push_back(RaknetRelayProxyEvent::SessionClosed { peer_id, reason });
961 true
962 }
963}
964
965fn try_reserve_budget(
966 direction: RelayDirection,
967 payload_len: usize,
968 runtime_config: RelayRuntimeConfig,
969 downstream_pending_packets: &AtomicUsize,
970 downstream_pending_bytes: &AtomicUsize,
971 upstream_pending_packets: &AtomicUsize,
972 upstream_pending_bytes: &AtomicUsize,
973) -> Result<(), RelayBudgetExceeded> {
974 let (dir_pending_packets, dir_pending_bytes, packet_limit, byte_limit) = match direction {
975 RelayDirection::DownstreamToUpstream => (
976 downstream_pending_packets.load(Ordering::Relaxed),
977 downstream_pending_bytes.load(Ordering::Relaxed),
978 runtime_config.downstream_max_pending_packets.max(1),
979 runtime_config.downstream_max_pending_bytes.max(1),
980 ),
981 RelayDirection::UpstreamToDownstream => (
982 upstream_pending_packets.load(Ordering::Relaxed),
983 upstream_pending_bytes.load(Ordering::Relaxed),
984 runtime_config.upstream_max_pending_packets.max(1),
985 runtime_config.upstream_max_pending_bytes.max(1),
986 ),
987 };
988
989 let total_pending_bytes = downstream_pending_bytes.load(Ordering::Relaxed)
990 + upstream_pending_bytes.load(Ordering::Relaxed);
991 let total_limit = runtime_config.session_total_max_pending_bytes.max(1);
992
993 if dir_pending_packets.saturating_add(1) > packet_limit
994 || dir_pending_bytes.saturating_add(payload_len) > byte_limit
995 || total_pending_bytes.saturating_add(payload_len) > total_limit
996 {
997 return Err(RelayBudgetExceeded {
998 pending_packets: dir_pending_packets,
999 pending_bytes: dir_pending_bytes,
1000 packet_limit,
1001 byte_limit,
1002 total_pending_bytes,
1003 total_limit,
1004 });
1005 }
1006
1007 match direction {
1008 RelayDirection::DownstreamToUpstream => {
1009 downstream_pending_packets.fetch_add(1, Ordering::Relaxed);
1010 downstream_pending_bytes.fetch_add(payload_len, Ordering::Relaxed);
1011 }
1012 RelayDirection::UpstreamToDownstream => {
1013 upstream_pending_packets.fetch_add(1, Ordering::Relaxed);
1014 upstream_pending_bytes.fetch_add(payload_len, Ordering::Relaxed);
1015 }
1016 }
1017
1018 Ok(())
1019}
1020
1021fn release_reserved_budget(
1022 direction: RelayDirection,
1023 payload_len: usize,
1024 downstream_pending_packets: &AtomicUsize,
1025 downstream_pending_bytes: &AtomicUsize,
1026 upstream_pending_packets: &AtomicUsize,
1027 upstream_pending_bytes: &AtomicUsize,
1028) {
1029 match direction {
1030 RelayDirection::DownstreamToUpstream => {
1031 atomic_saturating_sub(downstream_pending_packets, 1);
1032 atomic_saturating_sub(downstream_pending_bytes, payload_len);
1033 }
1034 RelayDirection::UpstreamToDownstream => {
1035 atomic_saturating_sub(upstream_pending_packets, 1);
1036 atomic_saturating_sub(upstream_pending_bytes, payload_len);
1037 }
1038 }
1039}
1040
/// Atomically subtracts `value` from `counter`, clamping the result at zero.
///
/// Implemented as a compare-and-swap retry loop with relaxed ordering.
fn atomic_saturating_sub(counter: &AtomicUsize, value: usize) {
    let mut observed = counter.load(Ordering::Relaxed);
    loop {
        let reduced = observed.saturating_sub(value);
        match counter.compare_exchange_weak(
            observed,
            reduced,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_) => return,
            // Lost a race (or failed spuriously): retry with the fresh value.
            Err(latest) => observed = latest,
        }
    }
}
1046
1047fn should_disconnect_downstream(reason: &RelaySessionCloseReason) -> bool {
1048 matches!(
1049 reason,
1050 RelaySessionCloseReason::UpstreamDisconnected { .. }
1051 | RelaySessionCloseReason::UpstreamSendFailed { .. }
1052 | RelaySessionCloseReason::ContractViolation { .. }
1053 | RelaySessionCloseReason::PolicyDisconnect { .. }
1054 | RelaySessionCloseReason::BudgetExceeded { .. }
1055 | RelaySessionCloseReason::CommandChannelClosed
1056 )
1057}
1058
/// Picks the ordering channel to send on: the packet's own channel when it
/// has one, otherwise `default_channel`.
#[inline]
fn apply_channel_hint(default_channel: u8, ordering_channel: Option<u8>) -> u8 {
    match ordering_channel {
        Some(hinted) => hinted,
        None => default_channel,
    }
}
1063
1064#[inline]
1065fn downstream_to_upstream_send_options(
1066 defaults: ClientSendOptions,
1067 reliability: Reliability,
1068 ordering_channel: Option<u8>,
1069) -> ClientSendOptions {
1070 ClientSendOptions {
1071 reliability,
1072 channel: apply_channel_hint(defaults.channel, ordering_channel),
1073 priority: defaults.priority,
1074 }
1075}
1076
1077#[inline]
1078fn upstream_to_downstream_send_options(
1079 defaults: SendOptions,
1080 reliability: Reliability,
1081 ordering_channel: Option<u8>,
1082) -> SendOptions {
1083 SendOptions {
1084 reliability,
1085 channel: apply_channel_hint(defaults.channel, ordering_channel),
1086 priority: defaults.priority,
1087 }
1088}
1089
1090fn invalid_config_io_error(error: ConfigValidationError) -> io::Error {
1091 io::Error::new(io::ErrorKind::InvalidInput, error.to_string())
1092}
1093
1094async fn emit_session_event(
1095 tx: &mpsc::Sender<RelaySessionEvent>,
1096 event: RelaySessionEvent,
1097) -> bool {
1098 tx.send(event).await.is_ok()
1099}
1100
1101async fn run_relay_session<P>(
1102 peer_id: PeerId,
1103 mut upstream: RaknetClient,
1104 contract: Arc<RelayContract<P>>,
1105 mut command_rx: mpsc::Receiver<RelaySessionCommand>,
1106 context: RelaySessionRuntimeContext,
1107) where
1108 P: RelayPolicy,
1109{
1110 let RelaySessionRuntimeContext {
1111 event_tx,
1112 upstream_to_downstream_send,
1113 runtime_config,
1114 stop,
1115 downstream_pending_packets,
1116 downstream_pending_bytes,
1117 upstream_pending_packets,
1118 upstream_pending_bytes,
1119 } = context;
1120
1121 loop {
1122 if stop.load(Ordering::Relaxed) {
1123 let _ = upstream.disconnect(None).await;
1124 emit_session_event(
1125 &event_tx,
1126 RelaySessionEvent::Terminated {
1127 peer_id,
1128 reason: RelaySessionCloseReason::ProxyShutdown,
1129 },
1130 )
1131 .await;
1132 break;
1133 }
1134
1135 tokio::select! {
1136 command = command_rx.recv() => {
1137 match command {
1138 Some(RelaySessionCommand::ForwardDownstreamPayload {
1139 payload,
1140 send_options,
1141 }) => {
1142 release_reserved_budget(
1143 RelayDirection::DownstreamToUpstream,
1144 payload.len(),
1145 &downstream_pending_packets,
1146 &downstream_pending_bytes,
1147 &upstream_pending_packets,
1148 &upstream_pending_bytes,
1149 );
1150
1151 match contract.apply(RelayDirection::DownstreamToUpstream, payload) {
1152 Ok(Some(forward_payload)) => {
1153 let payload_len = forward_payload.len();
1154 match upstream.send_with_options(forward_payload, send_options).await {
1155 Ok(()) => {
1156 emit_session_event(
1157 &event_tx,
1158 RelaySessionEvent::Forwarded {
1159 peer_id,
1160 direction: RelayDirection::DownstreamToUpstream,
1161 payload_len,
1162 },
1163 ).await;
1164 }
1165 Err(error) => {
1166 emit_session_event(
1167 &event_tx,
1168 RelaySessionEvent::Terminated {
1169 peer_id,
1170 reason: RelaySessionCloseReason::UpstreamSendFailed {
1171 message: error.to_string(),
1172 },
1173 },
1174 ).await;
1175 break;
1176 }
1177 }
1178 }
1179 Ok(None) => {
1180 emit_session_event(
1181 &event_tx,
1182 RelaySessionEvent::Dropped {
1183 peer_id,
1184 direction: RelayDirection::DownstreamToUpstream,
1185 reason: RelayDropReason::PolicyDrop,
1186 },
1187 ).await;
1188 }
1189 Err(RelayContractError::PolicyDisconnect { reason }) => {
1190 emit_session_event(
1191 &event_tx,
1192 RelaySessionEvent::Terminated {
1193 peer_id,
1194 reason: RelaySessionCloseReason::PolicyDisconnect {
1195 direction: RelayDirection::DownstreamToUpstream,
1196 reason,
1197 },
1198 },
1199 ).await;
1200 break;
1201 }
1202 Err(error) => {
1203 emit_session_event(
1204 &event_tx,
1205 RelaySessionEvent::Dropped {
1206 peer_id,
1207 direction: RelayDirection::DownstreamToUpstream,
1208 reason: RelayDropReason::ContractViolation(error.clone()),
1209 },
1210 ).await;
1211
1212 emit_session_event(
1213 &event_tx,
1214 RelaySessionEvent::Terminated {
1215 peer_id,
1216 reason: RelaySessionCloseReason::ContractViolation {
1217 direction: RelayDirection::DownstreamToUpstream,
1218 error,
1219 },
1220 },
1221 ).await;
1222 break;
1223 }
1224 }
1225 }
1226 Some(RelaySessionCommand::Disconnect) => {
1227 let _ = upstream.disconnect(None).await;
1228 emit_session_event(
1229 &event_tx,
1230 RelaySessionEvent::Terminated {
1231 peer_id,
1232 reason: RelaySessionCloseReason::ProxyShutdown,
1233 },
1234 ).await;
1235 break;
1236 }
1237 None => {
1238 emit_session_event(
1239 &event_tx,
1240 RelaySessionEvent::Terminated {
1241 peer_id,
1242 reason: RelaySessionCloseReason::CommandChannelClosed,
1243 },
1244 ).await;
1245 break;
1246 }
1247 }
1248 }
1249 upstream_event = upstream.next_event() => {
1250 match upstream_event {
1251 Some(RaknetClientEvent::Packet {
1252 payload,
1253 reliability,
1254 ordering_channel,
1255 ..
1256 }) => {
1257 match contract.apply(RelayDirection::UpstreamToDownstream, payload) {
1258 Ok(Some(forward_payload)) => {
1259 let payload_len = forward_payload.len();
1260 let send_options = upstream_to_downstream_send_options(
1261 upstream_to_downstream_send,
1262 reliability,
1263 ordering_channel,
1264 );
1265 match try_reserve_budget(
1266 RelayDirection::UpstreamToDownstream,
1267 payload_len,
1268 runtime_config,
1269 &downstream_pending_packets,
1270 &downstream_pending_bytes,
1271 &upstream_pending_packets,
1272 &upstream_pending_bytes,
1273 ) {
1274 Ok(()) => {
1275 if !emit_session_event(
1276 &event_tx,
1277 RelaySessionEvent::ForwardToDownstream {
1278 peer_id,
1279 payload: forward_payload,
1280 send_options,
1281 },
1282 )
1283 .await
1284 {
1285 release_reserved_budget(
1286 RelayDirection::UpstreamToDownstream,
1287 payload_len,
1288 &downstream_pending_packets,
1289 &downstream_pending_bytes,
1290 &upstream_pending_packets,
1291 &upstream_pending_bytes,
1292 );
1293 emit_session_event(
1294 &event_tx,
1295 RelaySessionEvent::Terminated {
1296 peer_id,
1297 reason: RelaySessionCloseReason::CommandChannelClosed,
1298 },
1299 )
1300 .await;
1301 break;
1302 }
1303 }
1304 Err(details) => match runtime_config.budget_overflow_policy {
1305 RelayOverflowPolicy::DropNewest => {
1306 emit_session_event(
1307 &event_tx,
1308 RelaySessionEvent::Dropped {
1309 peer_id,
1310 direction: RelayDirection::UpstreamToDownstream,
1311 reason: RelayDropReason::BudgetExceeded(details),
1312 },
1313 )
1314 .await;
1315 }
1316 RelayOverflowPolicy::DisconnectSession => {
1317 emit_session_event(
1318 &event_tx,
1319 RelaySessionEvent::Terminated {
1320 peer_id,
1321 reason: RelaySessionCloseReason::BudgetExceeded {
1322 direction: RelayDirection::UpstreamToDownstream,
1323 details,
1324 },
1325 },
1326 )
1327 .await;
1328 break;
1329 }
1330 },
1331 }
1332 }
1333 Ok(None) => {
1334 emit_session_event(
1335 &event_tx,
1336 RelaySessionEvent::Dropped {
1337 peer_id,
1338 direction: RelayDirection::UpstreamToDownstream,
1339 reason: RelayDropReason::PolicyDrop,
1340 },
1341 ).await;
1342 }
1343 Err(RelayContractError::PolicyDisconnect { reason }) => {
1344 emit_session_event(
1345 &event_tx,
1346 RelaySessionEvent::Terminated {
1347 peer_id,
1348 reason: RelaySessionCloseReason::PolicyDisconnect {
1349 direction: RelayDirection::UpstreamToDownstream,
1350 reason,
1351 },
1352 },
1353 ).await;
1354 break;
1355 }
1356 Err(error) => {
1357 emit_session_event(
1358 &event_tx,
1359 RelaySessionEvent::Dropped {
1360 peer_id,
1361 direction: RelayDirection::UpstreamToDownstream,
1362 reason: RelayDropReason::ContractViolation(error.clone()),
1363 },
1364 ).await;
1365
1366 emit_session_event(
1367 &event_tx,
1368 RelaySessionEvent::Terminated {
1369 peer_id,
1370 reason: RelaySessionCloseReason::ContractViolation {
1371 direction: RelayDirection::UpstreamToDownstream,
1372 error,
1373 },
1374 },
1375 ).await;
1376 break;
1377 }
1378 }
1379 }
1380 Some(RaknetClientEvent::Disconnected { reason }) => {
1381 emit_session_event(
1382 &event_tx,
1383 RelaySessionEvent::Terminated {
1384 peer_id,
1385 reason: RelaySessionCloseReason::UpstreamDisconnected { reason },
1386 },
1387 ).await;
1388 break;
1389 }
1390 Some(RaknetClientEvent::DecodeError { error }) => {
1391 emit_session_event(
1392 &event_tx,
1393 RelaySessionEvent::DecodeError {
1394 peer_id,
1395 direction: RelayDirection::UpstreamToDownstream,
1396 error,
1397 },
1398 ).await;
1399 }
1400 Some(RaknetClientEvent::Connected { .. })
1401 | Some(RaknetClientEvent::ReceiptAcked { .. }) => {}
1402 None => {
1403 emit_session_event(
1404 &event_tx,
1405 RelaySessionEvent::Terminated {
1406 peer_id,
1407 reason: RelaySessionCloseReason::UpstreamDisconnected {
1408 reason: ClientDisconnectReason::TransportError {
1409 message: "upstream event stream ended".to_string(),
1410 },
1411 },
1412 },
1413 ).await;
1414 break;
1415 }
1416 }
1417 }
1418 }
1419 }
1420}
1421
#[cfg(test)]
mod tests {
    //! Unit tests for the relay contract, the relay configuration validators
    //! and the helpers that derive per-direction send options.

    use bytes::Bytes;

    use crate::client::ClientSendOptions;
    use crate::protocol::reliability::Reliability;
    use crate::server::SendOptions;
    use crate::session::RakPriority;

    use super::{
        downstream_to_upstream_send_options, upstream_to_downstream_send_options,
        PassthroughRelayPolicy, RelayContract, RelayContractConfig, RelayContractError,
        RelayDecision, RelayDirection, RelayPolicy, RelayRuntimeConfig,
    };

    /// Test policy that answers `Drop` for every payload in either direction.
    struct DropPolicy;

    impl RelayPolicy for DropPolicy {
        fn decide(&self, _: RelayDirection, _: &Bytes) -> RelayDecision {
            RelayDecision::Drop
        }
    }

    /// Test policy that answers `Disconnect` for every payload in either direction.
    struct DisconnectPolicy;

    impl RelayPolicy for DisconnectPolicy {
        fn decide(&self, _: RelayDirection, _: &Bytes) -> RelayDecision {
            RelayDecision::Disconnect {
                reason: "policy_disconnect",
            }
        }
    }

    /// An empty payload is refused when `allow_empty_payload` is off.
    #[test]
    fn contract_rejects_empty_when_disabled() {
        let limits = RelayContractConfig {
            max_payload_bytes: 64,
            allow_empty_payload: false,
        };
        let relay_contract = RelayContract::new(limits, PassthroughRelayPolicy);

        let rejection = relay_contract
            .apply(RelayDirection::DownstreamToUpstream, Bytes::new())
            .expect_err("empty payload must be rejected");

        assert_eq!(rejection, RelayContractError::EmptyPayload);
    }

    /// A payload longer than `max_payload_bytes` is refused and both sizes are reported.
    #[test]
    fn contract_rejects_oversized_payload() {
        let limits = RelayContractConfig {
            max_payload_bytes: 2,
            allow_empty_payload: true,
        };
        let relay_contract = RelayContract::new(limits, PassthroughRelayPolicy);
        let three_bytes = Bytes::from_static(b"abc");

        let rejection = relay_contract
            .apply(RelayDirection::DownstreamToUpstream, three_bytes)
            .expect_err("oversized payload must be rejected");

        let expected = RelayContractError::PayloadTooLarge { actual: 3, max: 2 };
        assert_eq!(rejection, expected);
    }

    /// A `Drop` decision surfaces as `Ok(None)` rather than as an error.
    #[test]
    fn policy_can_drop_payload() {
        let relay_contract = RelayContract::new(RelayContractConfig::default(), DropPolicy);
        let payload = Bytes::from_static(b"ok");

        let outcome = relay_contract
            .apply(RelayDirection::UpstreamToDownstream, payload)
            .expect("drop should not error");

        assert!(outcome.is_none());
    }

    /// A `Disconnect` decision surfaces as `PolicyDisconnect` carrying the policy's reason.
    #[test]
    fn policy_disconnect_is_reported() {
        let relay_contract = RelayContract::new(RelayContractConfig::default(), DisconnectPolicy);
        let payload = Bytes::from_static(b"ok");

        let rejection = relay_contract
            .apply(RelayDirection::UpstreamToDownstream, payload)
            .expect_err("disconnect policy must error");

        let expected = RelayContractError::PolicyDisconnect {
            reason: "policy_disconnect",
        };
        assert_eq!(rejection, expected);
    }

    /// The default runtime configuration has non-zero queue capacities and validates.
    #[test]
    fn runtime_config_defaults_are_non_zero() {
        let defaults = RelayRuntimeConfig::default();

        assert!(defaults.per_session_downstream_queue_capacity > 0);
        assert!(defaults.session_event_queue_capacity > 0);
        defaults
            .validate()
            .expect("default relay runtime config must be valid");
    }

    /// A zero `max_payload_bytes` fails validation and names the offending field.
    #[test]
    fn contract_config_validate_rejects_zero_payload_limit() {
        let zero_limit = RelayContractConfig {
            max_payload_bytes: 0,
            allow_empty_payload: true,
        };

        let failure = zero_limit
            .validate()
            .expect_err("max_payload_bytes=0 must be rejected");

        assert_eq!(failure.config, "RelayContractConfig");
        assert_eq!(failure.field, "max_payload_bytes");
    }

    /// A session-wide byte budget smaller than one directional budget fails validation.
    #[test]
    fn runtime_config_validate_rejects_total_budget_below_directional_max() {
        let undersized_total = RelayRuntimeConfig {
            downstream_max_pending_bytes: 512,
            upstream_max_pending_bytes: 1024,
            session_total_max_pending_bytes: 900,
            ..RelayRuntimeConfig::default()
        };

        let failure = undersized_total
            .validate()
            .expect_err("invalid total budget must fail");

        assert_eq!(failure.config, "RelayRuntimeConfig");
        assert_eq!(failure.field, "session_total_max_pending_bytes");
    }

    /// Downstream-to-upstream options take the packet's reliability and channel hint,
    /// keeping the default priority.
    #[test]
    fn downstream_send_options_preserve_reliability_and_channel_hint() {
        let base = ClientSendOptions {
            reliability: Reliability::ReliableOrdered,
            channel: 0,
            priority: RakPriority::Immediate,
        };

        let derived = downstream_to_upstream_send_options(base, Reliability::Reliable, Some(9));

        assert_eq!(derived.reliability, Reliability::Reliable);
        assert_eq!(derived.channel, 9);
        assert_eq!(derived.priority, RakPriority::Immediate);
    }

    /// Without a channel hint, upstream-to-downstream options keep the default channel
    /// and priority while still taking the packet's reliability.
    #[test]
    fn upstream_send_options_fall_back_to_default_channel_when_missing_hint() {
        let base = SendOptions {
            reliability: Reliability::ReliableOrdered,
            channel: 3,
            priority: RakPriority::High,
        };

        let derived = upstream_to_downstream_send_options(base, Reliability::Unreliable, None);

        assert_eq!(derived.reliability, Reliability::Unreliable);
        assert_eq!(derived.channel, 3);
        assert_eq!(derived.priority, RakPriority::High);
    }
}