gio/
socket.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3#[cfg(unix)]
4use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
5#[cfg(windows)]
6use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
7#[cfg(feature = "v2_60")]
8use std::time::Duration;
9use std::{cell::RefCell, marker::PhantomData, mem::transmute, pin::Pin, ptr};
10
11use futures_core::stream::Stream;
12use glib::{prelude::*, translate::*, Slice};
13
14#[cfg(feature = "v2_60")]
15use crate::PollableReturn;
16use crate::{ffi, Cancellable, Socket, SocketAddress, SocketControlMessage};
17
18impl Socket {
19    #[cfg(unix)]
20    #[cfg_attr(docsrs, doc(cfg(unix)))]
21    #[allow(clippy::missing_safety_doc)]
22    pub unsafe fn from_fd(fd: impl IntoRawFd) -> Result<Socket, glib::Error> {
23        let fd = fd.into_raw_fd();
24        let mut error = ptr::null_mut();
25        let ret = ffi::g_socket_new_from_fd(fd, &mut error);
26        if error.is_null() {
27            Ok(from_glib_full(ret))
28        } else {
29            Err(from_glib_full(error))
30        }
31    }
32    #[cfg(windows)]
33    #[cfg_attr(docsrs, doc(cfg(windows)))]
34    #[allow(clippy::missing_safety_doc)]
35    pub unsafe fn from_socket(socket: impl IntoRawSocket) -> Result<Socket, glib::Error> {
36        let socket = socket.into_raw_socket();
37        let mut error = ptr::null_mut();
38        let ret = ffi::g_socket_new_from_fd(socket as i32, &mut error);
39        if error.is_null() {
40            Ok(from_glib_full(ret))
41        } else {
42            Err(from_glib_full(error))
43        }
44    }
45}
46
47#[cfg(unix)]
48#[cfg_attr(docsrs, doc(cfg(unix)))]
49impl AsRawFd for Socket {
50    fn as_raw_fd(&self) -> RawFd {
51        unsafe { ffi::g_socket_get_fd(self.to_glib_none().0) as _ }
52    }
53}
54
#[cfg(windows)]
#[cfg_attr(docsrs, doc(cfg(windows)))]
impl AsRawSocket for Socket {
    /// Returns the value reported by `g_socket_get_fd`, converted to
    /// [`RawSocket`].
    fn as_raw_socket(&self) -> RawSocket {
        let this = self.to_glib_none();
        unsafe { ffi::g_socket_get_fd(this.0) as _ }
    }
}
62
/// A `GInputVector` tied to the lifetime of the mutable byte buffer it was
/// built from.
///
/// The struct is `#[repr(transparent)]` over `ffi::GInputVector`; other code
/// in this file casts slices of `InputVector` to `*mut ffi::GInputVector`.
#[doc(alias = "GInputVector")]
#[repr(transparent)]
#[derive(Debug)]
pub struct InputVector<'v> {
    // Raw pointer + length pair handed to GLib.
    vector: ffi::GInputVector,
    // Zero-sized marker recording the exclusive borrow of the buffer.
    buffer: PhantomData<&'v mut [u8]>,
}
70
71impl<'v> InputVector<'v> {
72    #[inline]
73    pub fn new(buffer: &'v mut [u8]) -> Self {
74        Self {
75            vector: ffi::GInputVector {
76                buffer: buffer.as_mut_ptr() as *mut _,
77                size: buffer.len(),
78            },
79            buffer: PhantomData,
80        }
81    }
82}
83
// The raw pointer inside `ffi::GInputVector` suppresses the auto traits, so
// they are asserted manually here.
// NOTE(review): presumably sound because the pointer always originates from
// a `&'v mut [u8]` — confirm.
unsafe impl Send for InputVector<'_> {}
unsafe impl Sync for InputVector<'_> {}
86
87impl std::ops::Deref for InputVector<'_> {
88    type Target = [u8];
89
90    #[inline]
91    fn deref(&self) -> &Self::Target {
92        unsafe { std::slice::from_raw_parts(self.vector.buffer as *const _, self.vector.size) }
93    }
94}
95
96impl std::ops::DerefMut for InputVector<'_> {
97    #[inline]
98    fn deref_mut(&mut self) -> &mut Self::Target {
99        unsafe { std::slice::from_raw_parts_mut(self.vector.buffer as *mut _, self.vector.size) }
100    }
101}
102
/// Out-parameter storage for an array of control messages.
///
/// Starts out as a null pointer with length zero; the receive paths in this
/// file pass the addresses of both fields to GLib. On drop, the array and its
/// elements are released through `Slice::from_glib_full_num`.
#[derive(Debug)]
pub struct SocketControlMessages {
    // Array of `GSocketControlMessage*`, or null when nothing is stored.
    ptr: *mut *mut ffi::GSocketControlMessage,
    // Number of entries in `ptr`.
    len: u32,
}
108
109impl SocketControlMessages {
110    #[inline]
111    pub const fn new() -> Self {
112        Self {
113            ptr: ptr::null_mut(),
114            len: 0,
115        }
116    }
117}
118
119impl AsRef<[SocketControlMessage]> for SocketControlMessages {
120    #[inline]
121    fn as_ref(&self) -> &[SocketControlMessage] {
122        unsafe { std::slice::from_raw_parts(self.ptr as *const _, self.len as usize) }
123    }
124}
125
126impl std::ops::Deref for SocketControlMessages {
127    type Target = [SocketControlMessage];
128
129    #[inline]
130    fn deref(&self) -> &Self::Target {
131        self.as_ref()
132    }
133}
134
135impl Default for SocketControlMessages {
136    #[inline]
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142impl Drop for SocketControlMessages {
143    #[inline]
144    fn drop(&mut self) {
145        unsafe {
146            let _: Slice<SocketControlMessage> =
147                Slice::from_glib_full_num(self.ptr as *mut _, self.len as usize);
148        }
149    }
150}
151
/// A `GInputMessage` together with lifetime markers for everything it points
/// at.
///
/// The struct is `#[repr(transparent)]` over `ffi::GInputMessage`; the
/// `PhantomData` fields are zero-sized and only record the borrows taken in
/// [`InputMessage::new`].
#[doc(alias = "GInputMessage")]
#[repr(transparent)]
#[derive(Debug)]
pub struct InputMessage<'m> {
    // Raw message descriptor handed to GLib.
    message: ffi::GInputMessage,
    // Marker for the optional out-slot receiving the sender address.
    address: PhantomData<Option<&'m mut Option<SocketAddress>>>,
    // Marker for the borrowed input vectors.
    vectors: PhantomData<&'m mut [InputVector<'m>]>,
    // Marker for the optional control-message out-storage.
    control_messages: PhantomData<Option<&'m mut SocketControlMessages>>,
}
161
impl<'m> InputMessage<'m> {
    /// Builds an input message whose fields point at the given storage.
    ///
    /// * `address` - optional slot for the sender address; it must currently
    ///   hold `None`.
    /// * `vectors` - buffers to receive into.
    /// * `control_messages` - optional storage for received control messages.
    ///
    /// `bytes_received` and `flags` start at zero.
    ///
    /// # Panics
    ///
    /// Panics if `address` is `Some` and the slot already holds a value, or
    /// if `vectors.len()` does not fit the FFI `num_vectors` field.
    pub fn new(
        mut address: Option<&'m mut Option<SocketAddress>>,
        vectors: &'m mut [InputVector<'m>],
        control_messages: Option<&'m mut SocketControlMessages>,
    ) -> Self {
        // Reinterpret the `Option<SocketAddress>` slot as the raw out-pointer
        // field; a missing slot becomes a null pointer.
        let address = address
            .as_mut()
            .map(|a| {
                assert!(a.is_none());
                *a as *mut _ as *mut _
            })
            .unwrap_or_else(ptr::null_mut);
        // Point GLib at the `ptr`/`len` fields of the caller's storage, or at
        // nothing when no storage was supplied.
        let (control_messages, num_control_messages) = control_messages
            .map(|c| (&mut c.ptr as *mut _, &mut c.len as *mut _))
            .unwrap_or_else(|| (ptr::null_mut(), ptr::null_mut()));
        Self {
            message: ffi::GInputMessage {
                address,
                vectors: vectors.as_mut_ptr() as *mut ffi::GInputVector,
                num_vectors: vectors.len().try_into().unwrap(),
                bytes_received: 0,
                flags: 0,
                control_messages,
                num_control_messages,
            },
            address: PhantomData,
            vectors: PhantomData,
            control_messages: PhantomData,
        }
    }
    /// Rebuilds the mutable slice of input vectors from the stored pointer
    /// and count.
    ///
    /// NOTE(review): the returned lifetime is `'m`, not the lifetime of the
    /// `&mut self` borrow, so repeated calls look able to yield overlapping
    /// mutable slices — confirm whether callers rely on this.
    #[inline]
    pub fn vectors(&mut self) -> &'m mut [InputVector<'m>] {
        unsafe {
            std::slice::from_raw_parts_mut(
                self.message.vectors as *mut _,
                self.message.num_vectors as usize,
            )
        }
    }
    /// Returns the `flags` field of the underlying message.
    #[inline]
    pub const fn flags(&self) -> i32 {
        self.message.flags
    }
    /// Returns the `bytes_received` field of the underlying message.
    #[inline]
    pub const fn bytes_received(&self) -> usize {
        self.message.bytes_received
    }
}
211
/// A `GOutputVector` tied to the lifetime of the byte buffer it was built
/// from.
///
/// The struct is `#[repr(transparent)]` over `ffi::GOutputVector`; other code
/// in this file casts slices of `OutputVector` to `*mut ffi::GOutputVector`.
#[doc(alias = "GOutputVector")]
#[repr(transparent)]
#[derive(Debug)]
pub struct OutputVector<'v> {
    // Raw pointer + length pair handed to GLib.
    vector: ffi::GOutputVector,
    // Zero-sized marker recording the shared borrow of the buffer.
    buffer: PhantomData<&'v [u8]>,
}
219
220impl<'v> OutputVector<'v> {
221    #[inline]
222    pub const fn new(buffer: &'v [u8]) -> Self {
223        Self {
224            vector: ffi::GOutputVector {
225                buffer: buffer.as_ptr() as *const _,
226                size: buffer.len(),
227            },
228            buffer: PhantomData,
229        }
230    }
231}
232
// The raw pointer inside `ffi::GOutputVector` suppresses the auto traits, so
// they are asserted manually here.
// NOTE(review): presumably sound because the pointer always originates from
// a `&'v [u8]` — confirm.
unsafe impl Send for OutputVector<'_> {}
unsafe impl Sync for OutputVector<'_> {}
235
236impl std::ops::Deref for OutputVector<'_> {
237    type Target = [u8];
238
239    #[inline]
240    fn deref(&self) -> &Self::Target {
241        unsafe { std::slice::from_raw_parts(self.vector.buffer as *const _, self.vector.size) }
242    }
243}
244
/// A `GOutputMessage` together with lifetime markers for everything it points
/// at.
///
/// The struct is `#[repr(transparent)]` over `ffi::GOutputMessage`; the
/// `PhantomData` fields are zero-sized and only record the borrows taken in
/// [`OutputMessage::new`].
#[doc(alias = "GOutputMessage")]
#[repr(transparent)]
#[derive(Debug)]
pub struct OutputMessage<'m> {
    // Raw message descriptor handed to GLib.
    message: ffi::GOutputMessage,
    // Marker for the optional destination address.
    address: PhantomData<Option<&'m SocketAddress>>,
    // Marker for the borrowed output vectors.
    vectors: PhantomData<&'m [OutputVector<'m>]>,
    // Marker for the borrowed control messages.
    control_messages: PhantomData<&'m [SocketControlMessage]>,
}
254
255impl<'m> OutputMessage<'m> {
256    pub fn new<A: IsA<SocketAddress>>(
257        address: Option<&'m A>,
258        vectors: &'m [OutputVector<'m>],
259        control_messages: &'m [SocketControlMessage],
260    ) -> Self {
261        Self {
262            message: ffi::GOutputMessage {
263                address: address
264                    .map(|a| a.upcast_ref::<SocketAddress>().as_ptr())
265                    .unwrap_or_else(ptr::null_mut),
266                vectors: mut_override(vectors.as_ptr() as *const ffi::GOutputVector),
267                num_vectors: vectors.len().try_into().unwrap(),
268                bytes_sent: 0,
269                control_messages: control_messages.as_ptr() as *mut _,
270                num_control_messages: control_messages.len().try_into().unwrap(),
271            },
272            address: PhantomData,
273            vectors: PhantomData,
274            control_messages: PhantomData,
275        }
276    }
277    #[inline]
278    pub fn vectors(&self) -> &'m [OutputVector<'m>] {
279        unsafe {
280            std::slice::from_raw_parts(
281                self.message.vectors as *const _,
282                self.message.num_vectors as usize,
283            )
284        }
285    }
286    #[inline]
287    pub fn bytes_sent(&self) -> u32 {
288        self.message.bytes_sent
289    }
290}
291
// Private module holding the `Sealed` marker trait. `SocketExtManual` lists
// it as a supertrait, and since the module is not public, code outside this
// file cannot name `Sealed` to implement it.
mod sealed {
    pub trait Sealed {}
    // Every type that is a `Socket` gets the marker automatically.
    impl<T: super::IsA<super::Socket>> Sealed for T {}
}
296
297pub trait SocketExtManual: sealed::Sealed + IsA<Socket> + Sized {
298    #[doc(alias = "g_socket_receive")]
299    fn receive<B: AsMut<[u8]>, C: IsA<Cancellable>>(
300        &self,
301        mut buffer: B,
302        cancellable: Option<&C>,
303    ) -> Result<usize, glib::Error> {
304        let cancellable = cancellable.map(|c| c.as_ref());
305        let gcancellable = cancellable.to_glib_none();
306        let buffer = buffer.as_mut();
307        let buffer_ptr = buffer.as_mut_ptr();
308        let count = buffer.len();
309        unsafe {
310            let mut error = ptr::null_mut();
311            let ret = ffi::g_socket_receive(
312                self.as_ref().to_glib_none().0,
313                buffer_ptr,
314                count,
315                gcancellable.0,
316                &mut error,
317            );
318            if error.is_null() {
319                Ok(ret as usize)
320            } else {
321                Err(from_glib_full(error))
322            }
323        }
324    }
325    #[doc(alias = "g_socket_receive_from")]
326    fn receive_from<B: AsMut<[u8]>, C: IsA<Cancellable>>(
327        &self,
328        mut buffer: B,
329        cancellable: Option<&C>,
330    ) -> Result<(usize, SocketAddress), glib::Error> {
331        let cancellable = cancellable.map(|c| c.as_ref());
332        let gcancellable = cancellable.to_glib_none();
333        let buffer = buffer.as_mut();
334        let buffer_ptr = buffer.as_mut_ptr();
335        let count = buffer.len();
336        unsafe {
337            let mut error = ptr::null_mut();
338            let mut addr_ptr = ptr::null_mut();
339
340            let ret = ffi::g_socket_receive_from(
341                self.as_ref().to_glib_none().0,
342                &mut addr_ptr,
343                buffer_ptr,
344                count,
345                gcancellable.0,
346                &mut error,
347            );
348            if error.is_null() {
349                Ok((ret as usize, from_glib_full(addr_ptr)))
350            } else {
351                Err(from_glib_full(error))
352            }
353        }
354    }
    /// Receives one message via `g_socket_receive_message`.
    ///
    /// * `address` - optional slot for the sender address; it must currently
    ///   hold `None`.
    /// * `vectors` - buffers to receive into.
    /// * `control_messages` - optional storage for received control messages.
    /// * `flags` - passed to GLib by pointer; the value after the call is
    ///   returned.
    ///
    /// Returns the call's return value (as `usize`) together with the
    /// updated flags.
    ///
    /// # Panics
    ///
    /// Panics if `address` is `Some` and the slot already holds a value, or
    /// if `vectors.len()` does not fit the FFI count parameter.
    ///
    /// # Errors
    ///
    /// Returns the [`glib::Error`] set by GLib, if any.
    #[doc(alias = "g_socket_receive_message")]
    fn receive_message<C: IsA<Cancellable>>(
        &self,
        mut address: Option<&mut Option<SocketAddress>>,
        vectors: &mut [InputVector],
        control_messages: Option<&mut SocketControlMessages>,
        mut flags: i32,
        cancellable: Option<&C>,
    ) -> Result<(usize, i32), glib::Error> {
        let cancellable = cancellable.map(|c| c.as_ref());
        // Reinterpret the `Option<SocketAddress>` slot as the raw
        // out-pointer; a missing slot becomes a null pointer.
        let address = address
            .as_mut()
            .map(|a| {
                assert!(a.is_none());
                *a as *mut _ as *mut _
            })
            .unwrap_or_else(ptr::null_mut);
        // Point GLib at the `ptr`/`len` fields of the caller's storage, or
        // at nothing when no storage was supplied.
        let (control_messages, num_control_messages) = control_messages
            .map(|c| (&mut c.ptr as *mut _, &mut c.len as *mut _ as *mut _))
            .unwrap_or_else(|| (ptr::null_mut(), ptr::null_mut()));
        unsafe {
            let mut error = ptr::null_mut();

            let received = ffi::g_socket_receive_message(
                self.as_ref().to_glib_none().0,
                address,
                vectors.as_mut_ptr() as *mut ffi::GInputVector,
                vectors.len().try_into().unwrap(),
                control_messages,
                num_control_messages,
                &mut flags,
                cancellable.to_glib_none().0,
                &mut error,
            );
            if error.is_null() {
                Ok((received as usize, flags))
            } else {
                Err(from_glib_full(error))
            }
        }
    }
396    #[doc(alias = "g_socket_receive_messages")]
397    fn receive_messages<C: IsA<Cancellable>>(
398        &self,
399        messages: &mut [InputMessage],
400        flags: i32,
401        cancellable: Option<&C>,
402    ) -> Result<usize, glib::Error> {
403        let cancellable = cancellable.map(|c| c.as_ref());
404        unsafe {
405            let mut error = ptr::null_mut();
406
407            let count = ffi::g_socket_receive_messages(
408                self.as_ref().to_glib_none().0,
409                messages.as_mut_ptr() as *mut _,
410                messages.len().try_into().unwrap(),
411                flags,
412                cancellable.to_glib_none().0,
413                &mut error,
414            );
415            if error.is_null() {
416                Ok(count as usize)
417            } else {
418                Err(from_glib_full(error))
419            }
420        }
421    }
422    #[doc(alias = "g_socket_receive_with_blocking")]
423    fn receive_with_blocking<B: AsMut<[u8]>, C: IsA<Cancellable>>(
424        &self,
425        mut buffer: B,
426        blocking: bool,
427        cancellable: Option<&C>,
428    ) -> Result<usize, glib::Error> {
429        let cancellable = cancellable.map(|c| c.as_ref());
430        let gcancellable = cancellable.to_glib_none();
431        let buffer = buffer.as_mut();
432        let buffer_ptr = buffer.as_mut_ptr();
433        let count = buffer.len();
434        unsafe {
435            let mut error = ptr::null_mut();
436            let ret = ffi::g_socket_receive_with_blocking(
437                self.as_ref().to_glib_none().0,
438                buffer_ptr,
439                count,
440                blocking.into_glib(),
441                gcancellable.0,
442                &mut error,
443            );
444            if error.is_null() {
445                Ok(ret as usize)
446            } else {
447                Err(from_glib_full(error))
448            }
449        }
450    }
451
452    #[doc(alias = "g_socket_send")]
453    fn send<B: AsRef<[u8]>, C: IsA<Cancellable>>(
454        &self,
455        buffer: B,
456        cancellable: Option<&C>,
457    ) -> Result<usize, glib::Error> {
458        let cancellable = cancellable.map(|c| c.as_ref());
459        let gcancellable = cancellable.to_glib_none();
460        let (count, buffer_ptr) = {
461            let slice = buffer.as_ref();
462            (slice.len(), slice.as_ptr())
463        };
464        unsafe {
465            let mut error = ptr::null_mut();
466            let ret = ffi::g_socket_send(
467                self.as_ref().to_glib_none().0,
468                mut_override(buffer_ptr),
469                count,
470                gcancellable.0,
471                &mut error,
472            );
473            if error.is_null() {
474                Ok(ret as usize)
475            } else {
476                Err(from_glib_full(error))
477            }
478        }
479    }
480    #[doc(alias = "g_socket_send_message")]
481    fn send_message<P: IsA<SocketAddress>, C: IsA<Cancellable>>(
482        &self,
483        address: Option<&P>,
484        vectors: &[OutputVector],
485        messages: &[SocketControlMessage],
486        flags: i32,
487        cancellable: Option<&C>,
488    ) -> Result<usize, glib::Error> {
489        let cancellable = cancellable.map(|c| c.as_ref());
490        unsafe {
491            let mut error = ptr::null_mut();
492            let ret = ffi::g_socket_send_message(
493                self.as_ref().to_glib_none().0,
494                address.map(|p| p.as_ref()).to_glib_none().0,
495                vectors.as_ptr() as *mut ffi::GOutputVector,
496                vectors.len().try_into().unwrap(),
497                messages.as_ptr() as *mut _,
498                messages.len().try_into().unwrap(),
499                flags,
500                cancellable.to_glib_none().0,
501                &mut error,
502            );
503            if error.is_null() {
504                Ok(ret as usize)
505            } else {
506                Err(from_glib_full(error))
507            }
508        }
509    }
510    #[cfg(feature = "v2_60")]
511    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
512    #[doc(alias = "g_socket_send_message_with_timeout")]
513    fn send_message_with_timeout<P: IsA<SocketAddress>, C: IsA<Cancellable>>(
514        &self,
515        address: Option<&P>,
516        vectors: &[OutputVector],
517        messages: &[SocketControlMessage],
518        flags: i32,
519        timeout: Option<Duration>,
520        cancellable: Option<&C>,
521    ) -> Result<(PollableReturn, usize), glib::Error> {
522        let cancellable = cancellable.map(|c| c.as_ref());
523        unsafe {
524            let mut error = ptr::null_mut();
525            let mut bytes_written = 0;
526
527            let ret = ffi::g_socket_send_message_with_timeout(
528                self.as_ref().to_glib_none().0,
529                address.map(|p| p.as_ref()).to_glib_none().0,
530                vectors.as_ptr() as *mut ffi::GOutputVector,
531                vectors.len().try_into().unwrap(),
532                messages.as_ptr() as *mut _,
533                messages.len().try_into().unwrap(),
534                flags,
535                timeout
536                    .map(|t| t.as_micros().try_into().unwrap())
537                    .unwrap_or(-1),
538                &mut bytes_written,
539                cancellable.to_glib_none().0,
540                &mut error,
541            );
542            if error.is_null() {
543                Ok((from_glib(ret), bytes_written))
544            } else {
545                Err(from_glib_full(error))
546            }
547        }
548    }
549    #[doc(alias = "g_socket_send_messages")]
550    fn send_messages<C: IsA<Cancellable>>(
551        &self,
552        messages: &mut [OutputMessage],
553        flags: i32,
554        cancellable: Option<&C>,
555    ) -> Result<usize, glib::Error> {
556        let cancellable = cancellable.map(|c| c.as_ref());
557        unsafe {
558            let mut error = ptr::null_mut();
559            let count = ffi::g_socket_send_messages(
560                self.as_ref().to_glib_none().0,
561                messages.as_mut_ptr() as *mut _,
562                messages.len().try_into().unwrap(),
563                flags,
564                cancellable.to_glib_none().0,
565                &mut error,
566            );
567            if error.is_null() {
568                Ok(count as usize)
569            } else {
570                Err(from_glib_full(error))
571            }
572        }
573    }
574    #[doc(alias = "g_socket_send_to")]
575    fn send_to<B: AsRef<[u8]>, P: IsA<SocketAddress>, C: IsA<Cancellable>>(
576        &self,
577        address: Option<&P>,
578        buffer: B,
579        cancellable: Option<&C>,
580    ) -> Result<usize, glib::Error> {
581        let cancellable = cancellable.map(|c| c.as_ref());
582        let gcancellable = cancellable.to_glib_none();
583        let (count, buffer_ptr) = {
584            let slice = buffer.as_ref();
585            (slice.len(), slice.as_ptr())
586        };
587        unsafe {
588            let mut error = ptr::null_mut();
589
590            let ret = ffi::g_socket_send_to(
591                self.as_ref().to_glib_none().0,
592                address.map(|p| p.as_ref()).to_glib_none().0,
593                mut_override(buffer_ptr),
594                count,
595                gcancellable.0,
596                &mut error,
597            );
598            if error.is_null() {
599                Ok(ret as usize)
600            } else {
601                Err(from_glib_full(error))
602            }
603        }
604    }
605    #[doc(alias = "g_socket_send_with_blocking")]
606    fn send_with_blocking<B: AsRef<[u8]>, C: IsA<Cancellable>>(
607        &self,
608        buffer: B,
609        blocking: bool,
610        cancellable: Option<&C>,
611    ) -> Result<usize, glib::Error> {
612        let cancellable = cancellable.map(|c| c.as_ref());
613        let gcancellable = cancellable.to_glib_none();
614        let (count, buffer_ptr) = {
615            let slice = buffer.as_ref();
616            (slice.len(), slice.as_ptr())
617        };
618        unsafe {
619            let mut error = ptr::null_mut();
620            let ret = ffi::g_socket_send_with_blocking(
621                self.as_ref().to_glib_none().0,
622                mut_override(buffer_ptr),
623                count,
624                blocking.into_glib(),
625                gcancellable.0,
626                &mut error,
627            );
628            if error.is_null() {
629                Ok(ret as usize)
630            } else {
631                Err(from_glib_full(error))
632            }
633        }
634    }
635
636    #[cfg(unix)]
637    #[cfg_attr(docsrs, doc(cfg(unix)))]
638    #[doc(alias = "get_fd")]
639    #[doc(alias = "g_socket_get_fd")]
640    fn fd<T: FromRawFd>(&self) -> T {
641        unsafe { FromRawFd::from_raw_fd(ffi::g_socket_get_fd(self.as_ref().to_glib_none().0)) }
642    }
643
644    #[cfg(windows)]
645    #[cfg_attr(docsrs, doc(cfg(windows)))]
646    #[doc(alias = "get_socket")]
647    #[doc(alias = "g_socket_get_fd")]
648    fn socket<T: FromRawSocket>(&self) -> T {
649        unsafe {
650            FromRawSocket::from_raw_socket(ffi::g_socket_get_fd(self.as_ref().to_glib_none().0) as _)
651        }
652    }
653
    /// Creates a [`glib::Source`] for this socket via
    /// `g_socket_create_source` and installs `func` as its callback.
    ///
    /// * `condition` - I/O conditions forwarded to GLib.
    /// * `cancellable` - optional cancellable forwarded to GLib.
    /// * `name` - optional source name, set only when `Some`.
    /// * `priority` - priority assigned to the source.
    /// * `func` - invoked with the socket and the reported condition; its
    ///   [`glib::ControlFlow`] result is returned to GLib.
    #[doc(alias = "g_socket_create_source")]
    fn create_source<F, C>(
        &self,
        condition: glib::IOCondition,
        cancellable: Option<&C>,
        name: Option<&str>,
        priority: glib::Priority,
        func: F,
    ) -> glib::Source
    where
        F: FnMut(&Self, glib::IOCondition) -> glib::ControlFlow + 'static,
        C: IsA<Cancellable>,
    {
        // C callback: recovers the boxed `RefCell<F>` from the user-data
        // pointer and calls the closure with the borrowed socket.
        unsafe extern "C" fn trampoline<
            O: IsA<Socket>,
            F: FnMut(&O, glib::IOCondition) -> glib::ControlFlow + 'static,
        >(
            socket: *mut ffi::GSocket,
            condition: glib::ffi::GIOCondition,
            func: glib::ffi::gpointer,
        ) -> glib::ffi::gboolean {
            let func: &RefCell<F> = &*(func as *const RefCell<F>);
            // `borrow_mut` panics if the closure is re-entered while already
            // running.
            let mut func = func.borrow_mut();
            (*func)(
                Socket::from_glib_borrow(socket).unsafe_cast_ref(),
                from_glib(condition),
            )
            .into_glib()
        }
        // Destroy notify: reclaims and drops the boxed closure.
        unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
            let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
        }
        let cancellable = cancellable.map(|c| c.as_ref());
        let gcancellable = cancellable.to_glib_none();
        unsafe {
            let source = ffi::g_socket_create_source(
                self.as_ref().to_glib_none().0,
                condition.into_glib(),
                gcancellable.0,
            );
            let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
            // `g_source_set_callback` takes a one-argument function pointer
            // type, so the three-argument trampoline is transmuted to it.
            glib::ffi::g_source_set_callback(
                source,
                Some(transmute::<
                    glib::ffi::gpointer,
                    unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
                >(trampoline)),
                // Ownership of the boxed closure moves to the source; it is
                // released by `destroy_closure`.
                Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
                Some(destroy_closure::<F>),
            );
            glib::ffi::g_source_set_priority(source, priority.into_glib());

            if let Some(name) = name {
                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
            }

            from_glib_full(source)
        }
    }
