naia-shared 0.25.0

Common functionality shared between naia-server & naia-client crates
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
use std::{
    collections::HashMap,
    net::SocketAddr,
    sync::Arc,
};

use parking_lot::Mutex;

use log::debug;
use std::sync::mpsc;

use crate::transport::local::shared::{create_auth_channels, create_data_channels};
use crate::{link_condition_logic, Instant, LinkConditionerConfig, TimeQueue};

/// Per-client connection state stored in the hub
/// Only stores server-side channels (what the server needs to receive/send)
struct ClientConnection {
    // Auth channels (client -> server) - server receives
    auth_req_rx: Arc<Mutex<mpsc::Receiver<Vec<u8>>>>,
    // Auth channels (server -> client) - server sends
    auth_resp_tx: mpsc::Sender<Vec<u8>>,

    // Data channels (client -> server) - server receives
    server_data_rx: Arc<Mutex<mpsc::Receiver<Vec<u8>>>>,
    // Data channels (server -> client) - server sends
    server_data_tx: mpsc::Sender<Vec<u8>>,
    // Data channels (client -> server) - server "receives" via this sender (injection)
    #[allow(dead_code)]
    client_data_tx_injection: mpsc::Sender<Vec<u8>>,

    // Link conditioner configuration (bidirectional)
    // None means no conditioning (perfect connection)
    client_to_server_conditioner: Option<LinkConditionerConfig>,
    server_to_client_conditioner: Option<LinkConditionerConfig>,

    // Time queues for delayed packet delivery
    client_to_server_queue: Arc<Mutex<TimeQueue<Vec<u8>>>>,
    server_to_client_queue: Arc<Mutex<TimeQueue<Vec<u8>>>>,
}

type PacketRecorder = Arc<Mutex<Option<Vec<(bool, Vec<u8>)>>>>;
type ClientChannels = (
    SocketAddr,
    mpsc::Sender<Vec<u8>>,
    mpsc::Receiver<Vec<u8>>,
    mpsc::Sender<Vec<u8>>,
    mpsc::Receiver<Vec<u8>>,
);

/// Shared transport hub managing multiple client connections
#[derive(Clone)]
pub struct LocalTransportHub {
    server_addr: SocketAddr,
    connections: Arc<Mutex<HashMap<SocketAddr, ClientConnection>>>,
    next_client_id: Arc<Mutex<u16>>,
    traffic_paused: Arc<Mutex<bool>>,
    /// Wire-level packet recorder. `None` = disabled; `Some(buf)` = recording.
    /// bool field in each tuple: `true` = server-to-client, `false` = client-to-server.
    packet_recorder: PacketRecorder,
}

impl LocalTransportHub {
    /// Creates a new `LocalTransportHub` bound to the given server address.
    pub fn new(server_addr: SocketAddr) -> Self {
        Self {
            // shared,
            server_addr,
            connections: Arc::new(Mutex::new(HashMap::new())),
            next_client_id: Arc::new(Mutex::new(1)),
            traffic_paused: Arc::new(Mutex::new(false)),
            packet_recorder: Arc::new(Mutex::new(None)),
        }
    }

    /// Enable wire-level packet recording. Each packet that passes through
    /// `try_recv_data` (client→server) or `send_data` (server→client) is
    /// appended to an internal buffer.
    pub fn enable_packet_recording(&self) {
        *self.packet_recorder.lock() = Some(Vec::new());
    }

    /// Consume and return all recorded packets since the last call.
    /// Returns `(server_to_client: bool, bytes)` pairs.
    /// Recording remains active; the buffer is just cleared.
    pub fn take_recorded_packets(&self) -> Vec<(bool, Vec<u8>)> {
        let mut guard = self.packet_recorder.lock();
        match &mut *guard {
            Some(buf) => std::mem::take(buf),
            None => Vec::new(),
        }
    }

    /// Register a new client connection and return its address and channel handles
    pub fn register_client(
        &self,
    ) -> ClientChannels {
        // Generate unique client address
        let client_id = {
            let mut id = self.next_client_id.lock();
            let current = *id;
            *id = current.wrapping_add(1);
            current
        };

        // Create fake client address based on ID
        let client_addr: SocketAddr = format!("127.0.0.1:{}", 12345 + client_id)
            .parse()
            .expect("invalid client addr");

        // Create 1:1 auth channels
        let (auth_req_tx, auth_req_rx, auth_resp_tx, auth_resp_rx) = create_auth_channels();

        // Create 1:1 data channels
        let (client_data_tx, server_data_rx, server_data_tx, client_data_rx) =
            create_data_channels();

        // Store connection (only server-side channels, client-side channels are returned)
        let connection = ClientConnection {
            auth_req_rx: Arc::new(Mutex::new(auth_req_rx)),
            auth_resp_tx: auth_resp_tx.clone(),
            server_data_rx: Arc::new(Mutex::new(server_data_rx)),
            server_data_tx: server_data_tx.clone(),
            client_data_tx_injection: client_data_tx.clone(),
            client_to_server_conditioner: None,
            server_to_client_conditioner: None,
            client_to_server_queue: Arc::new(Mutex::new(TimeQueue::new())),
            server_to_client_queue: Arc::new(Mutex::new(TimeQueue::new())),
        };

        self.connections
            .lock()
            .insert(client_addr, connection);

        (
            client_addr,
            auth_req_tx,
            auth_resp_rx,
            client_data_tx,
            client_data_rx,
        )
    }
    //
    // /// Get the shared queues (for identity token, etc.)
    // pub fn shared(&self) -> &LocalTransportQueues {
    //     &self.shared
    // }

