1#[cfg(unix)]
4use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
5#[cfg(windows)]
6use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
7#[cfg(feature = "v2_60")]
8use std::time::Duration;
9use std::{cell::RefCell, marker::PhantomData, mem::transmute, pin::Pin, ptr};
10
11use futures_core::stream::Stream;
12use glib::{prelude::*, translate::*, Slice};
13
14#[cfg(feature = "v2_60")]
15use crate::PollableReturn;
16use crate::{ffi, Cancellable, Socket, SocketAddress, SocketControlMessage};
17
18impl Socket {
19 #[cfg(unix)]
20 #[cfg_attr(docsrs, doc(cfg(unix)))]
21 #[allow(clippy::missing_safety_doc)]
22 pub unsafe fn from_fd(fd: impl IntoRawFd) -> Result<Socket, glib::Error> {
23 let fd = fd.into_raw_fd();
24 let mut error = ptr::null_mut();
25 let ret = ffi::g_socket_new_from_fd(fd, &mut error);
26 if error.is_null() {
27 Ok(from_glib_full(ret))
28 } else {
29 Err(from_glib_full(error))
30 }
31 }
32 #[cfg(windows)]
33 #[cfg_attr(docsrs, doc(cfg(windows)))]
34 #[allow(clippy::missing_safety_doc)]
35 pub unsafe fn from_socket(socket: impl IntoRawSocket) -> Result<Socket, glib::Error> {
36 let socket = socket.into_raw_socket();
37 let mut error = ptr::null_mut();
38 let ret = ffi::g_socket_new_from_fd(socket as i32, &mut error);
39 if error.is_null() {
40 Ok(from_glib_full(ret))
41 } else {
42 Err(from_glib_full(error))
43 }
44 }
45}
46
47#[cfg(unix)]
48#[cfg_attr(docsrs, doc(cfg(unix)))]
49impl AsRawFd for Socket {
50 fn as_raw_fd(&self) -> RawFd {
51 unsafe { ffi::g_socket_get_fd(self.to_glib_none().0) as _ }
52 }
53}
54
#[cfg(windows)]
#[cfg_attr(docsrs, doc(cfg(windows)))]
impl AsRawSocket for Socket {
    /// Returns the handle reported by `g_socket_get_fd`, cast to `RawSocket`.
    fn as_raw_socket(&self) -> RawSocket {
        let socket = self.to_glib_none();
        unsafe { ffi::g_socket_get_fd(socket.0) as RawSocket }
    }
}
62
63#[doc(alias = "GInputVector")]
64#[repr(transparent)]
65#[derive(Debug)]
66pub struct InputVector<'v> {
67 vector: ffi::GInputVector,
68 buffer: PhantomData<&'v mut [u8]>,
69}
70
71impl<'v> InputVector<'v> {
72 #[inline]
73 pub fn new(buffer: &'v mut [u8]) -> Self {
74 Self {
75 vector: ffi::GInputVector {
76 buffer: buffer.as_mut_ptr() as *mut _,
77 size: buffer.len(),
78 },
79 buffer: PhantomData,
80 }
81 }
82}
83
84unsafe impl Send for InputVector<'_> {}
85unsafe impl Sync for InputVector<'_> {}
86
87impl std::ops::Deref for InputVector<'_> {
88 type Target = [u8];
89
90 #[inline]
91 fn deref(&self) -> &Self::Target {
92 unsafe { std::slice::from_raw_parts(self.vector.buffer as *const _, self.vector.size) }
93 }
94}
95
96impl std::ops::DerefMut for InputVector<'_> {
97 #[inline]
98 fn deref_mut(&mut self) -> &mut Self::Target {
99 unsafe { std::slice::from_raw_parts_mut(self.vector.buffer as *mut _, self.vector.size) }
100 }
101}
102
103#[derive(Debug)]
104pub struct SocketControlMessages {
105 ptr: *mut *mut ffi::GSocketControlMessage,
106 len: u32,
107}
108
109impl SocketControlMessages {
110 #[inline]
111 pub const fn new() -> Self {
112 Self {
113 ptr: ptr::null_mut(),
114 len: 0,
115 }
116 }
117}
118
119impl AsRef<[SocketControlMessage]> for SocketControlMessages {
120 #[inline]
121 fn as_ref(&self) -> &[SocketControlMessage] {
122 unsafe { std::slice::from_raw_parts(self.ptr as *const _, self.len as usize) }
123 }
124}
125
126impl std::ops::Deref for SocketControlMessages {
127 type Target = [SocketControlMessage];
128
129 #[inline]
130 fn deref(&self) -> &Self::Target {
131 self.as_ref()
132 }
133}
134
135impl Default for SocketControlMessages {
136 #[inline]
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl Drop for SocketControlMessages {
143 #[inline]
144 fn drop(&mut self) {
145 unsafe {
146 let _: Slice<SocketControlMessage> =
147 Slice::from_glib_full_num(self.ptr as *mut _, self.len as usize);
148 }
149 }
150}
151
152#[doc(alias = "GInputMessage")]
153#[repr(transparent)]
154#[derive(Debug)]
155pub struct InputMessage<'m> {
156 message: ffi::GInputMessage,
157 address: PhantomData<Option<&'m mut Option<SocketAddress>>>,
158 vectors: PhantomData<&'m mut [InputVector<'m>]>,
159 control_messages: PhantomData<Option<&'m mut SocketControlMessages>>,
160}
161
162impl<'m> InputMessage<'m> {
163 pub fn new(
164 mut address: Option<&'m mut Option<SocketAddress>>,
165 vectors: &'m mut [InputVector<'m>],
166 control_messages: Option<&'m mut SocketControlMessages>,
167 ) -> Self {
168 let address = address
169 .as_mut()
170 .map(|a| {
171 assert!(a.is_none());
172 *a as *mut _ as *mut _
173 })
174 .unwrap_or_else(ptr::null_mut);
175 let (control_messages, num_control_messages) = control_messages
176 .map(|c| (&mut c.ptr as *mut _, &mut c.len as *mut _))
177 .unwrap_or_else(|| (ptr::null_mut(), ptr::null_mut()));
178 Self {
179 message: ffi::GInputMessage {
180 address,
181 vectors: vectors.as_mut_ptr() as *mut ffi::GInputVector,
182 num_vectors: vectors.len().try_into().unwrap(),
183 bytes_received: 0,
184 flags: 0,
185 control_messages,
186 num_control_messages,
187 },
188 address: PhantomData,
189 vectors: PhantomData,
190 control_messages: PhantomData,
191 }
192 }
193 #[inline]
194 pub fn vectors(&mut self) -> &'m mut [InputVector<'m>] {
195 unsafe {
196 std::slice::from_raw_parts_mut(
197 self.message.vectors as *mut _,
198 self.message.num_vectors as usize,
199 )
200 }
201 }
202 #[inline]
203 pub const fn flags(&self) -> i32 {
204 self.message.flags
205 }
206 #[inline]
207 pub const fn bytes_received(&self) -> usize {
208 self.message.bytes_received
209 }
210}
211
212#[doc(alias = "GOutputVector")]
213#[repr(transparent)]
214#[derive(Debug)]
215pub struct OutputVector<'v> {
216 vector: ffi::GOutputVector,
217 buffer: PhantomData<&'v [u8]>,
218}
219
220impl<'v> OutputVector<'v> {
221 #[inline]
222 pub const fn new(buffer: &'v [u8]) -> Self {
223 Self {
224 vector: ffi::GOutputVector {
225 buffer: buffer.as_ptr() as *const _,
226 size: buffer.len(),
227 },
228 buffer: PhantomData,
229 }
230 }
231}
232
233unsafe impl Send for OutputVector<'_> {}
234unsafe impl Sync for OutputVector<'_> {}
235
236impl std::ops::Deref for OutputVector<'_> {
237 type Target = [u8];
238
239 #[inline]
240 fn deref(&self) -> &Self::Target {
241 unsafe { std::slice::from_raw_parts(self.vector.buffer as *const _, self.vector.size) }
242 }
243}
244
245#[doc(alias = "GOutputMessage")]
246#[repr(transparent)]
247#[derive(Debug)]
248pub struct OutputMessage<'m> {
249 message: ffi::GOutputMessage,
250 address: PhantomData<Option<&'m SocketAddress>>,
251 vectors: PhantomData<&'m [OutputVector<'m>]>,
252 control_messages: PhantomData<&'m [SocketControlMessage]>,
253}
254
255impl<'m> OutputMessage<'m> {
256 pub fn new<A: IsA<SocketAddress>>(
257 address: Option<&'m A>,
258 vectors: &'m [OutputVector<'m>],
259 control_messages: &'m [SocketControlMessage],
260 ) -> Self {
261 Self {
262 message: ffi::GOutputMessage {
263 address: address
264 .map(|a| a.upcast_ref::<SocketAddress>().as_ptr())
265 .unwrap_or_else(ptr::null_mut),
266 vectors: mut_override(vectors.as_ptr() as *const ffi::GOutputVector),
267 num_vectors: vectors.len().try_into().unwrap(),
268 bytes_sent: 0,
269 control_messages: control_messages.as_ptr() as *mut _,
270 num_control_messages: control_messages.len().try_into().unwrap(),
271 },
272 address: PhantomData,
273 vectors: PhantomData,
274 control_messages: PhantomData,
275 }
276 }
277 #[inline]
278 pub fn vectors(&self) -> &'m [OutputVector<'m>] {
279 unsafe {
280 std::slice::from_raw_parts(
281 self.message.vectors as *const _,
282 self.message.num_vectors as usize,
283 )
284 }
285 }
286 #[inline]
287 pub fn bytes_sent(&self) -> u32 {
288 self.message.bytes_sent
289 }
290}
291
292mod sealed {
293 pub trait Sealed {}
294 impl<T: super::IsA<super::Socket>> Sealed for T {}
295}
296
297pub trait SocketExtManual: sealed::Sealed + IsA<Socket> + Sized {
298 #[doc(alias = "g_socket_receive")]
299 fn receive<B: AsMut<[u8]>, C: IsA<Cancellable>>(
300 &self,
301 mut buffer: B,
302 cancellable: Option<&C>,
303 ) -> Result<usize, glib::Error> {
304 let cancellable = cancellable.map(|c| c.as_ref());
305 let gcancellable = cancellable.to_glib_none();
306 let buffer = buffer.as_mut();
307 let buffer_ptr = buffer.as_mut_ptr();
308 let count = buffer.len();
309 unsafe {
310 let mut error = ptr::null_mut();
311 let ret = ffi::g_socket_receive(
312 self.as_ref().to_glib_none().0,
313 buffer_ptr,
314 count,
315 gcancellable.0,
316 &mut error,
317 );
318 if error.is_null() {
319 Ok(ret as usize)
320 } else {
321 Err(from_glib_full(error))
322 }
323 }
324 }
325 #[doc(alias = "g_socket_receive_from")]
326 fn receive_from<B: AsMut<[u8]>, C: IsA<Cancellable>>(
327 &self,
328 mut buffer: B,
329 cancellable: Option<&C>,
330 ) -> Result<(usize, SocketAddress), glib::Error> {
331 let cancellable = cancellable.map(|c| c.as_ref());
332 let gcancellable = cancellable.to_glib_none();
333 let buffer = buffer.as_mut();
334 let buffer_ptr = buffer.as_mut_ptr();
335 let count = buffer.len();
336 unsafe {
337 let mut error = ptr::null_mut();
338 let mut addr_ptr = ptr::null_mut();
339
340 let ret = ffi::g_socket_receive_from(
341 self.as_ref().to_glib_none().0,
342 &mut addr_ptr,
343 buffer_ptr,
344 count,
345 gcancellable.0,
346 &mut error,
347 );
348 if error.is_null() {
349 Ok((ret as usize, from_glib_full(addr_ptr)))
350 } else {
351 Err(from_glib_full(error))
352 }
353 }
354 }
355 #[doc(alias = "g_socket_receive_message")]
356 fn receive_message<C: IsA<Cancellable>>(
357 &self,
358 mut address: Option<&mut Option<SocketAddress>>,
359 vectors: &mut [InputVector],
360 control_messages: Option<&mut SocketControlMessages>,
361 mut flags: i32,
362 cancellable: Option<&C>,
363 ) -> Result<(usize, i32), glib::Error> {
364 let cancellable = cancellable.map(|c| c.as_ref());
365 let address = address
366 .as_mut()
367 .map(|a| {
368 assert!(a.is_none());
369 *a as *mut _ as *mut _
370 })
371 .unwrap_or_else(ptr::null_mut);
372 let (control_messages, num_control_messages) = control_messages
373 .map(|c| (&mut c.ptr as *mut _, &mut c.len as *mut _ as *mut _))
374 .unwrap_or_else(|| (ptr::null_mut(), ptr::null_mut()));
375 unsafe {
376 let mut error = ptr::null_mut();
377
378 let received = ffi::g_socket_receive_message(
379 self.as_ref().to_glib_none().0,
380 address,
381 vectors.as_mut_ptr() as *mut ffi::GInputVector,
382 vectors.len().try_into().unwrap(),
383 control_messages,
384 num_control_messages,
385 &mut flags,
386 cancellable.to_glib_none().0,
387 &mut error,
388 );
389 if error.is_null() {
390 Ok((received as usize, flags))
391 } else {
392 Err(from_glib_full(error))
393 }
394 }
395 }
396 #[doc(alias = "g_socket_receive_messages")]
397 fn receive_messages<C: IsA<Cancellable>>(
398 &self,
399 messages: &mut [InputMessage],
400 flags: i32,
401 cancellable: Option<&C>,
402 ) -> Result<usize, glib::Error> {
403 let cancellable = cancellable.map(|c| c.as_ref());
404 unsafe {
405 let mut error = ptr::null_mut();
406
407 let count = ffi::g_socket_receive_messages(
408 self.as_ref().to_glib_none().0,
409 messages.as_mut_ptr() as *mut _,
410 messages.len().try_into().unwrap(),
411 flags,
412 cancellable.to_glib_none().0,
413 &mut error,
414 );
415 if error.is_null() {
416 Ok(count as usize)
417 } else {
418 Err(from_glib_full(error))
419 }
420 }
421 }
422 #[doc(alias = "g_socket_receive_with_blocking")]
423 fn receive_with_blocking<B: AsMut<[u8]>, C: IsA<Cancellable>>(
424 &self,
425 mut buffer: B,
426 blocking: bool,
427 cancellable: Option<&C>,
428 ) -> Result<usize, glib::Error> {
429 let cancellable = cancellable.map(|c| c.as_ref());
430 let gcancellable = cancellable.to_glib_none();
431 let buffer = buffer.as_mut();
432 let buffer_ptr = buffer.as_mut_ptr();
433 let count = buffer.len();
434 unsafe {
435 let mut error = ptr::null_mut();
436 let ret = ffi::g_socket_receive_with_blocking(
437 self.as_ref().to_glib_none().0,
438 buffer_ptr,
439 count,
440 blocking.into_glib(),
441 gcancellable.0,
442 &mut error,
443 );
444 if error.is_null() {
445 Ok(ret as usize)
446 } else {
447 Err(from_glib_full(error))
448 }
449 }
450 }
451
452 #[doc(alias = "g_socket_send")]
453 fn send<B: AsRef<[u8]>, C: IsA<Cancellable>>(
454 &self,
455 buffer: B,
456 cancellable: Option<&C>,
457 ) -> Result<usize, glib::Error> {
458 let cancellable = cancellable.map(|c| c.as_ref());
459 let gcancellable = cancellable.to_glib_none();
460 let (count, buffer_ptr) = {
461 let slice = buffer.as_ref();
462 (slice.len(), slice.as_ptr())
463 };
464 unsafe {
465 let mut error = ptr::null_mut();
466 let ret = ffi::g_socket_send(
467 self.as_ref().to_glib_none().0,
468 mut_override(buffer_ptr),
469 count,
470 gcancellable.0,
471 &mut error,
472 );
473 if error.is_null() {
474 Ok(ret as usize)
475 } else {
476 Err(from_glib_full(error))
477 }
478 }
479 }
480 #[doc(alias = "g_socket_send_message")]
481 fn send_message<P: IsA<SocketAddress>, C: IsA<Cancellable>>(
482 &self,
483 address: Option<&P>,
484 vectors: &[OutputVector],
485 messages: &[SocketControlMessage],
486 flags: i32,
487 cancellable: Option<&C>,
488 ) -> Result<usize, glib::Error> {
489 let cancellable = cancellable.map(|c| c.as_ref());
490 unsafe {
491 let mut error = ptr::null_mut();
492 let ret = ffi::g_socket_send_message(
493 self.as_ref().to_glib_none().0,
494 address.map(|p| p.as_ref()).to_glib_none().0,
495 vectors.as_ptr() as *mut ffi::GOutputVector,
496 vectors.len().try_into().unwrap(),
497 messages.as_ptr() as *mut _,
498 messages.len().try_into().unwrap(),
499 flags,
500 cancellable.to_glib_none().0,
501 &mut error,
502 );
503 if error.is_null() {
504 Ok(ret as usize)
505 } else {
506 Err(from_glib_full(error))
507 }
508 }
509 }
510 #[cfg(feature = "v2_60")]
511 #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
512 #[doc(alias = "g_socket_send_message_with_timeout")]
513 fn send_message_with_timeout<P: IsA<SocketAddress>, C: IsA<Cancellable>>(
514 &self,
515 address: Option<&P>,
516 vectors: &[OutputVector],
517 messages: &[SocketControlMessage],
518 flags: i32,
519 timeout: Option<Duration>,
520 cancellable: Option<&C>,
521 ) -> Result<(PollableReturn, usize), glib::Error> {
522 let cancellable = cancellable.map(|c| c.as_ref());
523 unsafe {
524 let mut error = ptr::null_mut();
525 let mut bytes_written = 0;
526
527 let ret = ffi::g_socket_send_message_with_timeout(
528 self.as_ref().to_glib_none().0,
529 address.map(|p| p.as_ref()).to_glib_none().0,
530 vectors.as_ptr() as *mut ffi::GOutputVector,
531 vectors.len().try_into().unwrap(),
532 messages.as_ptr() as *mut _,
533 messages.len().try_into().unwrap(),
534 flags,
535 timeout
536 .map(|t| t.as_micros().try_into().unwrap())
537 .unwrap_or(-1),
538 &mut bytes_written,
539 cancellable.to_glib_none().0,
540 &mut error,
541 );
542 if error.is_null() {
543 Ok((from_glib(ret), bytes_written))
544 } else {
545 Err(from_glib_full(error))
546 }
547 }
548 }
549 #[doc(alias = "g_socket_send_messages")]
550 fn send_messages<C: IsA<Cancellable>>(
551 &self,
552 messages: &mut [OutputMessage],
553 flags: i32,
554 cancellable: Option<&C>,
555 ) -> Result<usize, glib::Error> {
556 let cancellable = cancellable.map(|c| c.as_ref());
557 unsafe {
558 let mut error = ptr::null_mut();
559 let count = ffi::g_socket_send_messages(
560 self.as_ref().to_glib_none().0,
561 messages.as_mut_ptr() as *mut _,
562 messages.len().try_into().unwrap(),
563 flags,
564 cancellable.to_glib_none().0,
565 &mut error,
566 );
567 if error.is_null() {
568 Ok(count as usize)
569 } else {
570 Err(from_glib_full(error))
571 }
572 }
573 }
574 #[doc(alias = "g_socket_send_to")]
575 fn send_to<B: AsRef<[u8]>, P: IsA<SocketAddress>, C: IsA<Cancellable>>(
576 &self,
577 address: Option<&P>,
578 buffer: B,
579 cancellable: Option<&C>,
580 ) -> Result<usize, glib::Error> {
581 let cancellable = cancellable.map(|c| c.as_ref());
582 let gcancellable = cancellable.to_glib_none();
583 let (count, buffer_ptr) = {
584 let slice = buffer.as_ref();
585 (slice.len(), slice.as_ptr())
586 };
587 unsafe {
588 let mut error = ptr::null_mut();
589
590 let ret = ffi::g_socket_send_to(
591 self.as_ref().to_glib_none().0,
592 address.map(|p| p.as_ref()).to_glib_none().0,
593 mut_override(buffer_ptr),
594 count,
595 gcancellable.0,
596 &mut error,
597 );
598 if error.is_null() {
599 Ok(ret as usize)
600 } else {
601 Err(from_glib_full(error))
602 }
603 }
604 }
605 #[doc(alias = "g_socket_send_with_blocking")]
606 fn send_with_blocking<B: AsRef<[u8]>, C: IsA<Cancellable>>(
607 &self,
608 buffer: B,
609 blocking: bool,
610 cancellable: Option<&C>,
611 ) -> Result<usize, glib::Error> {
612 let cancellable = cancellable.map(|c| c.as_ref());
613 let gcancellable = cancellable.to_glib_none();
614 let (count, buffer_ptr) = {
615 let slice = buffer.as_ref();
616 (slice.len(), slice.as_ptr())
617 };
618 unsafe {
619 let mut error = ptr::null_mut();
620 let ret = ffi::g_socket_send_with_blocking(
621 self.as_ref().to_glib_none().0,
622 mut_override(buffer_ptr),
623 count,
624 blocking.into_glib(),
625 gcancellable.0,
626 &mut error,
627 );
628 if error.is_null() {
629 Ok(ret as usize)
630 } else {
631 Err(from_glib_full(error))
632 }
633 }
634 }
635
636 #[cfg(unix)]
637 #[cfg_attr(docsrs, doc(cfg(unix)))]
638 #[doc(alias = "get_fd")]
639 #[doc(alias = "g_socket_get_fd")]
640 fn fd<T: FromRawFd>(&self) -> T {
641 unsafe { FromRawFd::from_raw_fd(ffi::g_socket_get_fd(self.as_ref().to_glib_none().0)) }
642 }
643
644 #[cfg(windows)]
645 #[cfg_attr(docsrs, doc(cfg(windows)))]
646 #[doc(alias = "get_socket")]
647 #[doc(alias = "g_socket_get_fd")]
648 fn socket<T: FromRawSocket>(&self) -> T {
649 unsafe {
650 FromRawSocket::from_raw_socket(ffi::g_socket_get_fd(self.as_ref().to_glib_none().0) as _)
651 }
652 }
653
654 #[doc(alias = "g_socket_create_source")]
655 fn create_source<F, C>(
656 &self,
657 condition: glib::IOCondition,
658 cancellable: Option<&C>,
659 name: Option<&str>,
660 priority: glib::Priority,
661 func: F,
662 ) -> glib::Source
663 where
664 F: FnMut(&Self, glib::IOCondition) -> glib::ControlFlow + 'static,
665 C: IsA<Cancellable>,
666 {
667 unsafe extern "C" fn trampoline<
668 O: IsA<Socket>,
669 F: FnMut(&O, glib::IOCondition) -> glib::ControlFlow + 'static,
670 >(
671 socket: *mut ffi::GSocket,
672 condition: glib::ffi::GIOCondition,
673 func: glib::ffi::gpointer,
674 ) -> glib::ffi::gboolean {
675 let func: &RefCell<F> = &*(func as *const RefCell<F>);
676 let mut func = func.borrow_mut();
677 (*func)(
678 Socket::from_glib_borrow(socket).unsafe_cast_ref(),
679 from_glib(condition),
680 )
681 .into_glib()
682 }
683 unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
684 let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
685 }
686 let cancellable = cancellable.map(|c| c.as_ref());
687 let gcancellable = cancellable.to_glib_none();
688 unsafe {
689 let source = ffi::g_socket_create_source(
690 self.as_ref().to_glib_none().0,
691 condition.into_glib(),
692 gcancellable.0,
693 );
694 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
695 glib::ffi::g_source_set_callback(
696 source,
697 Some(transmute::<
698 glib::ffi::gpointer,
699 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
700 >(trampoline)),
701 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
702 Some(destroy_closure::<F>),
703 );
704 glib::ffi::g_source_set_priority(source, priority.into_glib());
705
706 if let Some(name) = name {
707 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
708 }
709
710 from_glib_full(source)
711 }
712 }
713
714 fn create_source_future<C: IsA<Cancellable>>(
715 &self,
716 condition: glib::IOCondition,
717 cancellable: Option<&C>,
718 priority: glib::Priority,
719 ) -> Pin<Box<dyn std::future::Future<Output = glib::IOCondition> + 'static>> {
720 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
721
722 let obj = self.clone();
723 Box::pin(glib::SourceFuture::new(move |send| {
724 let mut send = Some(send);
725 obj.create_source(
726 condition,
727 cancellable.as_ref(),
728 None,
729 priority,
730 move |_, condition| {
731 let _ = send.take().unwrap().send(condition);
732 glib::ControlFlow::Break
733 },
734 )
735 }))
736 }
737
738 fn create_source_stream<C: IsA<Cancellable>>(
739 &self,
740 condition: glib::IOCondition,
741 cancellable: Option<&C>,
742 priority: glib::Priority,
743 ) -> Pin<Box<dyn Stream<Item = glib::IOCondition> + 'static>> {
744 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
745
746 let obj = self.clone();
747 Box::pin(glib::SourceStream::new(move |send| {
748 let send = Some(send);
749 obj.create_source(
750 condition,
751 cancellable.as_ref(),
752 None,
753 priority,
754 move |_, condition| {
755 if send.as_ref().unwrap().unbounded_send(condition).is_err() {
756 glib::ControlFlow::Break
757 } else {
758 glib::ControlFlow::Continue
759 }
760 },
761 )
762 }))
763 }
764}
765
766impl<O: IsA<Socket>> SocketExtManual for O {}
767
/// Stand-in for the unix raw descriptor type, compiled only for docs.rs
/// builds on non-unix targets.
#[cfg(all(docsrs, not(unix)))]
pub type RawFd = libc::c_int;

