steamworks/networking_sockets.rs
1use crate::{networking_sockets_callback, networking_types::NetConnectionRealTimeLaneStatus};
2use crate::{
3 networking_types::{
4 ListenSocketEvent, MessageNumber, NetConnectionEnd, NetConnectionInfo,
5 NetConnectionRealTimeInfo, NetworkingAvailability, NetworkingAvailabilityError,
6 NetworkingConfigEntry, NetworkingIdentity, NetworkingMessage, SendFlags, SteamIpAddr,
7 },
8 SteamError,
9};
10use crate::{CallbackHandle, Inner, SResult};
11#[cfg(test)]
12use serial_test::serial;
13use std::convert::TryInto;
14use std::ffi::CString;
15use std::net::SocketAddr;
16use std::sync::mpsc::Receiver;
17use std::sync::Arc;
18use sys::SteamNetworkingMessage_t;
19
20use steamworks_sys as sys;
21
22/// Access to the steam networking sockets interface
23pub struct NetworkingSockets<Manager> {
24 pub(crate) sockets: *mut sys::ISteamNetworkingSockets,
25 pub(crate) inner: Arc<Inner<Manager>>,
26}
27
28unsafe impl<T> Send for NetworkingSockets<T> {}
29unsafe impl<T> Sync for NetworkingSockets<T> {}
30
31impl<Manager: 'static> NetworkingSockets<Manager> {
32 /// Creates a "server" socket that listens for clients to connect to by calling ConnectByIPAddress, over ordinary UDP (IPv4 or IPv6)
33 ///
34 /// You must select a specific local port to listen on and set it as the port field of the local address.
35 ///
36 /// Usually you will set the IP portion of the address to zero, (SteamNetworkingIPAddr::Clear()).
37 /// This means that you will not bind to any particular local interface (i.e. the same as INADDR_ANY in plain socket code).
38 /// Furthermore, if possible the socket will be bound in "dual stack" mode, which means that it can accept both IPv4 and IPv6 client connections.
39 /// If you really do wish to bind a particular interface, then set the local address to the appropriate IPv4 or IPv6 IP.
40 ///
41 /// If you need to set any initial config options, pass them here.
42 /// See SteamNetworkingConfigValue_t for more about why this is preferable to setting the options "immediately" after creation.
43 ///
44 /// When a client attempts to connect, a SteamNetConnectionStatusChangedCallback_t will be posted.
45 /// The connection will be in the k_ESteamNetworkingConnectionState_Connecting state.
46 pub fn create_listen_socket_ip(
47 &self,
48 local_address: SocketAddr,
49 options: impl IntoIterator<Item = NetworkingConfigEntry>,
50 ) -> Result<ListenSocket<Manager>, InvalidHandle> {
51 let local_address = SteamIpAddr::from(local_address);
52 let options: Vec<_> = options.into_iter().map(|x| x.into()).collect();
53 let handle = unsafe {
54 sys::SteamAPI_ISteamNetworkingSockets_CreateListenSocketIP(
55 self.sockets,
56 local_address.as_ptr(),
57 options.len() as _,
58 options.as_ptr(),
59 )
60 };
61 if handle == sys::k_HSteamListenSocket_Invalid {
62 Err(InvalidHandle)
63 } else {
64 Ok(ListenSocket::new(handle, self.sockets, self.inner.clone()))
65 }
66 }
67 /// Creates a connection and begins talking to a "server" over UDP at the
68 /// given IPv4 or IPv6 address. The remote host must be listening with a
69 /// matching call to CreateListenSocketIP on the specified port.
70 ///
71 /// A SteamNetConnectionStatusChangedCallback_t callback will be triggered when we start
72 /// connecting, and then another one on either timeout or successful connection.
73 ///
74 /// If the server does not have any identity configured, then their network address
75 /// will be the only identity in use. Or, the network host may provide a platform-specific
76 /// identity with or without a valid certificate to authenticate that identity. (These
77 /// details will be contained in the SteamNetConnectionStatusChangedCallback_t.) It's
78 /// up to your application to decide whether to allow the connection.
79 ///
80 /// By default, all connections will get basic encryption sufficient to prevent
81 /// casual eavesdropping. But note that without certificates (or a shared secret
82 /// distributed through some other out-of-band mechanism), you don't have any
83 /// way of knowing who is actually on the other end, and thus are vulnerable to
84 /// man-in-the-middle attacks.
85 ///
86 /// If you need to set any initial config options, pass them here. See
87 /// SteamNetworkingConfigValue_t for more about why this is preferable to
88 /// setting the options "immediately" after creation.
89 pub fn connect_by_ip_address(
90 &self,
91 address: SocketAddr,
92 options: impl IntoIterator<Item = NetworkingConfigEntry>,
93 ) -> Result<NetConnection<Manager>, InvalidHandle> {
94 let handle = unsafe {
95 let address = SteamIpAddr::from(address);
96 let options: Vec<_> = options.into_iter().map(|x| x.into()).collect();
97 sys::SteamAPI_ISteamNetworkingSockets_ConnectByIPAddress(
98 self.sockets,
99 address.as_ptr(),
100 options.len() as _,
101 options.as_ptr(),
102 )
103 };
104 if handle == sys::k_HSteamNetConnection_Invalid {
105 Err(InvalidHandle)
106 } else {
107 Ok(NetConnection::new_independent(
108 handle,
109 self.sockets,
110 self.inner.clone(),
111 ))
112 }
113 }
114
115 /// Like CreateListenSocketIP, but clients will connect using ConnectP2P.
116 ///
117 /// nLocalVirtualPort specifies how clients can connect to this socket using
118 /// ConnectP2P. It's very common for applications to only have one listening socket;
119 /// in that case, use zero. If you need to open multiple listen sockets and have clients
120 /// be able to connect to one or the other, then nLocalVirtualPort should be a small
121 /// integer (<1000) unique to each listen socket you create.
122 ///
123 /// If you use this, you probably want to call ISteamNetworkingUtils::InitRelayNetworkAccess()
124 /// when your app initializes.
125 ///
126 /// If you are listening on a dedicated servers in known data center,
127 /// then you can listen using this function instead of CreateHostedDedicatedServerListenSocket,
128 /// to allow clients to connect without a ticket. Any user that owns
129 /// the app and is signed into Steam will be able to attempt to connect to
130 /// your server. Also, a connection attempt may require the client to
131 /// be connected to Steam, which is one more moving part that may fail. When
132 /// tickets are used, then once a ticket is obtained, a client can connect to
133 /// your server even if they got disconnected from Steam or Steam is offline.
134 ///
135 /// If you need to set any initial config options, pass them here. See
136 /// SteamNetworkingConfigValue_t for more about why this is preferable to
137 /// setting the options "immediately" after creation.
138 pub fn create_listen_socket_p2p(
139 &self,
140 local_virtual_port: i32,
141 options: impl IntoIterator<Item = NetworkingConfigEntry>,
142 ) -> Result<ListenSocket<Manager>, InvalidHandle> {
143 let options: Vec<_> = options.into_iter().map(|x| x.into()).collect();
144 let handle = unsafe {
145 sys::SteamAPI_ISteamNetworkingSockets_CreateListenSocketP2P(
146 self.sockets,
147 local_virtual_port as _,
148 options.len() as _,
149 options.as_ptr(),
150 )
151 };
152 if handle == sys::k_HSteamListenSocket_Invalid {
153 Err(InvalidHandle)
154 } else {
155 Ok(ListenSocket::new(handle, self.sockets, self.inner.clone()))
156 }
157 }
158
159 /// Begin connecting to a peer that is identified using a platform-specific identifier.
160 /// This uses the default rendezvous service, which depends on the platform and library
161 /// configuration. (E.g. on Steam, it goes through the steam backend.)
162 ///
163 /// If you need to set any initial config options, pass them here. See
164 /// SteamNetworkingConfigValue_t for more about why this is preferable to
165 /// setting the options "immediately" after creation.
166 ///
167 /// To use your own signaling service, see:
168 /// - ConnectP2PCustomSignaling
169 /// - k_ESteamNetworkingConfig_Callback_CreateConnectionSignaling
170 pub fn connect_p2p(
171 &self,
172 identity_remote: NetworkingIdentity,
173 remote_virtual_port: i32,
174 options: impl IntoIterator<Item = NetworkingConfigEntry>,
175 ) -> Result<NetConnection<Manager>, InvalidHandle> {
176 let handle = unsafe {
177 let options: Vec<_> = options.into_iter().map(|x| x.into()).collect();
178 sys::SteamAPI_ISteamNetworkingSockets_ConnectP2P(
179 self.sockets,
180 identity_remote.as_ptr(),
181 remote_virtual_port as _,
182 options.len() as _,
183 options.as_ptr(),
184 )
185 };
186 if handle == sys::k_HSteamNetConnection_Invalid {
187 Err(InvalidHandle)
188 } else {
189 Ok(NetConnection::new_independent(
190 handle,
191 self.sockets,
192 self.inner.clone(),
193 ))
194 }
195 }
196
197 /// Create a listen socket on the specified virtual port. The physical UDP port to use
198 /// will be determined by the SDR_LISTEN_PORT environment variable. If a UDP port is not
199 /// configured, this call will fail.
200 ///
201 /// This call MUST be made through the SteamGameServerNetworkingSockets() interface.
202 ///
203 /// This function should be used when you are using the ticket generator library
204 /// to issue your own tickets. Clients connecting to the server on this virtual
205 /// port will need a ticket, and they must connect using ConnectToHostedDedicatedServer.
206 ///
207 /// If you need to set any initial config options, pass them here. See
208 /// SteamNetworkingConfigValue_t for more about why this is preferable to
209 /// setting the options "immediately" after creation.
210 pub fn create_hosted_dedicated_server_listen_socket(
211 &self,
212 local_virtual_port: u32,
213 options: impl IntoIterator<Item = NetworkingConfigEntry>,
214 ) -> Result<ListenSocket<Manager>, InvalidHandle> {
215 let options: Vec<_> = options.into_iter().map(|x| x.into()).collect();
216 let handle = unsafe {
217 sys::SteamAPI_ISteamNetworkingSockets_CreateHostedDedicatedServerListenSocket(
218 self.sockets,
219 local_virtual_port as _,
220 options.len() as _,
221 options.as_ptr(),
222 )
223 };
224 if handle == sys::k_HSteamListenSocket_Invalid {
225 Err(InvalidHandle)
226 } else {
227 Ok(ListenSocket::new(handle, self.sockets, self.inner.clone()))
228 }
229 }
230
231 /// Indicate our desire to be ready participate in authenticated communications.
232 /// If we are currently not ready, then steps will be taken to obtain the necessary
233 /// certificates. (This includes a certificate for us, as well as any CA certificates
234 /// needed to authenticate peers.)
235 ///
236 /// You can call this at program init time if you know that you are going to
237 /// be making authenticated connections, so that we will be ready immediately when
238 /// those connections are attempted. (Note that essentially all connections require
239 /// authentication, with the exception of ordinary UDP connections with authentication
240 /// disabled using k_ESteamNetworkingConfig_IP_AllowWithoutAuth.) If you don't call
241 /// this function, we will wait until a feature is utilized that that necessitates
242 /// these resources.
243 ///
244 /// You can also call this function to force a retry, if failure has occurred.
245 /// Once we make an attempt and fail, we will not automatically retry.
246 /// In this respect, the behavior of the system after trying and failing is the same
247 /// as before the first attempt: attempting authenticated communication or calling
248 /// this function will call the system to attempt to acquire the necessary resources.
249 ///
250 /// You can use GetAuthenticationStatus or listen for SteamNetAuthenticationStatus_t
251 /// to monitor the status.
252 ///
253 /// Returns the current value that would be returned from GetAuthenticationStatus.
254 pub fn init_authentication(
255 &self,
256 ) -> Result<NetworkingAvailability, NetworkingAvailabilityError> {
257 unsafe { sys::SteamAPI_ISteamNetworkingSockets_InitAuthentication(self.sockets).try_into() }
258 }
259
260 /// Create a new poll group.
261 ///
262 /// You should destroy the poll group when you are done using DestroyPollGroup
263 pub fn create_poll_group(&self) -> NetPollGroup<Manager> {
264 let poll_group =
265 unsafe { sys::SteamAPI_ISteamNetworkingSockets_CreatePollGroup(self.sockets) };
266 NetPollGroup {
267 handle: poll_group,
268 sockets: self.sockets,
269 inner: self.inner.clone(),
270 message_buffer: Vec::new(),
271 }
272 }
273
274 pub fn get_authentication_status(
275 &self,
276 ) -> Result<NetworkingAvailability, NetworkingAvailabilityError> {
277 let mut details: sys::SteamNetAuthenticationStatus_t = unsafe { std::mem::zeroed() };
278 let auth = unsafe {
279 sys::SteamAPI_ISteamNetworkingSockets_GetAuthenticationStatus(
280 self.sockets,
281 &mut details,
282 )
283 };
284
285 auth.try_into()
286 }
287
288 /// Returns basic information about the high-level state of the connection.
289 ///
290 /// Returns false if the connection handle is invalid.
291 pub fn get_connection_info(
292 &self,
293 connection: &NetConnection<Manager>,
294 ) -> Result<NetConnectionInfo, bool> {
295 let mut info: sys::SteamNetConnectionInfo_t = unsafe { std::mem::zeroed() };
296 let was_successful = unsafe {
297 sys::SteamAPI_ISteamNetworkingSockets_GetConnectionInfo(
298 self.sockets,
299 connection.handle,
300 &mut info,
301 )
302 };
303 if was_successful {
304 Ok(NetConnectionInfo { inner: info })
305 } else {
306 Err(false)
307 }
308 }
309
310 /// Returns a small set of information about the real-time state of the connection and the queue status of each lane.
311 ///
312 /// On entry, lanes specifies the length of the lanes array. This may be 0 if you do not wish to receive any lane data. It's OK for this to be smaller than the total number of configured lanes.
313 ///
314 /// pLanes points to an array that will receive lane-specific info. It can be NULL if this is not needed.
315 pub fn get_realtime_connection_status(
316 &self,
317 connection: &NetConnection<Manager>,
318 lanes: i32,
319 ) -> Result<
320 (
321 NetConnectionRealTimeInfo,
322 Vec<NetConnectionRealTimeLaneStatus>,
323 ),
324 SteamError,
325 > {
326 let mut info: sys::SteamNetConnectionRealTimeStatus_t = unsafe { std::mem::zeroed() };
327 let mut p_lanes: Vec<sys::SteamNetConnectionRealTimeLaneStatus_t> =
328 Vec::with_capacity(lanes as usize);
329 let result = unsafe {
330 // Get a reference to the uninitialized part of our Vec's buffer
331 let uninitialized = p_lanes.spare_capacity_mut();
332 let status = sys::SteamAPI_ISteamNetworkingSockets_GetConnectionRealTimeStatus(
333 self.sockets,
334 connection.handle,
335 &mut info,
336 lanes,
337 uninitialized.as_mut_ptr().cast(),
338 );
339 // Tell the Vec that we've manually initialized some elements
340 p_lanes.set_len(lanes as usize);
341 status
342 };
343 if result == sys::EResult::k_EResultOK {
344 Ok((
345 NetConnectionRealTimeInfo { inner: info },
346 p_lanes
347 .into_iter()
348 .map(|x| NetConnectionRealTimeLaneStatus { inner: x })
349 .collect(),
350 ))
351 } else {
352 Err(result.into())
353 }
354 }
355 /// Configure multiple outbound messages streams ("lanes") on a connection, and control head-of-line blocking between them. Messages within a given lane are always sent in the order they are queued, but messages from different lanes may be sent out of order. Each lane has its own message number sequence. The first message sent on each lane will be assigned the number 1.
356 ///
357 /// Each lane has a "priority". Lower priority lanes will only be processed when all higher-priority lanes are empty. The magnitudes of the priority values are not relevant, only their sort order. Higher numeric values take priority over lower numeric values.
358 ///
359 /// Each lane also is assigned a weight, which controls the approximate proportion of the bandwidth that will be consumed by the lane, relative to other lanes of the same priority. (This is assuming the lane stays busy. An idle lane does not build up "credits" to be be spent once a message is queued.) This value is only meaningful as a proportion, relative to other lanes with the same priority. For lanes with different priorities, the strict priority order will prevail, and their weights relative to each other are not relevant. Thus, if a lane has a unique priority value, the weight value for that lane is not relevant.
360 ///
361 /// Example: 3 lanes, with priorities { 0, 10, 10 } and weights { (NA), 20, 5 }. Messages sent on the first will always be sent first, before messages in the other two lanes. Its weight value is irrelevant, since there are no other lanes with priority=0. The other two lanes will share bandwidth, with the second and third lanes sharing bandwidth using a ratio of approximately 4:1. (The weights { NA, 4, 1 } would be equivalent.)
362 pub fn configure_connection_lanes(
363 &self,
364 connection: &NetConnection<Manager>,
365 num_lanes: i32,
366 lane_priorities: &[i32],
367 lane_weights: &[u16],
368 ) -> Result<(), SteamError> {
369 let result = unsafe {
370 sys::SteamAPI_ISteamNetworkingSockets_ConfigureConnectionLanes(
371 self.sockets,
372 connection.handle,
373 num_lanes,
374 lane_priorities.as_ptr(),
375 lane_weights.as_ptr(),
376 )
377 };
378 if result == sys::EResult::k_EResultOK {
379 Ok(())
380 } else {
381 Err(result.into())
382 }
383 }
384}
385
386/// A socket that will continually listen for client connections.
387/// Call `events()` to receive incoming connection.
388/// You should regularly check for events and answer `ConnectionRequests` requests immediately or the socket will
389/// appear as unresponsive to the client.
390///
391/// If a Listen Socket goes out of scope while there are still connections, but new requests will be rejected immediately.
392///
393/// Listen Socket Events will only be available if steam callback are regularly called.
394pub struct ListenSocket<Manager> {
395 inner: Arc<InnerSocket<Manager>>,
396 _callback_handle: Arc<CallbackHandle<Manager>>,
397 receiver: Receiver<ListenSocketEvent<Manager>>,
398}
399
400unsafe impl<Manager: Send + Sync> Send for ListenSocket<Manager> {}
401unsafe impl<Manager: Send + Sync> Sync for ListenSocket<Manager> {}
402
403impl<Manager: 'static> ListenSocket<Manager> {
404 pub(crate) fn new(
405 handle: sys::HSteamListenSocket,
406 sockets: *mut sys::ISteamNetworkingSockets,
407 inner: Arc<Inner<Manager>>,
408 ) -> Self {
409 let (sender, receiver) = std::sync::mpsc::channel();
410 let inner_socket = Arc::new(InnerSocket {
411 sockets,
412 handle,
413 inner: inner.clone(),
414 });
415 inner
416 .networking_sockets_data
417 .lock()
418 .unwrap()
419 .sockets
420 .insert(handle, (Arc::downgrade(&inner_socket), sender));
421 let callback_handle =
422 networking_sockets_callback::get_or_create_connection_callback(inner.clone(), sockets);
423 ListenSocket {
424 inner: inner_socket,
425 _callback_handle: callback_handle,
426 receiver,
427 }
428 }
429
430 /// Tries to receive a pending event. This will never block.
431 ///
432 /// You should answer ConnectionRequests immediately or the server will appear as unresponsive.
433 pub fn try_receive_event(&self) -> Option<ListenSocketEvent<Manager>> {
434 self.receiver.try_recv().ok()
435 }
436
437 /// Receive the next event. This will block until the next event is received.
438 ///
439 /// You should answer ConnectionRequests immediately or the server will appear as unresponsive.
440 pub fn receive_event(&self) -> ListenSocketEvent<Manager> {
441 self.receiver
442 .recv()
443 .expect("all senders were closed, even though the listen socket is still in use")
444 }
445
446 /// Returns an iterator for ListenSocketEvents that will block until the next event is received
447 ///
448 /// You should answer ConnectionRequests immediately or the server will appear as unresponsive.
449 pub fn events<'a>(&'a self) -> impl Iterator<Item = ListenSocketEvent<Manager>> + 'a {
450 self.receiver.iter()
451 }
452
453 /// Send one or more messages without copying the message payload.
454 /// This is the most efficient way to send messages. To use this
455 /// function, you must first allocate a message object using
456 /// ISteamNetworkingUtils::AllocateMessage. (Do not declare one
457 /// on the stack or allocate your own.)
458 ///
459 /// You should fill in the message payload. You can either let
460 /// it allocate the buffer for you and then fill in the payload,
461 /// or if you already have a buffer allocated, you can just point
462 /// m_pData at your buffer and set the callback to the appropriate function
463 /// to free it. Note that if you use your own buffer, it MUST remain valid
464 /// until the callback is executed. And also note that your callback can be
465 /// invoked at ant time from any thread (perhaps even before SendMessages
466 /// returns!), so it MUST be fast and threadsafe.
467 ///
468 /// You MUST also fill in:
469 /// - m_conn - the handle of the connection to send the message to
470 /// - m_nFlags - bitmask of k_nSteamNetworkingSend_xxx flags.
471 ///
472 /// All other fields are currently reserved and should not be modified.
473 ///
474 /// The library will take ownership of the message structures. They may
475 /// be modified or become invalid at any time, so you must not read them
476 /// after passing them to this function.
477 ///
478 /// Returns the message number or Steam error for each sent message.
479 pub fn send_messages(
480 &self,
481 messages: impl IntoIterator<Item = NetworkingMessage<Manager>>,
482 ) -> Vec<SResult<MessageNumber>> {
483 let messages: Vec<_> = messages.into_iter().map(|x| x.take_message()).collect();
484 let mut results = vec![0; messages.len()];
485 unsafe {
486 sys::SteamAPI_ISteamNetworkingSockets_SendMessages(
487 self.inner.sockets,
488 messages.len() as _,
489 messages.as_ptr(),
490 results.as_mut_ptr(),
491 );
492 // Error codes are returned as negative numbers, while positive numbers are message numbers
493 results
494 .into_iter()
495 .map(|x| {
496 if x >= 0 {
497 Ok(MessageNumber(x as u64))
498 } else {
499 Err((-x).try_into().expect("invalid error code"))
500 }
501 })
502 .collect()
503 }
504 }
505}
506
507/// Inner struct that keeps sockets alive as long as there is still a connection alive
508pub(crate) struct InnerSocket<Manager> {
509 pub(crate) sockets: *mut sys::ISteamNetworkingSockets,
510 pub(crate) handle: sys::HSteamListenSocket,
511 pub(crate) inner: Arc<Inner<Manager>>,
512}
513
514impl<Manager> Drop for InnerSocket<Manager> {
515 fn drop(&mut self) {
516 // There's no documentation for this return value, so it's most likely false when hSocket is invalid
517 // The handle should always be valid in our case.
518 let _was_successful = unsafe {
519 sys::SteamAPI_ISteamNetworkingSockets_CloseListenSocket(self.sockets, self.handle)
520 };
521
522 if let None = self
523 .inner
524 .networking_sockets_data
525 .lock()
526 .unwrap()
527 .sockets
528 .remove(&self.handle)
529 {
530 eprintln!("error while dropping InnerSocket: socket was already removed")
531 }
532 }
533}
534
535pub struct NetConnection<Manager> {
536 pub(crate) handle: sys::HSteamNetConnection,
537 sockets: *mut sys::ISteamNetworkingSockets,
538 inner: Arc<Inner<Manager>>,
539 socket: Option<Arc<InnerSocket<Manager>>>,
540 _callback_handle: Option<Arc<CallbackHandle<Manager>>>,
541 _event_receiver: Option<Receiver<()>>,
542 message_buffer: Vec<*mut SteamNetworkingMessage_t>,
543 is_handled: bool,
544}
545
546unsafe impl<Manager: Send + Sync> Send for NetConnection<Manager> {}
547unsafe impl<Manager: Send + Sync> Sync for NetConnection<Manager> {}
548
549impl<Manager: 'static> NetConnection<Manager> {
550 pub(crate) fn new(
551 handle: sys::HSteamNetConnection,
552 sockets: *mut sys::ISteamNetworkingSockets,
553 inner: Arc<Inner<Manager>>,
554 socket: Arc<InnerSocket<Manager>>,
555 ) -> Self {
556 NetConnection {
557 handle,
558 sockets,
559 inner,
560 socket: Some(socket),
561 _callback_handle: None,
562 _event_receiver: None,
563 message_buffer: Vec::new(),
564 is_handled: false,
565 }
566 }
567
568 pub(crate) fn new_independent(
569 handle: sys::HSteamNetConnection,
570 sockets: *mut sys::ISteamNetworkingSockets,
571 inner: Arc<Inner<Manager>>,
572 ) -> Self {
573 let (sender, receiver) = std::sync::mpsc::channel();
574 inner
575 .networking_sockets_data
576 .lock()
577 .unwrap()
578 .independent_connections
579 .insert(handle, sender);
580 let callback =
581 networking_sockets_callback::get_or_create_connection_callback(inner.clone(), sockets);
582 NetConnection {
583 handle,
584 sockets,
585 inner,
586 socket: None,
587 _callback_handle: Some(callback),
588 _event_receiver: Some(receiver),
589 message_buffer: Vec::new(),
590 is_handled: false,
591 }
592 }
593
594 /// Create a NetConnection without a callback for internal use (e.g. instantly rejecting connection requests to dropped sockets)
595 /// Don't use this for exposed connections, it is not set up correctly.
596 pub(crate) fn new_internal(
597 handle: sys::HSteamNetConnection,
598 sockets: *mut sys::ISteamNetworkingSockets,
599 inner: Arc<Inner<Manager>>,
600 ) -> Self {
601 NetConnection {
602 handle,
603 sockets,
604 inner,
605 socket: None,
606 _callback_handle: None,
607 _event_receiver: None,
608 message_buffer: Vec::new(),
609 is_handled: false,
610 }
611 }
612
613 /// Clear the poll group for a connection.
614 ///
615 /// Returns `Err(InvalidHandle)` when `connection` is invalid.
616 pub fn clear_poll_group(&self) -> Result<(), InvalidHandle> {
617 let was_successful = unsafe {
618 sys::SteamAPI_ISteamNetworkingSockets_SetConnectionPollGroup(
619 self.sockets,
620 self.handle,
621 sys::k_HSteamNetPollGroup_Invalid,
622 )
623 };
624
625 if was_successful {
626 Ok(())
627 } else {
628 Err(InvalidHandle)
629 }
630 }
631
632 /// Accept an incoming connection that has been received on a listen socket.
633 /// This is internally used in `ConnectionRequest` and should not be called on regular connections.
634 ///
635 /// When a connection attempt is received (perhaps after a few basic handshake
636 /// packets have been exchanged to prevent trivial spoofing), a connection interface
637 /// object is created in the k_ESteamNetworkingConnectionState_Connecting state
638 /// and a SteamNetConnectionStatusChangedCallback_t is posted. At this point, your
639 /// application MUST either accept or close the connection. (It may not ignore it.)
640 /// Accepting the connection will transition it either into the connected state,
641 /// or the finding route state, depending on the connection type.
642 ///
643 /// You should take action within a second or two, because accepting the connection is
644 /// what actually sends the reply notifying the client that they are connected. If you
645 /// delay taking action, from the client's perspective it is the same as the network
646 /// being unresponsive, and the client may timeout the connection attempt. In other
647 /// words, the client cannot distinguish between a delay caused by network problems
648 /// and a delay caused by the application.
649 ///
650 /// This means that if your application goes for more than a few seconds without
651 /// processing callbacks (for example, while loading a map), then there is a chance
652 /// that a client may attempt to connect in that interval and fail due to timeout.
653 ///
654 /// If the application does not respond to the connection attempt in a timely manner,
655 /// and we stop receiving communication from the client, the connection attempt will
656 /// be timed out locally, transitioning the connection to the
657 /// k_ESteamNetworkingConnectionState_ProblemDetectedLocally state. The client may also
658 /// close the connection before it is accepted, and a transition to the
659 /// k_ESteamNetworkingConnectionState_ClosedByPeer is also possible depending the exact
660 /// sequence of events.
661 ///
662 /// Returns k_EResultInvalidParam if the handle is invalid.
663 /// Returns k_EResultInvalidState if the connection is not in the appropriate state.
664 /// (Remember that the connection state could change in between the time that the
665 /// notification being posted to the queue and when it is received by the application.)
666 ///
667 /// A note about connection configuration options. If you need to set any configuration
668 /// options that are common to all connections accepted through a particular listen
669 /// socket, consider setting the options on the listen socket, since such options are
670 /// inherited automatically. If you really do need to set options that are connection
671 /// specific, it is safe to set them on the connection before accepting the connection.
672 pub(crate) fn accept(mut self) -> SResult<()> {
673 self.handle_connection();
674 let result = unsafe {
675 sys::SteamAPI_ISteamNetworkingSockets_AcceptConnection(self.sockets, self.handle)
676 };
677 match result {
678 sys::EResult::k_EResultOK => Ok(()),
679 error => Err(error.into()),
680 }
681 }
682
683 /// Disconnects from the remote host and invalidates the connection handle.
684 /// Any unread data on the connection is discarded.
685 ///
686 /// nReason is an application defined code that will be received on the other
687 /// end and recorded (when possible) in backend analytics. The value should
688 /// come from a restricted range. (See ESteamNetConnectionEnd.) If you don't need
689 /// to communicate any information to the remote host, and do not want analytics to
690 /// be able to distinguish "normal" connection terminations from "exceptional" ones,
691 /// You may pass zero, in which case the generic value of
692 /// k_ESteamNetConnectionEnd_App_Generic will be used.
693 ///
694 /// pszDebug is an optional human-readable diagnostic string that will be received
695 /// by the remote host and recorded (when possible) in backend analytics.
696 ///
697 /// If you wish to put the socket into a "linger" state, where an attempt is made to
698 /// flush any remaining sent data, use bEnableLinger=true. Otherwise reliable data
699 /// is not flushed.
700 ///
701 /// If the connection has already ended and you are just freeing up the
702 /// connection interface, the reason code, debug string, and linger flag are
703 /// ignored.
704 pub fn close(
705 mut self,
706 reason: NetConnectionEnd,
707 debug_string: Option<&str>,
708 enable_linger: bool,
709 ) -> bool {
710 let debug_string = debug_string.map(|x| CString::new(x).unwrap());
711 let debug_string_ptr = match debug_string {
712 None => std::ptr::null(),
713 Some(s) => s.as_ptr(),
714 };
715 self.handle_connection();
716 unsafe {
717 sys::SteamAPI_ISteamNetworkingSockets_CloseConnection(
718 self.sockets,
719 self.handle,
720 reason.into(),
721 debug_string_ptr,
722 enable_linger,
723 )
724 }
725 }
726
727 /// Fetch connection user data. Returns -1 if handle is invalid
728 /// or if you haven't set any userdata on the connection.
729 pub fn connection_user_data(&self) -> Result<i64, InvalidHandle> {
730 let user_data = unsafe {
731 sys::SteamAPI_ISteamNetworkingSockets_GetConnectionUserData(self.sockets, self.handle)
732 };
733 if user_data == -1 {
734 // I'm not sure if a connection can become invalid on its own, so returning a result may be unnecessary
735 Err(InvalidHandle)
736 } else {
737 Ok(user_data)
738 }
739 }
740
741 /// Set connection user data. the data is returned in the following places
742 /// - You can query it using GetConnectionUserData.
743 /// - The SteamNetworkingmessage_t structure.
744 /// - The SteamNetConnectionInfo_t structure. (Which is a member of SteamNetConnectionStatusChangedCallback_t.)
745 ///
746 /// Returns false if the handle is invalid.
747 pub fn set_connection_user_data(&self, user_data: i64) -> Result<(), InvalidHandle> {
748 let was_successful = unsafe {
749 sys::SteamAPI_ISteamNetworkingSockets_SetConnectionUserData(
750 self.sockets,
751 self.handle,
752 user_data,
753 )
754 };
755 if was_successful {
756 Ok(())
757 } else {
758 Err(InvalidHandle)
759 }
760 }
761
762 /// Set a name for the connection, used mostly for debugging
763 pub fn set_connection_name(&self, name: &str) {
764 let name = CString::new(name).unwrap();
765 unsafe {
766 sys::SteamAPI_ISteamNetworkingSockets_SetConnectionName(
767 self.sockets,
768 self.handle,
769 name.as_ptr(),
770 )
771 }
772 }
773
774 /// Send a message to the remote host on the specified connection.
775 ///
776 /// nSendFlags determines the delivery guarantees that will be provided,
777 /// when data should be buffered, etc. E.g. k_nSteamNetworkingSend_Unreliable
778 ///
779 /// Note that the semantics we use for messages are not precisely
780 /// the same as the semantics of a standard "stream" socket.
781 /// (SOCK_STREAM) For an ordinary stream socket, the boundaries
782 /// between chunks are not considered relevant, and the sizes of
783 /// the chunks of data written will not necessarily match up to
784 /// the sizes of the chunks that are returned by the reads on
785 /// the other end. The remote host might read a partial chunk,
786 /// or chunks might be coalesced. For the message semantics
787 /// used here, however, the sizes WILL match. Each send call
788 /// will match a successful read call on the remote host
789 /// one-for-one. If you are porting existing stream-oriented
790 /// code to the semantics of reliable messages, your code should
791 /// work the same, since reliable message semantics are more
792 /// strict than stream semantics. The only caveat is related to
793 /// performance: there is per-message overhead to retain the
794 /// message sizes, and so if your code sends many small chunks
795 /// of data, performance will suffer. Any code based on stream
796 /// sockets that does not write excessively small chunks will
797 /// work without any changes.
798 ///
799 /// The pOutMessageNumber is an optional pointer to receive the
800 /// message number assigned to the message, if sending was successful.
801 ///
802 /// Returns:
803 /// - k_EResultInvalidParam: invalid connection handle, or the individual message is too big.
804 /// (See k_cbMaxSteamNetworkingSocketsMessageSizeSend)
805 /// - k_EResultInvalidState: connection is in an invalid state
806 /// - k_EResultNoConnection: connection has ended
807 /// - k_EResultIgnored: You used k_nSteamNetworkingSend_NoDelay, and the message was dropped because
808 /// we were not ready to send it.
809 /// - k_EResultLimitExceeded: there was already too much data queued to be sent.
810 /// (See k_ESteamNetworkingConfig_SendBufferSize)
811 pub fn send_message(&self, data: &[u8], send_flags: SendFlags) -> SResult<MessageNumber> {
812 unsafe {
813 let mut out_message_number = 0i64;
814 let result = sys::SteamAPI_ISteamNetworkingSockets_SendMessageToConnection(
815 self.sockets,
816 self.handle,
817 data.as_ptr() as _,
818 data.len() as _,
819 send_flags.bits(),
820 &mut out_message_number,
821 );
822 match result {
823 sys::EResult::k_EResultOK => Ok(MessageNumber(out_message_number as u64)),
824 error => Err(error.into()),
825 }
826 }
827 }
828
829 /// Fetch connection name. Returns false if handle is invalid
830 pub fn connection_name(&self) -> Result<(), InvalidHandle> {
831 unimplemented!()
832 }
833
834 /// Flush any messages waiting on the Nagle timer and send them
835 /// at the next transmission opportunity (often that means right now).
836 ///
837 /// If Nagle is enabled (it's on by default) then when calling
838 /// SendMessageToConnection the message will be buffered, up to the Nagle time
839 /// before being sent, to merge small messages into the same packet.
840 /// (See k_ESteamNetworkingConfig_NagleTime)
841 ///
842 /// Returns:
843 /// k_EResultInvalidParam: invalid connection handle
844 /// k_EResultInvalidState: connection is in an invalid state
845 /// k_EResultNoConnection: connection has ended
846 /// k_EResultIgnored: We weren't (yet) connected, so this operation has no effect.
847 pub fn flush_messages(&self) -> SResult<()> {
848 unsafe {
849 let result = sys::SteamAPI_ISteamNetworkingSockets_FlushMessagesOnConnection(
850 self.sockets,
851 self.handle,
852 );
853 if let sys::EResult::k_EResultOK = result {
854 Ok(())
855 } else {
856 Err(result.into())
857 }
858 }
859 }
860
861 /// Fetch the next available message(s) from the connection, if any.
862 /// Returns the number of messages returned into your array, up to nMaxMessages.
863 /// If the connection handle is invalid, -1 is returned.
864 ///
865 /// The order of the messages returned in the array is relevant.
866 /// Reliable messages will be received in the order they were sent (and with the
867 /// same sizes --- see SendMessageToConnection for on this subtle difference from a stream socket).
868 ///
869 /// Unreliable messages may be dropped, or delivered out of order with respect to
870 /// each other or with respect to reliable messages. The same unreliable message
871 /// may be received multiple times.
872 ///
873 /// If any messages are returned, you MUST call SteamNetworkingMessage_t::Release() on each
874 /// of them free up resources after you are done. It is safe to keep the object alive for
875 /// a little while (put it into some queue, etc), and you may call Release() from any thread.
876 pub fn receive_messages(
877 &mut self,
878 batch_size: usize,
879 ) -> Result<Vec<NetworkingMessage<Manager>>, InvalidHandle> {
880 if self.message_buffer.capacity() < batch_size {
881 self.message_buffer
882 .reserve(batch_size - self.message_buffer.capacity());
883 }
884
885 unsafe {
886 let message_count = sys::SteamAPI_ISteamNetworkingSockets_ReceiveMessagesOnConnection(
887 self.sockets,
888 self.handle,
889 self.message_buffer.as_mut_ptr(),
890 batch_size as _,
891 );
892 if message_count < 0 {
893 return Err(InvalidHandle);
894 }
895 self.message_buffer.set_len(message_count as usize);
896 }
897
898 Ok(self
899 .message_buffer
900 .drain(..)
901 .map(|x| NetworkingMessage {
902 message: x,
903 _inner: self.inner.clone(),
904 })
905 .collect())
906 }
907
908 /// Assign a connection to a poll group. Note that a connection may only belong to a
909 /// single poll group. Adding a connection to a poll group implicitly removes it from
910 /// any other poll group it is in.
911 ///
912 /// You can call `clear_connection_poll_group` to remove a connection from its current
913 /// poll group without adding it to a new poll group.
914 ///
915 /// If there are received messages currently pending on the connection, an attempt
916 /// is made to add them to the queue of messages for the poll group in approximately
917 /// the order that would have applied if the connection was already part of the poll
918 /// group at the time that the messages were received.
919 ///
920 /// Returns false if the connection handle is invalid, or if the poll group handle
921 /// is invalid (and not k_HSteamNetPollGroup_Invalid).
922 pub fn set_poll_group(&self, poll_group: &NetPollGroup<Manager>) {
923 let was_successful = unsafe {
924 sys::SteamAPI_ISteamNetworkingSockets_SetConnectionPollGroup(
925 self.sockets,
926 self.handle,
927 poll_group.handle,
928 )
929 };
930 debug_assert!(was_successful);
931 }
932
933 pub fn run_callbacks(&self) {
934 unsafe { sys::SteamAPI_ISteamNetworkingSockets_RunCallbacks(self.sockets) }
935 }
936
937 /// Set the connection state to be handled externally. The struct will no longer close the connection on drop.
938 pub(crate) fn handle_connection(&mut self) {
939 self.is_handled = true
940 }
941}
942
943impl<Manager> Drop for NetConnection<Manager> {
944 fn drop(&mut self) {
945 if !self.is_handled {
946 let debug_string = CString::new("Handle was dropped").unwrap();
947 let _was_successful = unsafe {
948 sys::SteamAPI_ISteamNetworkingSockets_CloseConnection(
949 self.sockets,
950 self.handle,
951 NetConnectionEnd::AppGeneric.into(),
952 debug_string.as_ptr(),
953 false,
954 )
955 };
956
957 if self.socket.is_none() {
958 self.inner
959 .networking_sockets_data
960 .lock()
961 .unwrap()
962 .independent_connections
963 .remove(&self.handle)
964 .expect("internal connection was removed before being dropped");
965 }
966 }
967 }
968}
969
970pub struct NetPollGroup<Manager> {
971 handle: sys::HSteamNetPollGroup,
972 sockets: *mut sys::ISteamNetworkingSockets,
973 inner: Arc<Inner<Manager>>,
974 message_buffer: Vec<*mut SteamNetworkingMessage_t>,
975}
976
977unsafe impl<Manager: Send + Sync> Send for NetPollGroup<Manager> {}
978unsafe impl<Manager: Send + Sync> Sync for NetPollGroup<Manager> {}
979
980impl<Manager> NetPollGroup<Manager> {
981 pub fn receive_messages(&mut self, batch_size: usize) -> Vec<NetworkingMessage<Manager>> {
982 if self.message_buffer.capacity() < batch_size {
983 self.message_buffer
984 .reserve(batch_size - self.message_buffer.capacity());
985 }
986
987 unsafe {
988 let count = sys::SteamAPI_ISteamNetworkingSockets_ReceiveMessagesOnPollGroup(
989 self.sockets,
990 self.handle,
991 self.message_buffer.as_mut_ptr(),
992 batch_size as _,
993 ) as usize;
994 self.message_buffer.set_len(count);
995 }
996
997 self.message_buffer
998 .drain(..)
999 .map(|x| NetworkingMessage {
1000 message: x,
1001 _inner: self.inner.clone(),
1002 })
1003 .collect()
1004 }
1005}
1006
1007impl<Manager> Drop for NetPollGroup<Manager> {
1008 fn drop(&mut self) {
1009 let _was_successful = unsafe {
1010 sys::SteamAPI_ISteamNetworkingSockets_DestroyPollGroup(self.sockets, self.handle)
1011 };
1012 }
1013}
1014
1015#[derive(Debug, Error)]
1016#[error("operation was unsuccessful an invalid handle was returned")]
1017pub struct InvalidHandle;
1018
1019#[cfg(test)]
1020mod tests {
1021 use std::net::Ipv4Addr;
1022
1023 use crate::{networking_types::NetworkingConnectionState, Client};
1024
1025 use super::*;
1026 use crate::networking_types::{
1027 ListenSocketEvent, NetworkingConfigEntry, NetworkingConfigValue, SendFlags,
1028 };
1029
1030 #[test]
1031 #[serial]
1032 fn test_create_listen_socket_ip() {
1033 let (client, _single) = Client::init().unwrap();
1034 let sockets = client.networking_sockets();
1035 let socket_result = sockets.create_listen_socket_ip(
1036 SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 12345),
1037 vec![],
1038 );
1039 assert!(socket_result.is_ok());
1040 }
1041
1042 #[test]
1043 fn test_socket_connection() {
1044 let (client, single) = Client::init().unwrap();
1045 let sockets = client.networking_sockets();
1046
1047 sockets.init_authentication().unwrap();
1048
1049 let debug_config = vec![NetworkingConfigEntry::new_int32(
1050 NetworkingConfigValue::IPAllowWithoutAuth,
1051 1,
1052 )];
1053
1054 println!("Create ListenSocket");
1055 let bound_ip = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 12345);
1056 let socket = sockets
1057 .create_listen_socket_ip(bound_ip, debug_config.clone())
1058 .unwrap();
1059
1060 println!("Create connection");
1061 let mut to_server = sockets
1062 .connect_by_ip_address(bound_ip, debug_config)
1063 .unwrap();
1064
1065 println!("Run callbacks");
1066 for _ in 0..5 {
1067 single.run_callbacks();
1068 std::thread::sleep(::std::time::Duration::from_millis(50));
1069 }
1070
1071 let event = socket.try_receive_event().unwrap();
1072 match event {
1073 ListenSocketEvent::Connecting(request) => {
1074 println!("Accept connection");
1075 request.accept().unwrap();
1076 }
1077 _ => panic!("unexpected event"),
1078 }
1079
1080 println!("Run callbacks");
1081 for _ in 0..5 {
1082 single.run_callbacks();
1083 std::thread::sleep(::std::time::Duration::from_millis(50));
1084 }
1085
1086 let event = socket.try_receive_event().unwrap();
1087 let mut to_client = match event {
1088 ListenSocketEvent::Connected(connected) => connected.take_connection(),
1089 _ => panic!("unexpected event"),
1090 };
1091
1092 println!("Configure connection lanes");
1093 let mut lane_priorities = vec![0; 2];
1094 let mut lane_weights = vec![0; 2];
1095 lane_priorities[0] = 1;
1096 lane_weights[0] = 1;
1097 lane_priorities[1] = 1;
1098 lane_weights[1] = 3;
1099
1100 let result =
1101 sockets.configure_connection_lanes(&to_server, 2, &lane_priorities, &lane_weights);
1102 assert!(result.is_ok());
1103
1104 println!("Get connection info remote client");
1105 let info = sockets.get_connection_info(&to_client).unwrap();
1106 match info.state() {
1107 Ok(state) => assert_eq!(state, NetworkingConnectionState::Connected),
1108 _ => panic!("unexpected state"),
1109 }
1110
1111 println!("Get connection info server");
1112 let info = sockets.get_connection_info(&to_server).unwrap();
1113 match info.state() {
1114 Ok(state) => assert_eq!(state, NetworkingConnectionState::Connected),
1115 _ => panic!("unexpected state"),
1116 }
1117
1118 println!("Get quick connection info remote client");
1119 let (info, lanes) = sockets
1120 .get_realtime_connection_status(&to_client, 0)
1121 .unwrap();
1122 if let Ok(net_connection) = info.connection_state() {
1123 assert_eq!(net_connection, NetworkingConnectionState::Connected);
1124 assert_eq!(lanes.len(), 0);
1125 } else {
1126 panic!("unexpected state");
1127 }
1128
1129 println!("Get quick connection info server");
1130 let (info, lanes) = sockets
1131 .get_realtime_connection_status(&to_server, 2)
1132 .unwrap();
1133 if let Ok(net_connection) = info.connection_state() {
1134 assert_eq!(net_connection, NetworkingConnectionState::Connected);
1135 assert_eq!(lanes.len(), 2);
1136 } else {
1137 panic!("unexpected state");
1138 }
1139
1140 println!("Send message to server");
1141 to_server
1142 .send_message(&[1, 1, 2, 5], SendFlags::RELIABLE_NO_NAGLE)
1143 .unwrap();
1144
1145 std::thread::sleep(::std::time::Duration::from_millis(100));
1146
1147 println!("Receive message");
1148 let messages = to_client.receive_messages(10).unwrap();
1149 assert_eq!(messages.len(), 1);
1150 assert_eq!(messages[0].data(), &[1, 1, 2, 5]);
1151
1152 println!("Send message to client");
1153 to_client
1154 .send_message(&[3, 3, 3, 1], SendFlags::RELIABLE_NO_NAGLE)
1155 .unwrap();
1156
1157 std::thread::sleep(::std::time::Duration::from_millis(100));
1158
1159 println!("Receive message");
1160 let messages = to_server.receive_messages(10).unwrap();
1161 assert_eq!(messages.len(), 1);
1162 assert_eq!(messages[0].data(), &[3, 3, 3, 1]);
1163
1164 println!("Send message to client with send_messages");
1165 let utils = client.networking_utils();
1166 let mut message = utils.allocate_message(0);
1167 message.set_connection(&to_client);
1168 message.set_send_flags(SendFlags::RELIABLE_NO_NAGLE);
1169 message.set_data(vec![1, 2, 34, 5]).unwrap();
1170 socket.send_messages(vec![message]);
1171
1172 std::thread::sleep(::std::time::Duration::from_millis(1000));
1173
1174 println!("Receive message");
1175 let messages = to_server.receive_messages(10).unwrap();
1176 assert_eq!(messages.len(), 1);
1177 assert_eq!(messages[0].data(), &[1, 2, 34, 5]);
1178 }
1179}