Skip to main content

raknet_rust/
proxy.rs

1use std::collections::{HashMap, VecDeque};
2use std::io;
3use std::net::SocketAddr;
4use std::sync::{
5    Arc,
6    atomic::{AtomicBool, AtomicUsize, Ordering},
7};
8use std::time::Duration;
9
10use bytes::Bytes;
11use thiserror::Error;
12use tokio::sync::mpsc;
13use tokio::sync::mpsc::error::TrySendError;
14use tokio::task::JoinHandle;
15use tokio::time::timeout;
16
17use crate::client::{
18    ClientDisconnectReason, ClientSendOptions, RaknetClient, RaknetClientConfig, RaknetClientError,
19    RaknetClientEvent, ReconnectPolicy,
20};
21use crate::error::ConfigValidationError;
22use crate::protocol::reliability::Reliability;
23use crate::server::{PeerDisconnectReason, PeerId, RaknetServer, RaknetServerEvent, SendOptions};
24
/// Direction in which a payload travels through the relay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelayDirection {
    /// Payload received from the downstream peer, headed for the upstream server.
    DownstreamToUpstream,
    /// Payload received from the upstream server, headed for the downstream peer.
    UpstreamToDownstream,
}
30
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub enum RelayDecision {
33    Forward(Bytes),
34    Drop,
35    Disconnect { reason: &'static str },
36}
37
38pub trait RelayPolicy: Send + Sync + 'static {
39    fn decide(&self, direction: RelayDirection, payload: &Bytes) -> RelayDecision {
40        let _ = direction;
41        RelayDecision::Forward(payload.clone())
42    }
43}
44
45#[derive(Debug, Clone, Copy, Default)]
46pub struct PassthroughRelayPolicy;
47
48impl RelayPolicy for PassthroughRelayPolicy {}
49
/// Size limits enforced on every payload that passes through a relay contract.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RelayContractConfig {
    /// Largest payload length, in bytes, that is accepted.
    pub max_payload_bytes: usize,
    /// Whether zero-length payloads are accepted.
    pub allow_empty_payload: bool,
}