/// Stand-in for the unix `IntoRawFd` trait, compiled only for docs.rs builds
/// on non-unix targets.
#[cfg(all(docsrs, not(unix)))]
pub trait IntoRawFd {
    fn into_raw_fd(self) -> libc::c_int;
}

/// Stand-in for the unix `FromRawFd` trait, compiled only for docs.rs builds
/// on non-unix targets.
#[cfg(all(docsrs, not(unix)))]
pub trait FromRawFd {
    unsafe fn from_raw_fd(fd: libc::c_int) -> Self;
}

/// Stand-in for the unix `AsRawFd` trait, compiled only for docs.rs builds on
/// non-unix targets.
#[cfg(all(docsrs, not(unix)))]
pub trait AsRawFd {
    fn as_raw_fd(&self) -> RawFd;
}
785
/// Stand-in for the windows raw socket type, compiled only for docs.rs builds
/// on non-windows targets.
#[cfg(all(docsrs, not(windows)))]
pub type RawSocket = *mut std::os::raw::c_void;

/// Stand-in for the windows `IntoRawSocket` trait, compiled only for docs.rs
/// builds on non-windows targets.
#[cfg(all(docsrs, not(windows)))]
pub trait IntoRawSocket {
    fn into_raw_socket(self) -> u64;
}