    /// Get the server address
    pub fn server_addr(&self) -> SocketAddr {
        self.server_addr
    }

    /// Inject a packet from a client (used for testing/fuzzing)
    pub fn inject_client_packet(&self, client_addr: &SocketAddr, data: Vec<u8>) -> bool {
        let connections = self.connections.lock();
        if let Some(conn) = connections.get(client_addr) {
            let _ = conn.client_data_tx_injection.send(data);
            return true;
        }
        false
    }

    /// Inject a packet from the server to a client (used for testing/fuzzing)
    /// This bypasses normal server sending and directly injects raw data to the client.
    pub fn inject_server_packet(&self, client_addr: &SocketAddr, data: Vec<u8>) -> bool {
        let connections = self.connections.lock();
        if let Some(conn) = connections.get(client_addr) {
            let _ = conn.server_data_tx.send(data);
            return true;
        }
        false
    }

    /// Try to receive an auth request from any client (returns (client_addr, bytes))
    /// Returns None if traffic is paused (packets are dropped)
    pub fn try_recv_auth_request(&self) -> Option<(SocketAddr, Vec<u8>)> {
        let paused = *self.traffic_paused.lock(); // Single check
        let connections = self.connections.lock();

        for (addr, conn) in connections.iter() {
            let rx_guard = conn.auth_req_rx.lock();
            if paused {
                // Drain ALL packets when paused, not just one
                while rx_guard.try_recv().is_ok() {}
            } else if let Ok(bytes) = rx_guard.try_recv() {
                return Some((*addr, bytes));
            }
        }
        None
    }

    /// Try to receive a data packet from any client (returns (client_addr, bytes))
    /// Returns None if traffic is paused (packets are dropped)
    /// Applies link conditioning if configured
    /// Also processes time queues to deliver ready packets to clients
    pub fn try_recv_data(&self) -> Option<(SocketAddr, Vec<u8>)> {
        let paused = *self.traffic_paused.lock(); // Single check
        let now = Instant::now();
        let mut connections = self.connections.lock();

        // First, deliver any ready packets from server-to-client queues for all clients
        self.deliver_all_queued_packets_to_clients(&mut connections, &now);

        // Then check time queues for client-to-server delayed packets that are now ready
        for (addr, conn) in connections.iter_mut() {
            let mut queue_guard = conn.client_to_server_queue.lock();
            if queue_guard.has_item(&now) {
                if let Some(bytes) = queue_guard.pop_item(&now) {
                    return Some((*addr, bytes));
                }
            }
        }

        // Finally check direct channels and apply link conditioning
        for (addr, conn) in connections.iter_mut() {
            let rx_guard = conn.server_data_rx.lock();
            if paused {
                // Drain ALL packets when paused, not just one
                while rx_guard.try_recv().is_ok() {}
            } else if let Ok(bytes) = rx_guard.try_recv() {
                // Apply link conditioning if configured
                if let Some(ref config) = conn.client_to_server_conditioner {
                    let mut queue_guard = conn.client_to_server_queue.lock();
                    link_condition_logic::process_packet(config, &mut queue_guard, bytes);
                    // Packet is now in queue, will be delivered later
                    continue;
                } else {
                    // No conditioning, deliver immediately
                    if let Some(ref mut buf) = *self.packet_recorder.lock() {
                        buf.push((false, bytes.clone()));
                    }
                    return Some((*addr, bytes));
                }
            }
        }
        None
    }

    /// Send auth response to a specific client
    /// Returns Err(()) if traffic is paused (packets are dropped)
    #[allow(clippy::result_unit_err)]
    pub fn send_auth_response(&self, client_addr: &SocketAddr, bytes: Vec<u8>) -> Result<(), ()> {
        let paused = *self.traffic_paused.lock(); // Single check
        if paused {
            return Err(()); // Drop packet
        }

        let connections = self.connections.lock();
        if let Some(conn) = connections.get(client_addr) {
            conn.auth_resp_tx.send(bytes).map_err(|_| ())
        } else {
            Err(())
        }
    }