713
714    fn create_source_future<C: IsA<Cancellable>>(
715        &self,
716        condition: glib::IOCondition,
717        cancellable: Option<&C>,
718        priority: glib::Priority,
719    ) -> Pin<Box<dyn std::future::Future<Output = glib::IOCondition> + 'static>> {
720        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
721
722        let obj = self.clone();
723        Box::pin(glib::SourceFuture::new(move |send| {
724            let mut send = Some(send);
725            obj.create_source(
726                condition,
727                cancellable.as_ref(),
728                None,
729                priority,
730                move |_, condition| {
731                    let _ = send.take().unwrap().send(condition);
732                    glib::ControlFlow::Break
733                },
734            )
735        }))
736    }
737
738    fn create_source_stream<C: IsA<Cancellable>>(
739        &self,
740        condition: glib::IOCondition,
741        cancellable: Option<&C>,
742        priority: glib::Priority,
743    ) -> Pin<Box<dyn Stream<Item = glib::IOCondition> + 'static>> {
744        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
745
746        let obj = self.clone();
747        Box::pin(glib::SourceStream::new(move |send| {
748            let send = Some(send);
749            obj.create_source(
750                condition,
751                cancellable.as_ref(),
752                None,
753                priority,
754                move |_, condition| {
755                    if send.as_ref().unwrap().unbounded_send(condition).is_err() {
756                        glib::ControlFlow::Break
757                    } else {
758                        glib::ControlFlow::Continue
759                    }
760                },
761            )
762        }))
763    }
764}
765
// Blanket implementation: every type that is a `Socket` gets the provided
// methods of `SocketExtManual`.
impl<O: IsA<Socket>> SocketExtManual for O {}
767
// Stand-in declarations for the unix raw-fd traits and type alias. They are
// compiled only when the `docsrs` cfg is set on a non-unix target, where the
// `std::os::unix` import at the top of the file is not compiled.