/// Stand-in for the windows `FromRawSocket` trait, compiled only for docs.rs
/// builds on non-windows targets.
#[cfg(all(docsrs, not(windows)))]
pub trait FromRawSocket {
    unsafe fn from_raw_socket(sock: u64) -> Self;
}

/// Stand-in for the windows `AsRawSocket` trait, compiled only for docs.rs
/// builds on non-windows targets.
#[cfg(all(docsrs, not(windows)))]
pub trait AsRawSocket {
    fn as_raw_socket(&self) -> RawSocket;
}
803
#[cfg(test)]
mod tests {
    /// Sends one byte plus a `UnixFDMessage` over a unix socket pair with
    /// `send_messages` and checks that `receive_messages` delivers both.
    #[test]
    #[cfg(unix)]
    fn socket_messages() {
        use std::{io, os::unix::io::AsRawFd};

        use super::{
            InputMessage, InputVector, OutputMessage, OutputVector, Socket, SocketControlMessages,
        };
        use crate::{prelude::*, Cancellable, UnixFDMessage};

        let mut raw_fds = [0 as libc::c_int; 2];
        let (out_sock, in_sock) = unsafe {
            let ret = libc::socketpair(
                libc::AF_UNIX,
                libc::SOCK_STREAM,
                0,
                raw_fds.as_mut_ptr(),
            );
            if ret != 0 {
                panic!("{}", io::Error::last_os_error());
            }
            (
                Socket::from_fd(raw_fds[0]).unwrap(),
                Socket::from_fd(raw_fds[1]).unwrap(),
            )
        };

        // Sending side: a single zero byte with one fd-passing control message.
        let fd_msg = UnixFDMessage::new();
        fd_msg.append_fd(out_sock.as_raw_fd()).unwrap();
        let out_vecs = [OutputVector::new(&[0])];
        let out_ctrl = [fd_msg.upcast()];
        let mut out_msgs = [OutputMessage::new(
            crate::SocketAddress::NONE,
            out_vecs.as_slice(),
            out_ctrl.as_slice(),
        )];
        let written = super::SocketExtManual::send_messages(
            &out_sock,
            out_msgs.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();
        assert_eq!(written, 1);
        assert_eq!(out_msgs[0].bytes_sent(), 1);

        // Receiving side: one byte of payload and storage for control messages.
        let mut in_buf = [0u8];
        let mut in_vecs = [InputVector::new(in_buf.as_mut_slice())];
        let mut in_ctrl = SocketControlMessages::new();
        let mut in_msgs = [InputMessage::new(
            None,
            in_vecs.as_mut_slice(),
            Some(&mut in_ctrl),
        )];
        let received = super::SocketExtManual::receive_messages(
            &in_sock,
            in_msgs.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();

        assert_eq!(received, 1);
        assert_eq!(in_msgs[0].bytes_received(), 1);
        assert_eq!(in_ctrl.len(), 1);
        let fd_list = in_ctrl[0]
            .downcast_ref::<UnixFDMessage>()
            .unwrap()
            .fd_list();
        assert_eq!(fd_list.length(), 1);
    }

    /// Sends a datagram between two UDP sockets on 127.0.0.1 and checks the
    /// payload, the byte counts and the address reported for the sender.
    #[test]
    #[cfg(unix)]
    fn dgram_socket_messages() {
        use super::{InputMessage, InputVector, OutputMessage, OutputVector, Socket};
        use crate::{prelude::*, Cancellable};

        let addr = crate::InetSocketAddress::from_string("127.0.0.1", 28351).unwrap();

        let out_sock = Socket::new(
            crate::SocketFamily::Ipv4,
            crate::SocketType::Datagram,
            crate::SocketProtocol::Udp,
        )
        .unwrap();
        let in_sock = Socket::new(
            crate::SocketFamily::Ipv4,
            crate::SocketType::Datagram,
            crate::SocketProtocol::Udp,
        )
        .unwrap();
        in_sock.bind(&addr, true).unwrap();

        const DATA: [u8; std::mem::size_of::<u64>()] = 1234u64.to_be_bytes();

        // Sending side.
        let out_buf = DATA;
        let out_vecs = [OutputVector::new(out_buf.as_slice())];
        let mut out_msgs = [OutputMessage::new(
            Some(&addr),
            out_vecs.as_slice(),
            &[],
        )];
        let written = super::SocketExtManual::send_messages(
            &out_sock,
            out_msgs.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();
        assert_eq!(written, 1);
        assert_eq!(out_msgs[0].bytes_sent() as usize, out_buf.len());

        // Receiving side, including a slot for the sender address.
        let mut sender = None;
        let mut in_buf = [0u8; DATA.len()];
        let mut in_vecs = [InputVector::new(in_buf.as_mut_slice())];
        let mut in_msgs = [InputMessage::new(
            Some(&mut sender),
            in_vecs.as_mut_slice(),
            None,
        )];
        let received = super::SocketExtManual::receive_messages(
            &in_sock,
            in_msgs.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();

        assert_eq!(received, 1);
        assert_eq!(in_msgs[0].bytes_received(), in_buf.len());
        assert_eq!(in_buf, out_buf);
        let sender = sender
            .unwrap()
            .downcast::<crate::InetSocketAddress>()
            .unwrap();
        assert_eq!(sender.address().to_str(), addr.address().to_str());
    }
}