    /// Send data packet to a specific client
    /// Returns Err(()) if traffic is paused (packets are dropped)
    /// Applies link conditioning if configured
    /// Also processes time queues to deliver any ready packets
    #[allow(clippy::result_unit_err)]
    pub fn send_data(&self, client_addr: &SocketAddr, bytes: Vec<u8>) -> Result<(), ()> {
        let paused = *self.traffic_paused.lock(); // Single check
        if paused {
            debug!("[HUB] send_data: Packet dropped (traffic paused)");
            return Err(()); // Drop packet
        }

        let now = Instant::now();
        let mut connections = self.connections.lock();

        // First, deliver any ready packets from server-to-client queues for all clients
        self.deliver_all_queued_packets_to_clients(&mut connections, &now);

        if let Some(conn) = connections.get_mut(client_addr) {
            // Apply link conditioning if configured
            if let Some(ref config) = conn.server_to_client_conditioner {
                let packet_len = bytes.len();
                let mut queue_guard = conn.server_to_client_queue.lock();
                let queue_len_before = queue_guard.len();
                link_condition_logic::process_packet(config, &mut queue_guard, bytes);
                let queue_len_after = queue_guard.len();
                // Packet queued with link conditioner - use debug logging instead
                debug!(
                    "[HUB] send_data: Queued packet for client {} ({} bytes, queue: {} -> {})",
                    client_addr, packet_len, queue_len_before, queue_len_after
                );
                // Packet is now in queue, will be delivered later
                Ok(())
            } else {
                // No conditioning, send immediately - use debug logging instead
                debug!(
                    "[HUB] send_data: Sending packet immediately to {} ({} bytes, no conditioner)",
                    client_addr,
                    bytes.len()
                );
                if let Some(ref mut buf) = *self.packet_recorder.lock() {
                    buf.push((true, bytes.clone()));
                }
                conn.server_data_tx.send(bytes).map_err(|_| ())
            }
        } else {
            debug!(
                "[HUB] send_data: Client {} not found in connections",
                client_addr
            );
            Err(())
        }
    }

    /// Deliver any ready packets from server-to-client time queues to client channels
    /// Called periodically to ensure delayed packets are delivered
    /// Returns the number of packets delivered
    fn deliver_all_queued_packets_to_clients(
        &self,
        connections: &mut HashMap<SocketAddr, ClientConnection>,
        now: &Instant,
    ) -> usize {
        let mut total_delivered = 0;
        for (addr, conn) in connections.iter_mut() {
            let mut queue_guard = conn.server_to_client_queue.lock();
            let queue_len_before = queue_guard.len();
            let mut delivered_this_client = 0;
            while queue_guard.has_item(now) {
                if let Some(bytes) = queue_guard.pop_item(now) {
                    match conn.server_data_tx.send(bytes) {
                        Ok(()) => {
                            delivered_this_client += 1;
                            total_delivered += 1;
                        }
                        Err(_) => {
                            debug!("[HUB] deliver_all_queued: Failed to send packet to client {} (channel closed?)", addr);
                        }
                    }
                }
            }
            if delivered_this_client > 0 {
                debug!(
                    "[HUB] deliver_all_queued: Delivered {} packets to client {} (queue: {} -> {})",
                    delivered_this_client,
                    addr,
                    queue_len_before,
                    queue_guard.len()
                );
            }
        }
        total_delivered
    }

    /// Process time queues for all connections to deliver any ready packets
    /// This should be called periodically (e.g., during each tick) to ensure
    /// delayed packets are delivered even when there's no active send/recv
    pub fn process_time_queues(&self) {
        let now = Instant::now();
        let mut connections = self.connections.lock();

        // Deliver ready packets from server-to-client queues
        let delivered = self.deliver_all_queued_packets_to_clients(&mut connections, &now);
        if delivered > 0 {
            debug!(
                "[HUB] process_time_queues: Delivered {} total packets",
                delivered
            );
        }

        // Note: client-to-server queues are processed in try_recv_data(),
        // which is called during server receive operations
    }

    /// Configure link conditioner for a specific client connection
    /// `client_to_server` applies to packets from client to server
    /// `server_to_client` applies to packets from server to client
    /// Pass `None` to disable conditioning for that direction
    pub fn configure_link_conditioner(
        &self,
        client_addr: &SocketAddr,
        client_to_server: Option<LinkConditionerConfig>,
        server_to_client: Option<LinkConditionerConfig>,
    ) -> bool {
        let mut connections = self.connections.lock();
        if let Some(conn) = connections.get_mut(client_addr) {
            conn.client_to_server_conditioner = client_to_server;
            conn.server_to_client_conditioner = server_to_client;
            true
        } else {
            false
        }
    }

    /// Pause all traffic (drop all packets)
    pub fn pause_traffic(&self) {
        *self.traffic_paused.lock() = true;
    }

    /// Resume normal traffic delivery
    pub fn resume_traffic(&self) {
        *self.traffic_paused.lock() = false;
    }

    /// Check if traffic is paused
    pub fn is_traffic_paused(&self) -> bool {
        *self.traffic_paused.lock()
    }
}