/// Documentation-only stand-in for `std::os::unix::io::IntoRawFd`.
#[cfg(all(docsrs, not(unix)))]
pub trait IntoRawFd {
    fn into_raw_fd(self) -> libc::c_int;
}

/// Documentation-only stand-in for `std::os::unix::io::FromRawFd`.
#[cfg(all(docsrs, not(unix)))]
pub trait FromRawFd {
    unsafe fn from_raw_fd(fd: libc::c_int) -> Self;
}

/// Documentation-only stand-in for `std::os::unix::io::AsRawFd`.
#[cfg(all(docsrs, not(unix)))]
pub trait AsRawFd {
    fn as_raw_fd(&self) -> RawFd;
}

/// Documentation-only stand-in for `std::os::unix::io::RawFd`.
#[cfg(all(docsrs, not(unix)))]
pub type RawFd = libc::c_int;
785
// Stand-in declarations for the Windows raw-socket traits and type alias.
// They are compiled only when the `docsrs` cfg is set on a non-Windows
// target, where the `std::os::windows` import at the top of the file is not
// compiled.
// NOTE(review): `IntoRawSocket`/`FromRawSocket` below use `u64` while the
// `RawSocket` alias is a pointer type — confirm whether the mismatch is
// intentional.

/// Documentation-only stand-in for `std::os::windows::io::IntoRawSocket`.
#[cfg(all(docsrs, not(windows)))]
pub trait IntoRawSocket {
    fn into_raw_socket(self) -> u64;
}