impl Default for RelayContractConfig {
    /// Defaults to a 2 MiB payload ceiling with empty payloads rejected.
    fn default() -> Self {
        let max_payload_bytes = 2 * 1024 * 1024;
        Self {
            max_payload_bytes,
            allow_empty_payload: false,
        }
    }
}
64
65impl RelayContractConfig {
66    pub fn validate(&self) -> Result<(), ConfigValidationError> {
67        if self.max_payload_bytes == 0 {
68            return Err(ConfigValidationError::new(
69                "RelayContractConfig",
70                "max_payload_bytes",
71                "must be >= 1",
72            ));
73        }
74        Ok(())
75    }
76}
77
78#[derive(Debug, Error, Clone, PartialEq, Eq)]
79pub enum RelayContractError {
80    #[error("empty payload is not allowed by relay contract")]
81    EmptyPayload,
82    #[error("payload too large for relay contract: {actual} > {max}")]
83    PayloadTooLarge { actual: usize, max: usize },
84    #[error("relay policy requested disconnect: {reason}")]
85    PolicyDisconnect { reason: &'static str },
86}
87
88pub struct RelayContract<P = PassthroughRelayPolicy> {
89    config: RelayContractConfig,
90    policy: P,
91}
92
93impl<P> RelayContract<P>
94where
95    P: RelayPolicy,
96{
97    pub fn new(config: RelayContractConfig, policy: P) -> Self {
98        Self { config, policy }
99    }
100
101    pub fn config(&self) -> RelayContractConfig {
102        self.config
103    }
104
105    pub fn apply(
106        &self,
107        direction: RelayDirection,
108        payload: Bytes,
109    ) -> Result<Option<Bytes>, RelayContractError> {
110        self.validate_payload(&payload)?;
111
112        match self.policy.decide(direction, &payload) {
113            RelayDecision::Forward(bytes) => {
114                self.validate_payload(&bytes)?;
115                Ok(Some(bytes))
116            }
117            RelayDecision::Drop => Ok(None),
118            RelayDecision::Disconnect { reason } => {
119                Err(RelayContractError::PolicyDisconnect { reason })
120            }
121        }
122    }
123
124    fn validate_payload(&self, payload: &Bytes) -> Result<(), RelayContractError> {
125        if payload.is_empty() && !self.config.allow_empty_payload {
126            return Err(RelayContractError::EmptyPayload);
127        }
128
129        if payload.len() > self.config.max_payload_bytes {
130            return Err(RelayContractError::PayloadTooLarge {
131                actual: payload.len(),
132                max: self.config.max_payload_bytes,
133            });
134        }
135
136        Ok(())
137    }
138}
139
140#[derive(Debug, Clone, Default)]
141pub struct UpstreamConnectorConfig {
142    pub client_config: RaknetClientConfig,
143    pub reconnect_policy: ReconnectPolicy,
144}
145
146impl UpstreamConnectorConfig {
147    pub fn validate(&self) -> Result<(), ConfigValidationError> {
148        self.client_config.validate()?;
149        self.reconnect_policy.validate()?;
150        Ok(())
151    }
152}
153
154#[derive(Debug, Clone)]
155pub struct UpstreamConnector {
156    pub upstream_addr: SocketAddr,
157    pub config: UpstreamConnectorConfig,
158}
159
160impl UpstreamConnector {
161    pub fn new(upstream_addr: SocketAddr, config: UpstreamConnectorConfig) -> Self {
162        Self {
163            upstream_addr,
164            config,
165        }
166    }
167
168    pub async fn connect(&self) -> Result<RaknetClient, RaknetClientError> {
169        RaknetClient::connect_with_retry(
170            self.upstream_addr,
171            self.config.client_config.clone(),
172            self.config.reconnect_policy.clone(),
173        )
174        .await
175    }
176}
177
/// What the proxy does with a payload that cannot be queued or budgeted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RelayOverflowPolicy {
    /// Discard the payload that did not fit and keep the session alive.
    #[default]
    DropNewest,
    /// Close the whole relay session.
    DisconnectSession,
}
184
185#[derive(Debug, Clone, Copy)]
186pub struct RelayRuntimeConfig {
187    pub per_session_downstream_queue_capacity: usize,
188    pub session_event_queue_capacity: usize,
189    pub downstream_to_upstream_send: ClientSendOptions,
190    pub upstream_to_downstream_send: SendOptions,
191    pub downstream_overflow_policy: RelayOverflowPolicy,
192    pub budget_overflow_policy: RelayOverflowPolicy,
193    pub downstream_max_pending_packets: usize,
194    pub downstream_max_pending_bytes: usize,
195    pub upstream_max_pending_packets: usize,
196    pub upstream_max_pending_bytes: usize,
197    pub session_total_max_pending_bytes: usize,
198}
199
200impl Default for RelayRuntimeConfig {
201    fn default() -> Self {
202        Self {
203            per_session_downstream_queue_capacity: 256,
204            session_event_queue_capacity: 1024,
205            downstream_to_upstream_send: ClientSendOptions::default(),
206            upstream_to_downstream_send: SendOptions::default(),
207            downstream_overflow_policy: RelayOverflowPolicy::DropNewest,
208            budget_overflow_policy: RelayOverflowPolicy::DisconnectSession,
209            downstream_max_pending_packets: 256,
210            downstream_max_pending_bytes: 512 * 1024,
211            upstream_max_pending_packets: 512,
212            upstream_max_pending_bytes: 1024 * 1024,
213            session_total_max_pending_bytes: 1536 * 1024,
214        }
215    }
216}
217
218impl RelayRuntimeConfig {
219    pub fn validate(&self) -> Result<(), ConfigValidationError> {
220        if self.per_session_downstream_queue_capacity == 0 {
221            return Err(ConfigValidationError::new(
222                "RelayRuntimeConfig",
223                "per_session_downstream_queue_capacity",
224                "must be >= 1",
225            ));
226        }
227        if self.session_event_queue_capacity == 0 {
228            return Err(ConfigValidationError::new(
229                "RelayRuntimeConfig",
230                "session_event_queue_capacity",
231                "must be >= 1",
232            ));
233        }
234        if self.downstream_max_pending_packets == 0 {
235            return Err(ConfigValidationError::new(
236                "RelayRuntimeConfig",
237                "downstream_max_pending_packets",
238                "must be >= 1",
239            ));
240        }
241        if self.downstream_max_pending_bytes == 0 {
242            return Err(ConfigValidationError::new(
243                "RelayRuntimeConfig",
244                "downstream_max_pending_bytes",
245                "must be >= 1",
246            ));
247        }
248        if self.upstream_max_pending_packets == 0 {
249            return Err(ConfigValidationError::new(
250                "RelayRuntimeConfig",
251                "upstream_max_pending_packets",
252                "must be >= 1",
253            ));
254        }
255        if self.upstream_max_pending_bytes == 0 {
256            return Err(ConfigValidationError::new(
257                "RelayRuntimeConfig",
258                "upstream_max_pending_bytes",
259                "must be >= 1",
260            ));
261        }
262        if self.session_total_max_pending_bytes == 0 {
263            return Err(ConfigValidationError::new(
264                "RelayRuntimeConfig",
265                "session_total_max_pending_bytes",
266                "must be >= 1",
267            ));
268        }
269        let min_total = self
270            .downstream_max_pending_bytes
271            .max(self.upstream_max_pending_bytes);
272        if self.session_total_max_pending_bytes < min_total {
273            return Err(ConfigValidationError::new(
274                "RelayRuntimeConfig",
275                "session_total_max_pending_bytes",
276                format!(
277                    "must be >= max(downstream_max_pending_bytes, upstream_max_pending_bytes) = {min_total}"
278                ),
279            ));
280        }
281        Ok(())
282    }
283}
284
/// Snapshot reported when a payload could not be admitted under the
/// pending-data budget.
// NOTE(review): field meanings below are inferred from their names; the code
// that fills them in (`try_reserve_budget`) is outside this view — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelayBudgetExceeded {
    /// Pending packet count for the affected direction.
    pub pending_packets: usize,
    /// Pending byte count for the affected direction.
    pub pending_bytes: usize,
    /// Packet limit that applies to the affected direction.
    pub packet_limit: usize,
    /// Byte limit that applies to the affected direction.
    pub byte_limit: usize,
    /// Pending byte count across the whole session.
    pub total_pending_bytes: usize,
    /// Byte limit that applies to the whole session.
    pub total_limit: usize,
}
294
295#[derive(Debug, Clone, PartialEq, Eq)]
296pub enum RelayDropReason {
297    NoSession,
298    QueueOverflow,
299    BudgetExceeded(RelayBudgetExceeded),
300    PolicyDrop,
301    ContractViolation(RelayContractError),
302}
303
304#[derive(Debug, Clone, PartialEq, Eq)]
305pub enum RelaySessionCloseReason {
306    DownstreamDisconnected {
307        reason: PeerDisconnectReason,
308    },
309    UpstreamDisconnected {
310        reason: ClientDisconnectReason,
311    },
312    UpstreamConnectFailed {
313        message: String,
314    },
315    UpstreamSendFailed {
316        message: String,
317    },
318    DownstreamSendFailed {
319        message: String,
320    },
321    ContractViolation {
322        direction: RelayDirection,
323        error: RelayContractError,
324    },
325    PolicyDisconnect {
326        direction: RelayDirection,
327        reason: &'static str,
328    },
329    BudgetExceeded {
330        direction: RelayDirection,
331        details: RelayBudgetExceeded,
332    },
333    DownstreamQueueOverflow,
334    CommandChannelClosed,
335    ProxyShutdown,
336}
337
338#[derive(Debug)]
339pub enum RaknetRelayProxyEvent {
340    SessionStarted {
341        peer_id: PeerId,
342        downstream_addr: SocketAddr,
343        upstream_addr: SocketAddr,
344    },
345    Forwarded {
346        peer_id: PeerId,
347        direction: RelayDirection,
348        payload_len: usize,
349    },
350    Dropped {
351        peer_id: PeerId,
352        direction: RelayDirection,
353        reason: RelayDropReason,
354    },
355    DecodeError {
356        peer_id: PeerId,
357        direction: RelayDirection,
358        error: String,
359    },
360    SessionClosed {
361        peer_id: PeerId,
362        reason: RelaySessionCloseReason,
363    },
364    DownstreamRateLimited {
365        addr: SocketAddr,
366    },
367    DownstreamSessionLimitReached {
368        addr: SocketAddr,
369    },
370    DownstreamProxyDropped {
371        addr: SocketAddr,
372    },
373    DownstreamDecodeError {
374        addr: SocketAddr,
375        error: String,
376    },
377    DownstreamWorkerError {
378        shard_id: usize,
379        message: String,
380    },
381    DownstreamWorkerStopped {
382        shard_id: usize,
383    },
384}
385
386struct RelaySessionHandle {
387    command_tx: mpsc::Sender<RelaySessionCommand>,
388    stop: Arc<AtomicBool>,
389    join: JoinHandle<()>,
390    downstream_pending_packets: Arc<AtomicUsize>,
391    downstream_pending_bytes: Arc<AtomicUsize>,
392    upstream_pending_packets: Arc<AtomicUsize>,
393    upstream_pending_bytes: Arc<AtomicUsize>,
394}
395
396enum RelayInput {
397    Server(Option<RaknetServerEvent>),
398    Session(Option<RelaySessionEvent>),
399}
400
401enum RelaySessionCommand {
402    ForwardDownstreamPayload {
403        payload: Bytes,
404        send_options: ClientSendOptions,
405    },
406    Disconnect,
407}
408
409enum RelaySessionEvent {
410    ForwardToDownstream {
411        peer_id: PeerId,
412        payload: Bytes,
413        send_options: SendOptions,
414    },
415    Forwarded {
416        peer_id: PeerId,
417        direction: RelayDirection,
418        payload_len: usize,
419    },
420    Dropped {
421        peer_id: PeerId,
422        direction: RelayDirection,
423        reason: RelayDropReason,
424    },
425    DecodeError {
426        peer_id: PeerId,
427        direction: RelayDirection,
428        error: String,
429    },
430    Terminated {
431        peer_id: PeerId,
432        reason: RelaySessionCloseReason,
433    },
434}
435
436struct RelaySessionRuntimeContext {
437    event_tx: mpsc::Sender<RelaySessionEvent>,
438    upstream_to_downstream_send: SendOptions,
439    runtime_config: RelayRuntimeConfig,
440    stop: Arc<AtomicBool>,
441    downstream_pending_packets: Arc<AtomicUsize>,
442    downstream_pending_bytes: Arc<AtomicUsize>,
443    upstream_pending_packets: Arc<AtomicUsize>,
444    upstream_pending_bytes: Arc<AtomicUsize>,
445}
446
447pub struct RaknetRelayProxy<P = PassthroughRelayPolicy> {
448    server: RaknetServer,
449    upstream_connector: UpstreamConnector,
450    contract: Arc<RelayContract<P>>,
451    runtime_config: RelayRuntimeConfig,
452    sessions: HashMap<PeerId, RelaySessionHandle>,
453    session_event_tx: mpsc::Sender<RelaySessionEvent>,
454    session_event_rx: mpsc::Receiver<RelaySessionEvent>,
455    pending_events: VecDeque<RaknetRelayProxyEvent>,
456}
457
458impl<P> RaknetRelayProxy<P>
459where
460    P: RelayPolicy,
461{
462    pub fn try_new(
463        server: RaknetServer,
464        upstream_connector: UpstreamConnector,
465        contract: RelayContract<P>,
466        runtime_config: RelayRuntimeConfig,
467    ) -> io::Result<Self> {
468        upstream_connector
469            .config
470            .validate()
471            .map_err(invalid_config_io_error)?;
472        contract
473            .config()
474            .validate()
475            .map_err(invalid_config_io_error)?;
476        runtime_config.validate().map_err(invalid_config_io_error)?;
477        Ok(Self::new(
478            server,
479            upstream_connector,
480            contract,
481            runtime_config,
482        ))
483    }
484
485    pub fn new(
486        server: RaknetServer,
487        upstream_connector: UpstreamConnector,
488        contract: RelayContract<P>,
489        runtime_config: RelayRuntimeConfig,
490    ) -> Self {
491        let (session_event_tx, session_event_rx) =
492            mpsc::channel(runtime_config.session_event_queue_capacity.max(1));
493
494        Self {
495            server,
496            upstream_connector,
497            contract: Arc::new(contract),
498            runtime_config,
499            sessions: HashMap::new(),
500            session_event_tx,
501            session_event_rx,
502            pending_events: VecDeque::new(),
503        }
504    }
505
506    pub async fn bind(
507        downstream_bind_addr: SocketAddr,
508        upstream_connector: UpstreamConnector,
509        contract: RelayContract<P>,
510        runtime_config: RelayRuntimeConfig,
511    ) -> io::Result<Self> {
512        let server = RaknetServer::bind(downstream_bind_addr).await?;
513        Self::try_new(server, upstream_connector, contract, runtime_config)
514    }
515
516    pub fn session_count(&self) -> usize {
517        self.sessions.len()
518    }
519
520    pub async fn next_event(&mut self) -> Option<RaknetRelayProxyEvent> {
521        if let Some(event) = self.pending_events.pop_front() {
522            return Some(event);
523        }
524
525        loop {
526            let input = {
527                let server = &mut self.server;
528                let session_event_rx = &mut self.session_event_rx;
529                tokio::select! {
530                    event = server.next_event() => RelayInput::Server(event),
531                    event = session_event_rx.recv() => RelayInput::Session(event),
532                }
533            };
534
535            match input {
536                RelayInput::Server(Some(event)) => self.handle_server_event(event).await,
537                RelayInput::Session(Some(event)) => self.handle_session_event(event).await,
538                RelayInput::Server(None) => return None,
539                RelayInput::Session(None) => {
540                    if self.sessions.is_empty() {
541                        return None;
542                    }
543                }
544            }
545
546            if let Some(event) = self.pending_events.pop_front() {
547                return Some(event);
548            }
549        }
550    }
551
552    pub async fn shutdown(mut self) -> io::Result<()> {
553        self.stop_all_sessions().await;
554        self.server.shutdown().await
555    }
556
557    async fn handle_server_event(&mut self, event: RaknetServerEvent) {
558        match event {
559            RaknetServerEvent::PeerConnected { peer_id, addr, .. } => {
560                if let Some(existing) = self.sessions.remove(&peer_id) {
561                    self.finalize_session_handle(existing, true).await;
562                }
563
564                match self.upstream_connector.connect().await {
565                    Ok(upstream_client) => {
566                        let upstream_addr = upstream_client.server_addr();
567                        self.spawn_session(peer_id, upstream_client);
568                        self.pending_events
569                            .push_back(RaknetRelayProxyEvent::SessionStarted {
570                                peer_id,
571                                downstream_addr: addr,
572                                upstream_addr,
573                            });
574                    }
575                    Err(error) => {
576                        let reason = RelaySessionCloseReason::UpstreamConnectFailed {
577                            message: error.to_string(),
578                        };
579                        let _ = self.server.disconnect(peer_id).await;
580                        self.pending_events
581                            .push_back(RaknetRelayProxyEvent::SessionClosed { peer_id, reason });
582                    }
583                }
584            }
585            RaknetServerEvent::PeerDisconnected {
586                peer_id, reason, ..
587            } => {
588                let _ = self
589                    .close_session(
590                        peer_id,
591                        RelaySessionCloseReason::DownstreamDisconnected { reason },
592                        true,
593                        false,
594                    )
595                    .await;
596            }
597            RaknetServerEvent::Packet {
598                peer_id,
599                payload,
600                reliability,
601                ordering_channel,
602                ..
603            } => {
604                self.forward_downstream_payload(peer_id, payload, reliability, ordering_channel)
605                    .await;
606            }
607            RaknetServerEvent::PeerRateLimited { addr } => {
608                self.pending_events
609                    .push_back(RaknetRelayProxyEvent::DownstreamRateLimited { addr });
610            }
611            RaknetServerEvent::SessionLimitReached { addr } => {
612                self.pending_events
613                    .push_back(RaknetRelayProxyEvent::DownstreamSessionLimitReached { addr });
614            }
615            RaknetServerEvent::ProxyDropped { addr } => {
616                self.pending_events
617                    .push_back(RaknetRelayProxyEvent::DownstreamProxyDropped { addr });
618            }
619            RaknetServerEvent::DecodeError { addr, error } => {
620                self.pending_events
621                    .push_back(RaknetRelayProxyEvent::DownstreamDecodeError { addr, error });
622            }
623            RaknetServerEvent::WorkerError { shard_id, message } => {
624                self.pending_events
625                    .push_back(RaknetRelayProxyEvent::DownstreamWorkerError { shard_id, message });
626            }
627            RaknetServerEvent::WorkerStopped { shard_id } => {
628                self.pending_events
629                    .push_back(RaknetRelayProxyEvent::DownstreamWorkerStopped { shard_id });
630            }
631            RaknetServerEvent::OfflinePacket { .. } => {}
632            RaknetServerEvent::ReceiptAcked { .. } => {}
633            RaknetServerEvent::Metrics { .. } => {}
634        }
635    }
636
637    async fn forward_downstream_payload(
638        &mut self,
639        peer_id: PeerId,
640        payload: Bytes,
641        reliability: Reliability,
642        ordering_channel: Option<u8>,
643    ) {
644        let Some(session) = self.sessions.get(&peer_id) else {
645            self.pending_events
646                .push_back(RaknetRelayProxyEvent::Dropped {
647                    peer_id,
648                    direction: RelayDirection::DownstreamToUpstream,
649                    reason: RelayDropReason::NoSession,
650                });
651            return;
652        };
653
654        let payload_len = payload.len();
655        let send_options = downstream_to_upstream_send_options(
656            self.runtime_config.downstream_to_upstream_send,
657            reliability,
658            ordering_channel,
659        );
660        if let Err(details) = try_reserve_budget(
661            RelayDirection::DownstreamToUpstream,
662            payload_len,
663            self.runtime_config,
664            &session.downstream_pending_packets,
665            &session.downstream_pending_bytes,
666            &session.upstream_pending_packets,
667            &session.upstream_pending_bytes,
668        ) {
669            match self.runtime_config.budget_overflow_policy {
670                RelayOverflowPolicy::DropNewest => {
671                    self.pending_events
672                        .push_back(RaknetRelayProxyEvent::Dropped {
673                            peer_id,
674                            direction: RelayDirection::DownstreamToUpstream,
675                            reason: RelayDropReason::BudgetExceeded(details),
676                        });
677                }
678                RelayOverflowPolicy::DisconnectSession => {
679                    let _ = self
680                        .close_session(
681                            peer_id,
682                            RelaySessionCloseReason::BudgetExceeded {
683                                direction: RelayDirection::DownstreamToUpstream,
684                                details,
685                            },
686                            true,
687                            true,
688                        )
689                        .await;
690                }
691            }
692            return;
693        }
694
695        match session
696            .command_tx
697            .try_send(RelaySessionCommand::ForwardDownstreamPayload {
698                payload,
699                send_options,
700            }) {
701            Ok(()) => {}
702            Err(TrySendError::Full(_)) => match self.runtime_config.downstream_overflow_policy {
703                RelayOverflowPolicy::DropNewest => {
704                    release_reserved_budget(
705                        RelayDirection::DownstreamToUpstream,
706                        payload_len,
707                        &session.downstream_pending_packets,
708                        &session.downstream_pending_bytes,
709                        &session.upstream_pending_packets,
710                        &session.upstream_pending_bytes,
711                    );
712                    self.pending_events
713                        .push_back(RaknetRelayProxyEvent::Dropped {
714                            peer_id,
715                            direction: RelayDirection::DownstreamToUpstream,
716                            reason: RelayDropReason::QueueOverflow,
717                        });
718                }
719                RelayOverflowPolicy::DisconnectSession => {
720                    release_reserved_budget(
721                        RelayDirection::DownstreamToUpstream,
722                        payload_len,
723                        &session.downstream_pending_packets,
724                        &session.downstream_pending_bytes,
725                        &session.upstream_pending_packets,
726                        &session.upstream_pending_bytes,
727                    );
728                    let _ = self
729                        .close_session(
730                            peer_id,
731                            RelaySessionCloseReason::DownstreamQueueOverflow,
732                            true,
733                            true,
734                        )
735                        .await;
736                }
737            },
738            Err(TrySendError::Closed(_)) => {
739                release_reserved_budget(
740                    RelayDirection::DownstreamToUpstream,
741                    payload_len,
742                    &session.downstream_pending_packets,
743                    &session.downstream_pending_bytes,
744                    &session.upstream_pending_packets,
745                    &session.upstream_pending_bytes,
746                );
747                self.pending_events
748                    .push_back(RaknetRelayProxyEvent::Dropped {
749                        peer_id,
750                        direction: RelayDirection::DownstreamToUpstream,
751                        reason: RelayDropReason::NoSession,
752                    });
753
754                let _ = self
755                    .close_session(
756                        peer_id,
757                        RelaySessionCloseReason::CommandChannelClosed,
758                        false,
759                        true,
760                    )
761                    .await;
762            }
763        }
764    }
765
766    async fn handle_session_event(&mut self, event: RelaySessionEvent) {
767        match event {
768            RelaySessionEvent::ForwardToDownstream {
769                peer_id,
770                payload,
771                send_options,
772            } => {
773                let payload_len = payload.len();
774                if let Some(session) = self.sessions.get(&peer_id) {
775                    release_reserved_budget(
776                        RelayDirection::UpstreamToDownstream,
777                        payload_len,
778                        &session.downstream_pending_packets,
779                        &session.downstream_pending_bytes,
780                        &session.upstream_pending_packets,
781                        &session.upstream_pending_bytes,
782                    );
783                }
784
785                match self
786                    .server
787                    .send_with_options(peer_id, payload, send_options)
788                    .await
789                {
790                    Ok(()) => {
791                        self.pending_events
792                            .push_back(RaknetRelayProxyEvent::Forwarded {
793                                peer_id,
794                                direction: RelayDirection::UpstreamToDownstream,
795                                payload_len,
796                            });
797                    }
798                    Err(error) => {
799                        let _ = self
800                            .close_session(
801                                peer_id,
802                                RelaySessionCloseReason::DownstreamSendFailed {
803                                    message: error.to_string(),
804                                },
805                                true,
806                                true,
807                            )
808                            .await;
809                    }
810                }
811            }
812            RelaySessionEvent::Forwarded {
813                peer_id,
814                direction,
815                payload_len,
816            } => {
817                self.pending_events
818                    .push_back(RaknetRelayProxyEvent::Forwarded {
819                        peer_id,
820                        direction,
821                        payload_len,
822                    });
823            }
824            RelaySessionEvent::Dropped {
825                peer_id,
826                direction,
827                reason,
828            } => {
829                self.pending_events
830                    .push_back(RaknetRelayProxyEvent::Dropped {
831                        peer_id,
832                        direction,
833                        reason,
834                    });
835            }
836            RelaySessionEvent::DecodeError {
837                peer_id,
838                direction,
839                error,
840            } => {
841                self.pending_events
842                    .push_back(RaknetRelayProxyEvent::DecodeError {
843                        peer_id,
844                        direction,
845                        error,
846                    });
847            }
848            RelaySessionEvent::Terminated { peer_id, reason } => {
849                let disconnect_downstream = should_disconnect_downstream(&reason);
850                let _ = self
851                    .close_session(peer_id, reason, false, disconnect_downstream)
852                    .await;
853            }
854        }
855    }
856
857    fn spawn_session(&mut self, peer_id: PeerId, upstream_client: RaknetClient) {
858        let (command_tx, command_rx) = mpsc::channel(
859            self.runtime_config
860                .per_session_downstream_queue_capacity
861                .max(1),
862        );
863        let stop = Arc::new(AtomicBool::new(false));
864        let downstream_pending_packets = Arc::new(AtomicUsize::new(0));
865        let downstream_pending_bytes = Arc::new(AtomicUsize::new(0));
866        let upstream_pending_packets = Arc::new(AtomicUsize::new(0));
867        let upstream_pending_bytes = Arc::new(AtomicUsize::new(0));
868
869        let contract = Arc::clone(&self.contract);
870        let session_context = RelaySessionRuntimeContext {
871            event_tx: self.session_event_tx.clone(),
872            upstream_to_downstream_send: self.runtime_config.upstream_to_downstream_send,
873            runtime_config: self.runtime_config,
874            stop: Arc::clone(&stop),
875            downstream_pending_packets: Arc::clone(&downstream_pending_packets),
876            downstream_pending_bytes: Arc::clone(&downstream_pending_bytes),
877            upstream_pending_packets: Arc::clone(&upstream_pending_packets),
878            upstream_pending_bytes: Arc::clone(&upstream_pending_bytes),
879        };
880
881        let join = tokio::spawn(async move {
882            run_relay_session(
883                peer_id,
884                upstream_client,
885                contract,
886                command_rx,
887                session_context,
888            )
889            .await;
890        });
891
892        self.sessions.insert(
893            peer_id,
894            RelaySessionHandle {
895                command_tx,
896                stop,
897                join,
898                downstream_pending_packets,
899                downstream_pending_bytes,
900                upstream_pending_packets,
901                upstream_pending_bytes,
902            },
903        );
904    }
905
906    async fn stop_all_sessions(&mut self) {
907        let sessions = self
908            .sessions
909            .drain()
910            .map(|(_, session)| session)
911            .collect::<Vec<_>>();
912
913        for session in sessions {
914            self.finalize_session_handle(session, true).await;
915        }
916    }
917
918    async fn finalize_session_handle(&self, session: RelaySessionHandle, request_disconnect: bool) {
919        let RelaySessionHandle {
920            command_tx,
921            stop,
922            mut join,
923            downstream_pending_packets: _,
924            downstream_pending_bytes: _,
925            upstream_pending_packets: _,
926            upstream_pending_bytes: _,
927        } = session;
928
929        if request_disconnect {
930            stop.store(true, Ordering::Relaxed);
931            let _ = command_tx.try_send(RelaySessionCommand::Disconnect);
932        }
933
934        if timeout(Duration::from_millis(200), &mut join)
935            .await
936            .is_err()
937        {
938            join.abort();
939            let _ = join.await;
940        }
941    }
942
943    async fn close_session(
944        &mut self,
945        peer_id: PeerId,
946        reason: RelaySessionCloseReason,
947        request_upstream_disconnect: bool,
948        request_downstream_disconnect: bool,
949    ) -> bool {
950        let Some(session) = self.sessions.remove(&peer_id) else {
951            return false;
952        };
953
954        self.finalize_session_handle(session, request_upstream_disconnect)
955            .await;
956        if request_downstream_disconnect {
957            let _ = self.server.disconnect(peer_id).await;
958        }
959        self.pending_events
960            .push_back(RaknetRelayProxyEvent::SessionClosed { peer_id, reason });
961        true
962    }
963}
964
965fn try_reserve_budget(
966    direction: RelayDirection,
967    payload_len: usize,
968    runtime_config: RelayRuntimeConfig,
969    downstream_pending_packets: &AtomicUsize,
970    downstream_pending_bytes: &AtomicUsize,
971    upstream_pending_packets: &AtomicUsize,
972    upstream_pending_bytes: &AtomicUsize,
973) -> Result<(), RelayBudgetExceeded> {
974    let (dir_pending_packets, dir_pending_bytes, packet_limit, byte_limit) = match direction {
975        RelayDirection::DownstreamToUpstream => (
976            downstream_pending_packets.load(Ordering::Relaxed),
977            downstream_pending_bytes.load(Ordering::Relaxed),
978            runtime_config.downstream_max_pending_packets.max(1),
979            runtime_config.downstream_max_pending_bytes.max(1),
980        ),
981        RelayDirection::UpstreamToDownstream => (
982            upstream_pending_packets.load(Ordering::Relaxed),
983            upstream_pending_bytes.load(Ordering::Relaxed),
984            runtime_config.upstream_max_pending_packets.max(1),
985            runtime_config.upstream_max_pending_bytes.max(1),
986        ),
987    };
988
989    let total_pending_bytes = downstream_pending_bytes.load(Ordering::Relaxed)
990        + upstream_pending_bytes.load(Ordering::Relaxed);
991    let total_limit = runtime_config.session_total_max_pending_bytes.max(1);
992
993    if dir_pending_packets.saturating_add(1) > packet_limit
994        || dir_pending_bytes.saturating_add(payload_len) > byte_limit
995        || total_pending_bytes.saturating_add(payload_len) > total_limit
996    {
997        return Err(RelayBudgetExceeded {
998            pending_packets: dir_pending_packets,
999            pending_bytes: dir_pending_bytes,
1000            packet_limit,
1001            byte_limit,
1002            total_pending_bytes,
1003            total_limit,
1004        });
1005    }
1006
1007    match direction {
1008        RelayDirection::DownstreamToUpstream => {
1009            downstream_pending_packets.fetch_add(1, Ordering::Relaxed);
1010            downstream_pending_bytes.fetch_add(payload_len, Ordering::Relaxed);
1011        }
1012        RelayDirection::UpstreamToDownstream => {
1013            upstream_pending_packets.fetch_add(1, Ordering::Relaxed);
1014            upstream_pending_bytes.fetch_add(payload_len, Ordering::Relaxed);
1015        }
1016    }
1017
1018    Ok(())
1019}
1020
1021fn release_reserved_budget(
1022    direction: RelayDirection,
1023    payload_len: usize,
1024    downstream_pending_packets: &AtomicUsize,
1025    downstream_pending_bytes: &AtomicUsize,
1026    upstream_pending_packets: &AtomicUsize,
1027    upstream_pending_bytes: &AtomicUsize,
1028) {
1029    match direction {
1030        RelayDirection::DownstreamToUpstream => {
1031            atomic_saturating_sub(downstream_pending_packets, 1);
1032            atomic_saturating_sub(downstream_pending_bytes, payload_len);
1033        }
1034        RelayDirection::UpstreamToDownstream => {
1035            atomic_saturating_sub(upstream_pending_packets, 1);
1036            atomic_saturating_sub(upstream_pending_bytes, payload_len);
1037        }
1038    }
1039}
1040
/// Atomically subtracts `value` from `counter`, clamping the result at zero.
fn atomic_saturating_sub(counter: &AtomicUsize, value: usize) {
    let mut observed = counter.load(Ordering::Relaxed);
    loop {
        let next = observed.saturating_sub(value);
        // Retry with the freshly observed value whenever another thread got in first (or the
        // weak exchange failed spuriously).
        match counter.compare_exchange_weak(observed, next, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return,
            Err(actual) => observed = actual,
        }
    }
}
1046
1047fn should_disconnect_downstream(reason: &RelaySessionCloseReason) -> bool {
1048    matches!(
1049        reason,
1050        RelaySessionCloseReason::UpstreamDisconnected { .. }
1051            | RelaySessionCloseReason::UpstreamSendFailed { .. }
1052            | RelaySessionCloseReason::ContractViolation { .. }
1053            | RelaySessionCloseReason::PolicyDisconnect { .. }
1054            | RelaySessionCloseReason::BudgetExceeded { .. }
1055            | RelaySessionCloseReason::CommandChannelClosed
1056    )
1057}
1058
/// Picks the ordering channel to send on: the hint when one is present, otherwise
/// `default_channel`.
#[inline]
fn apply_channel_hint(default_channel: u8, ordering_channel: Option<u8>) -> u8 {
    match ordering_channel {
        Some(hinted_channel) => hinted_channel,
        None => default_channel,
    }
}
1063
1064#[inline]
1065fn downstream_to_upstream_send_options(
1066    defaults: ClientSendOptions,
1067    reliability: Reliability,
1068    ordering_channel: Option<u8>,
1069) -> ClientSendOptions {
1070    ClientSendOptions {
1071        reliability,
1072        channel: apply_channel_hint(defaults.channel, ordering_channel),
1073        priority: defaults.priority,
1074    }
1075}
1076
1077#[inline]
1078fn upstream_to_downstream_send_options(
1079    defaults: SendOptions,
1080    reliability: Reliability,
1081    ordering_channel: Option<u8>,
1082) -> SendOptions {
1083    SendOptions {
1084        reliability,
1085        channel: apply_channel_hint(defaults.channel, ordering_channel),
1086        priority: defaults.priority,
1087    }
1088}
1089
1090fn invalid_config_io_error(error: ConfigValidationError) -> io::Error {
1091    io::Error::new(io::ErrorKind::InvalidInput, error.to_string())
1092}
1093
1094async fn emit_session_event(
1095    tx: &mpsc::Sender<RelaySessionEvent>,
1096    event: RelaySessionEvent,
1097) -> bool {
1098    tx.send(event).await.is_ok()
1099}
1100
1101async fn run_relay_session<P>(
1102    peer_id: PeerId,
1103    mut upstream: RaknetClient,
1104    contract: Arc<RelayContract<P>>,
1105    mut command_rx: mpsc::Receiver<RelaySessionCommand>,
1106    context: RelaySessionRuntimeContext,
1107) where
1108    P: RelayPolicy,
1109{
1110    let RelaySessionRuntimeContext {
1111        event_tx,
1112        upstream_to_downstream_send,
1113        runtime_config,
1114        stop,
1115        downstream_pending_packets,
1116        downstream_pending_bytes,
1117        upstream_pending_packets,
1118        upstream_pending_bytes,
1119    } = context;
1120
1121    loop {
1122        if stop.load(Ordering::Relaxed) {
1123            let _ = upstream.disconnect(None).await;
1124            emit_session_event(
1125                &event_tx,
1126                RelaySessionEvent::Terminated {
1127                    peer_id,
1128                    reason: RelaySessionCloseReason::ProxyShutdown,
1129                },
1130            )
1131            .await;
1132            break;
1133        }
1134
1135        tokio::select! {
1136            command = command_rx.recv() => {
1137                match command {
1138                    Some(RelaySessionCommand::ForwardDownstreamPayload {
1139                        payload,
1140                        send_options,
1141                    }) => {
1142                        release_reserved_budget(
1143                            RelayDirection::DownstreamToUpstream,
1144                            payload.len(),
1145                            &downstream_pending_packets,
1146                            &downstream_pending_bytes,
1147                            &upstream_pending_packets,
1148                            &upstream_pending_bytes,
1149                        );
1150
1151                        match contract.apply(RelayDirection::DownstreamToUpstream, payload) {
1152                            Ok(Some(forward_payload)) => {
1153                                let payload_len = forward_payload.len();
1154                                match upstream.send_with_options(forward_payload, send_options).await {
1155                                    Ok(()) => {
1156                                        emit_session_event(
1157                                            &event_tx,
1158                                            RelaySessionEvent::Forwarded {
1159                                                peer_id,
1160                                                direction: RelayDirection::DownstreamToUpstream,
1161                                                payload_len,
1162                                            },
1163                                        ).await;
1164                                    }
1165                                    Err(error) => {
1166                                        emit_session_event(
1167                                            &event_tx,
1168                                            RelaySessionEvent::Terminated {
1169                                                peer_id,
1170                                                reason: RelaySessionCloseReason::UpstreamSendFailed {
1171                                                    message: error.to_string(),
1172                                                },
1173                                            },
1174                                        ).await;
1175                                        break;
1176                                    }
1177                                }
1178                            }
1179                            Ok(None) => {
1180                                emit_session_event(
1181                                    &event_tx,
1182                                    RelaySessionEvent::Dropped {
1183                                        peer_id,
1184                                        direction: RelayDirection::DownstreamToUpstream,
1185                                        reason: RelayDropReason::PolicyDrop,
1186                                    },
1187                                ).await;
1188                            }
1189                            Err(RelayContractError::PolicyDisconnect { reason }) => {
1190                                emit_session_event(
1191                                    &event_tx,
1192                                    RelaySessionEvent::Terminated {
1193                                        peer_id,
1194                                        reason: RelaySessionCloseReason::PolicyDisconnect {
1195                                            direction: RelayDirection::DownstreamToUpstream,
1196                                            reason,
1197                                        },
1198                                    },
1199                                ).await;
1200                                break;
1201                            }
1202                            Err(error) => {
1203                                emit_session_event(
1204                                    &event_tx,
1205                                    RelaySessionEvent::Dropped {
1206                                        peer_id,
1207                                        direction: RelayDirection::DownstreamToUpstream,
1208                                        reason: RelayDropReason::ContractViolation(error.clone()),
1209                                    },
1210                                ).await;
1211
1212                                emit_session_event(
1213                                    &event_tx,
1214                                    RelaySessionEvent::Terminated {
1215                                        peer_id,
1216                                        reason: RelaySessionCloseReason::ContractViolation {
1217                                            direction: RelayDirection::DownstreamToUpstream,
1218                                            error,
1219                                        },
1220                                    },
1221                                ).await;
1222                                break;
1223                            }
1224                        }
1225                    }
1226                    Some(RelaySessionCommand::Disconnect) => {
1227                        let _ = upstream.disconnect(None).await;
1228                        emit_session_event(
1229                            &event_tx,
1230                            RelaySessionEvent::Terminated {
1231                                peer_id,
1232                                reason: RelaySessionCloseReason::ProxyShutdown,
1233                            },
1234                        ).await;
1235                        break;
1236                    }
1237                    None => {
1238                        emit_session_event(
1239                            &event_tx,
1240                            RelaySessionEvent::Terminated {
1241                                peer_id,
1242                                reason: RelaySessionCloseReason::CommandChannelClosed,
1243                            },
1244                        ).await;
1245                        break;
1246                    }
1247                }
1248            }
1249            upstream_event = upstream.next_event() => {
1250                match upstream_event {
1251                    Some(RaknetClientEvent::Packet {
1252                        payload,
1253                        reliability,
1254                        ordering_channel,
1255                        ..
1256                    }) => {
1257                        match contract.apply(RelayDirection::UpstreamToDownstream, payload) {
1258                            Ok(Some(forward_payload)) => {
1259                                let payload_len = forward_payload.len();
1260                                let send_options = upstream_to_downstream_send_options(
1261                                    upstream_to_downstream_send,
1262                                    reliability,
1263                                    ordering_channel,
1264                                );
1265                                match try_reserve_budget(
1266                                    RelayDirection::UpstreamToDownstream,
1267                                    payload_len,
1268                                    runtime_config,
1269                                    &downstream_pending_packets,
1270                                    &downstream_pending_bytes,
1271                                    &upstream_pending_packets,
1272                                    &upstream_pending_bytes,
1273                                ) {
1274                                    Ok(()) => {
1275                                        if !emit_session_event(
1276                                            &event_tx,
1277                                            RelaySessionEvent::ForwardToDownstream {
1278                                                peer_id,
1279                                                payload: forward_payload,
1280                                                send_options,
1281                                            },
1282                                        )
1283                                        .await
1284                                        {
1285                                            release_reserved_budget(
1286                                                RelayDirection::UpstreamToDownstream,
1287                                                payload_len,
1288                                                &downstream_pending_packets,
1289                                                &downstream_pending_bytes,
1290                                                &upstream_pending_packets,
1291                                                &upstream_pending_bytes,
1292                                            );
1293                                            emit_session_event(
1294                                                &event_tx,
1295                                                RelaySessionEvent::Terminated {
1296                                                    peer_id,
1297                                                    reason: RelaySessionCloseReason::CommandChannelClosed,
1298                                                },
1299                                            )
1300                                            .await;
1301                                            break;
1302                                        }
1303                                    }
1304                                    Err(details) => match runtime_config.budget_overflow_policy {
1305                                        RelayOverflowPolicy::DropNewest => {
1306                                            emit_session_event(
1307                                                &event_tx,
1308                                                RelaySessionEvent::Dropped {
1309                                                    peer_id,
1310                                                    direction: RelayDirection::UpstreamToDownstream,
1311                                                    reason: RelayDropReason::BudgetExceeded(details),
1312                                                },
1313                                            )
1314                                            .await;
1315                                        }
1316                                        RelayOverflowPolicy::DisconnectSession => {
1317                                            emit_session_event(
1318                                                &event_tx,
1319                                                RelaySessionEvent::Terminated {
1320                                                    peer_id,
1321                                                    reason: RelaySessionCloseReason::BudgetExceeded {
1322                                                        direction: RelayDirection::UpstreamToDownstream,
1323                                                        details,
1324                                                    },
1325                                                },
1326                                            )
1327                                            .await;
1328                                            break;
1329                                        }
1330                                    },
1331                                }
1332                            }
1333                            Ok(None) => {
1334                                emit_session_event(
1335                                    &event_tx,
1336                                    RelaySessionEvent::Dropped {
1337                                        peer_id,
1338                                        direction: RelayDirection::UpstreamToDownstream,
1339                                        reason: RelayDropReason::PolicyDrop,
1340                                    },
1341                                ).await;
1342                            }
1343                            Err(RelayContractError::PolicyDisconnect { reason }) => {
1344                                emit_session_event(
1345                                    &event_tx,
1346                                    RelaySessionEvent::Terminated {
1347                                        peer_id,
1348                                        reason: RelaySessionCloseReason::PolicyDisconnect {
1349                                            direction: RelayDirection::UpstreamToDownstream,
1350                                            reason,
1351                                        },
1352                                    },
1353                                ).await;
1354                                break;
1355                            }
1356                            Err(error) => {
1357                                emit_session_event(
1358                                    &event_tx,
1359                                    RelaySessionEvent::Dropped {
1360                                        peer_id,
1361                                        direction: RelayDirection::UpstreamToDownstream,
1362                                        reason: RelayDropReason::ContractViolation(error.clone()),
1363                                    },
1364                                ).await;
1365
1366                                emit_session_event(
1367                                    &event_tx,
1368                                    RelaySessionEvent::Terminated {
1369                                        peer_id,
1370                                        reason: RelaySessionCloseReason::ContractViolation {
1371                                            direction: RelayDirection::UpstreamToDownstream,
1372                                            error,
1373                                        },
1374                                    },
1375                                ).await;
1376                                break;
1377                            }
1378                        }
1379                    }
1380                    Some(RaknetClientEvent::Disconnected { reason }) => {
1381                        emit_session_event(
1382                            &event_tx,
1383                            RelaySessionEvent::Terminated {
1384                                peer_id,
1385                                reason: RelaySessionCloseReason::UpstreamDisconnected { reason },
1386                            },
1387                        ).await;
1388                        break;
1389                    }
1390                    Some(RaknetClientEvent::DecodeError { error }) => {
1391                        emit_session_event(
1392                            &event_tx,
1393                            RelaySessionEvent::DecodeError {
1394                                peer_id,
1395                                direction: RelayDirection::UpstreamToDownstream,
1396                                error,
1397                            },
1398                        ).await;
1399                    }
1400                    Some(RaknetClientEvent::Connected { .. })
1401                    | Some(RaknetClientEvent::ReceiptAcked { .. }) => {}
1402                    None => {
1403                        emit_session_event(
1404                            &event_tx,
1405                            RelaySessionEvent::Terminated {
1406                                peer_id,
1407                                reason: RelaySessionCloseReason::UpstreamDisconnected {
1408                                    reason: ClientDisconnectReason::TransportError {
1409                                        message: "upstream event stream ended".to_string(),
1410                                    },
1411                                },
1412                            },
1413                        ).await;
1414                        break;
1415                    }
1416                }
1417            }
1418        }
1419    }
1420}
1421
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use crate::client::ClientSendOptions;
    use crate::protocol::reliability::Reliability;
    use crate::server::SendOptions;
    use crate::session::RakPriority;

    use super::{
        PassthroughRelayPolicy, RelayContract, RelayContractConfig, RelayContractError,
        RelayDecision, RelayDirection, RelayPolicy, RelayRuntimeConfig,
        downstream_to_upstream_send_options, upstream_to_downstream_send_options,
    };

    /// Builds a contract with the given payload limits and the passthrough policy.
    fn passthrough_contract(
        max_payload_bytes: usize,
        allow_empty_payload: bool,
    ) -> RelayContract<PassthroughRelayPolicy> {
        let config = RelayContractConfig {
            max_payload_bytes,
            allow_empty_payload,
        };
        RelayContract::new(config, PassthroughRelayPolicy)
    }

    /// Policy that drops every payload.
    struct AlwaysDrop;

    impl RelayPolicy for AlwaysDrop {
        fn decide(&self, _direction: RelayDirection, _payload: &Bytes) -> RelayDecision {
            RelayDecision::Drop
        }
    }

    /// Policy that requests a disconnect for every payload.
    struct AlwaysDisconnect;

    impl RelayPolicy for AlwaysDisconnect {
        fn decide(&self, _direction: RelayDirection, _payload: &Bytes) -> RelayDecision {
            RelayDecision::Disconnect {
                reason: "policy_disconnect",
            }
        }
    }

    #[test]
    fn contract_rejects_empty_when_disabled() {
        let contract = passthrough_contract(64, false);

        let rejection = contract
            .apply(RelayDirection::DownstreamToUpstream, Bytes::new())
            .expect_err("empty payload must be rejected");

        assert_eq!(rejection, RelayContractError::EmptyPayload);
    }

    #[test]
    fn contract_rejects_oversized_payload() {
        let contract = passthrough_contract(2, true);
        let three_bytes = Bytes::from_static(b"abc");

        let rejection = contract
            .apply(RelayDirection::DownstreamToUpstream, three_bytes)
            .expect_err("oversized payload must be rejected");

        let expected = RelayContractError::PayloadTooLarge { actual: 3, max: 2 };
        assert_eq!(rejection, expected);
    }

    #[test]
    fn policy_can_drop_payload() {
        let contract = RelayContract::new(RelayContractConfig::default(), AlwaysDrop);
        let payload = Bytes::from_static(b"ok");

        let forwarded = contract
            .apply(RelayDirection::UpstreamToDownstream, payload)
            .expect("drop should not error");

        assert!(forwarded.is_none());
    }

    #[test]
    fn policy_disconnect_is_reported() {
        let contract = RelayContract::new(RelayContractConfig::default(), AlwaysDisconnect);
        let payload = Bytes::from_static(b"ok");

        let rejection = contract
            .apply(RelayDirection::UpstreamToDownstream, payload)
            .expect_err("disconnect policy must error");

        let expected = RelayContractError::PolicyDisconnect {
            reason: "policy_disconnect",
        };
        assert_eq!(rejection, expected);
    }

    #[test]
    fn runtime_config_defaults_are_non_zero() {
        let defaults = RelayRuntimeConfig::default();

        assert_ne!(defaults.per_session_downstream_queue_capacity, 0);
        assert_ne!(defaults.session_event_queue_capacity, 0);
        defaults
            .validate()
            .expect("default relay runtime config must be valid");
    }

    #[test]
    fn contract_config_validate_rejects_zero_payload_limit() {
        let zero_limit = RelayContractConfig {
            max_payload_bytes: 0,
            allow_empty_payload: true,
        };

        let failure = zero_limit
            .validate()
            .expect_err("max_payload_bytes=0 must be rejected");

        assert_eq!(failure.config, "RelayContractConfig");
        assert_eq!(failure.field, "max_payload_bytes");
    }

    #[test]
    fn runtime_config_validate_rejects_total_budget_below_directional_max() {
        // The total (900) is smaller than the larger directional limit (1024).
        let inconsistent = RelayRuntimeConfig {
            downstream_max_pending_bytes: 512,
            upstream_max_pending_bytes: 1024,
            session_total_max_pending_bytes: 900,
            ..RelayRuntimeConfig::default()
        };

        let failure = inconsistent
            .validate()
            .expect_err("invalid total budget must fail");

        assert_eq!(failure.config, "RelayRuntimeConfig");
        assert_eq!(failure.field, "session_total_max_pending_bytes");
    }

    #[test]
    fn downstream_send_options_preserve_reliability_and_channel_hint() {
        let defaults = ClientSendOptions {
            reliability: Reliability::ReliableOrdered,
            channel: 0,
            priority: RakPriority::Immediate,
        };

        let resolved = downstream_to_upstream_send_options(defaults, Reliability::Reliable, Some(9));

        assert_eq!(resolved.reliability, Reliability::Reliable);
        assert_eq!(resolved.channel, 9);
        assert_eq!(resolved.priority, RakPriority::Immediate);
    }

    #[test]
    fn upstream_send_options_fall_back_to_default_channel_when_missing_hint() {
        let defaults = SendOptions {
            reliability: Reliability::ReliableOrdered,
            channel: 3,
            priority: RakPriority::High,
        };

        let resolved = upstream_to_downstream_send_options(defaults, Reliability::Unreliable, None);

        assert_eq!(resolved.reliability, Reliability::Unreliable);
        assert_eq!(resolved.channel, 3);
        assert_eq!(resolved.priority, RakPriority::High);
    }
}