/// Documentation-only stand-in for `std::os::windows::io::FromRawSocket`.
#[cfg(all(docsrs, not(windows)))]
pub trait FromRawSocket {
    unsafe fn from_raw_socket(sock: u64) -> Self;
}

/// Documentation-only stand-in for `std::os::windows::io::AsRawSocket`.
#[cfg(all(docsrs, not(windows)))]
pub trait AsRawSocket {
    fn as_raw_socket(&self) -> RawSocket;
}

/// Documentation-only stand-in for `std::os::windows::io::RawSocket`.
#[cfg(all(docsrs, not(windows)))]
pub type RawSocket = *mut std::os::raw::c_void;
803
#[cfg(test)]
mod tests {
    // Sends one byte plus a `UnixFDMessage` over a unix socket pair with
    // `send_messages` and checks that `receive_messages` delivers both the
    // payload and the control message.
    #[test]
    #[cfg(unix)]
    fn socket_messages() {
        use std::{io, os::unix::io::AsRawFd};

        use super::Socket;
        use crate::{prelude::*, Cancellable, UnixFDMessage};

        // Create a connected pair of stream sockets and wrap both ends.
        let mut fds = [0 as libc::c_int; 2];
        let (out_sock, in_sock) = unsafe {
            let ret = libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr());
            if ret != 0 {
                panic!("{}", io::Error::last_os_error());
            }
            (
                Socket::from_fd(fds[0]).unwrap(),
                Socket::from_fd(fds[1]).unwrap(),
            )
        };

        // Outgoing message: a single zero byte with the sender's own fd
        // attached as ancillary data.
        let fd_msg = UnixFDMessage::new();
        fd_msg.append_fd(out_sock.as_raw_fd()).unwrap();
        let vs = [super::OutputVector::new(&[0])];
        let ctrl_msgs = [fd_msg.upcast()];
        let mut out_msg = [super::OutputMessage::new(
            crate::SocketAddress::NONE,
            vs.as_slice(),
            ctrl_msgs.as_slice(),
        )];
        let written = super::SocketExtManual::send_messages(
            &out_sock,
            out_msg.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();
        assert_eq!(written, 1);
        assert_eq!(out_msg[0].bytes_sent(), 1);

        // Incoming message: one-byte buffer plus storage for control
        // messages.
        let mut v = [0u8];
        let mut vs = [super::InputVector::new(v.as_mut_slice())];
        let mut ctrl_msgs = super::SocketControlMessages::new();
        let mut in_msg = [super::InputMessage::new(
            None,
            vs.as_mut_slice(),
            Some(&mut ctrl_msgs),
        )];
        let received = super::SocketExtManual::receive_messages(
            &in_sock,
            in_msg.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();

        assert_eq!(received, 1);
        assert_eq!(in_msg[0].bytes_received(), 1);
        assert_eq!(ctrl_msgs.len(), 1);
        // The received control message must carry exactly one descriptor.
        let fds = ctrl_msgs[0]
            .downcast_ref::<UnixFDMessage>()
            .unwrap()
            .fd_list();
        assert_eq!(fds.length(), 1);
    }
    // Sends a datagram to a bound UDP socket on the loopback address and
    // checks the payload and the reported sender address.
    #[test]
    #[cfg(unix)]
    fn dgram_socket_messages() {
        use super::Socket;
        use crate::{prelude::*, Cancellable};

        // NOTE(review): the port is hard-coded, so the test presumably fails
        // when 28351 is already in use on the host — confirm.
        let addr = crate::InetSocketAddress::from_string("127.0.0.1", 28351).unwrap();

        let out_sock = Socket::new(
            crate::SocketFamily::Ipv4,
            crate::SocketType::Datagram,
            crate::SocketProtocol::Udp,
        )
        .unwrap();
        let in_sock = Socket::new(
            crate::SocketFamily::Ipv4,
            crate::SocketType::Datagram,
            crate::SocketProtocol::Udp,
        )
        .unwrap();
        in_sock.bind(&addr, true).unwrap();

        // Payload: the big-endian bytes of 1234u64.
        const DATA: [u8; std::mem::size_of::<u64>()] = 1234u64.to_be_bytes();
        let out_vec = DATA;
        let out_vecs = [super::OutputVector::new(out_vec.as_slice())];
        let mut out_msg = [super::OutputMessage::new(
            Some(&addr),
            out_vecs.as_slice(),
            &[],
        )];
        let written = super::SocketExtManual::send_messages(
            &out_sock,
            out_msg.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();
        assert_eq!(written, 1);
        assert_eq!(out_msg[0].bytes_sent() as usize, out_vec.len());

        // Receive into a same-sized buffer and capture the sender address.
        let mut in_addr = None;
        let mut in_vec = [0u8; DATA.len()];
        let mut in_vecs = [super::InputVector::new(in_vec.as_mut_slice())];
        let mut in_msg = [super::InputMessage::new(
            Some(&mut in_addr),
            in_vecs.as_mut_slice(),
            None,
        )];
        let received = super::SocketExtManual::receive_messages(
            &in_sock,
            in_msg.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();

        assert_eq!(received, 1);
        assert_eq!(in_msg[0].bytes_received(), in_vec.len());
        assert_eq!(in_vec, out_vec);
        // Only the IP part is compared with the bound address.
        let in_addr = in_addr
            .unwrap()
            .downcast::<crate::InetSocketAddress>()
            .unwrap();
        assert_eq!(in_addr.address().to_str(), addr.address().to_str());
    }
}