1#[cfg(unix)]
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
5#[cfg(windows)]
6use std::os::windows::io::{
7 AsRawSocket, AsSocket, BorrowedSocket, FromRawSocket, IntoRawSocket, OwnedSocket, RawSocket,
8};
9#[cfg(feature = "v2_60")]
10use std::time::Duration;
11use std::{cell::RefCell, marker::PhantomData, mem::transmute, pin::Pin, ptr};
12
13use futures_core::stream::Stream;
14use glib::{prelude::*, translate::*, Slice};
15
16#[cfg(feature = "v2_60")]
17use crate::PollableReturn;
18use crate::{ffi, Cancellable, Socket, SocketAddress, SocketControlMessage};
19
20impl Socket {
21 #[cfg(unix)]
22 #[cfg_attr(docsrs, doc(cfg(unix)))]
23 #[doc(alias = "g_socket_new_from_fd")]
24 pub fn from_fd(fd: OwnedFd) -> Result<Socket, glib::Error> {
25 let fd = fd.into_raw_fd();
26 let mut error = ptr::null_mut();
27 unsafe {
28 let ret = ffi::g_socket_new_from_fd(fd, &mut error);
29 if error.is_null() {
30 Ok(from_glib_full(ret))
31 } else {
32 let _ = OwnedFd::from_raw_fd(fd);
33 Err(from_glib_full(error))
34 }
35 }
36 }
37 #[cfg(windows)]
38 #[cfg_attr(docsrs, doc(cfg(windows)))]
39 pub fn from_socket(socket: OwnedSocket) -> Result<Socket, glib::Error> {
40 let socket = socket.into_raw_socket();
41 let mut error = ptr::null_mut();
42 unsafe {
43 let ret = ffi::g_socket_new_from_fd(socket as i32, &mut error);
44 if error.is_null() {
45 Ok(from_glib_full(ret))
46 } else {
47 let _ = OwnedSocket::from_raw_socket(socket);
48 Err(from_glib_full(error))
49 }
50 }
51 }
52}
53
54#[cfg(unix)]
55#[cfg_attr(docsrs, doc(cfg(unix)))]
56impl AsRawFd for Socket {
57 fn as_raw_fd(&self) -> RawFd {
58 unsafe { ffi::g_socket_get_fd(self.to_glib_none().0) as _ }
59 }
60}
61
62#[cfg(unix)]
63#[cfg_attr(docsrs, doc(cfg(unix)))]
64impl AsFd for Socket {
65 fn as_fd(&self) -> BorrowedFd<'_> {
66 unsafe {
67 let raw_fd = self.as_raw_fd();
68 BorrowedFd::borrow_raw(raw_fd)
69 }
70 }
71}
72
73#[cfg(windows)]
74#[cfg_attr(docsrs, doc(cfg(windows)))]
75impl AsRawSocket for Socket {
76 fn as_raw_socket(&self) -> RawSocket {
77 unsafe { ffi::g_socket_get_fd(self.to_glib_none().0) as _ }
78 }
79}
80
81#[cfg(windows)]
82#[cfg_attr(docsrs, doc(cfg(windows)))]
83impl AsSocket for Socket {
84 fn as_socket(&self) -> BorrowedSocket<'_> {
85 unsafe {
86 let raw_socket = self.as_raw_socket();
87 BorrowedSocket::borrow_raw(raw_socket)
88 }
89 }
90}
91
92#[doc(alias = "GInputVector")]
93#[repr(transparent)]
94#[derive(Debug)]
95pub struct InputVector<'v> {
96 vector: ffi::GInputVector,
97 buffer: PhantomData<&'v mut [u8]>,
98}
99
100impl<'v> InputVector<'v> {
101 #[inline]
102 pub fn new(buffer: &'v mut [u8]) -> Self {
103 Self {
104 vector: ffi::GInputVector {
105 buffer: buffer.as_mut_ptr() as *mut _,
106 size: buffer.len(),
107 },
108 buffer: PhantomData,
109 }
110 }
111}
112
113unsafe impl Send for InputVector<'_> {}
114unsafe impl Sync for InputVector<'_> {}
115
116impl std::ops::Deref for InputVector<'_> {
117 type Target = [u8];
118
119 #[inline]
120 fn deref(&self) -> &Self::Target {
121 unsafe { std::slice::from_raw_parts(self.vector.buffer as *const _, self.vector.size) }
122 }
123}
124
125impl std::ops::DerefMut for InputVector<'_> {
126 #[inline]
127 fn deref_mut(&mut self) -> &mut Self::Target {
128 unsafe { std::slice::from_raw_parts_mut(self.vector.buffer as *mut _, self.vector.size) }
129 }
130}
131
132#[derive(Debug)]
133pub struct SocketControlMessages {
134 ptr: *mut *mut ffi::GSocketControlMessage,
135 len: u32,
136}
137
138impl SocketControlMessages {
139 #[inline]
140 pub const fn new() -> Self {
141 Self {
142 ptr: ptr::null_mut(),
143 len: 0,
144 }
145 }
146}
147
148impl AsRef<[SocketControlMessage]> for SocketControlMessages {
149 #[inline]
150 fn as_ref(&self) -> &[SocketControlMessage] {
151 unsafe { std::slice::from_raw_parts(self.ptr as *const _, self.len as usize) }
152 }
153}
154
155impl std::ops::Deref for SocketControlMessages {
156 type Target = [SocketControlMessage];
157
158 #[inline]
159 fn deref(&self) -> &Self::Target {
160 self.as_ref()
161 }
162}
163
164impl Default for SocketControlMessages {
165 #[inline]
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171impl Drop for SocketControlMessages {
172 #[inline]
173 fn drop(&mut self) {
174 unsafe {
175 let _: Slice<SocketControlMessage> =
176 Slice::from_glib_full_num(self.ptr as *mut _, self.len as usize);
177 }
178 }
179}
180
181#[doc(alias = "GInputMessage")]
182#[repr(transparent)]
183#[derive(Debug)]
184pub struct InputMessage<'m> {
185 message: ffi::GInputMessage,
186 address: PhantomData<Option<&'m mut Option<SocketAddress>>>,
187 vectors: PhantomData<&'m mut [InputVector<'m>]>,
188 control_messages: PhantomData<Option<&'m mut SocketControlMessages>>,
189}
190
191impl<'m> InputMessage<'m> {
192 pub fn new(
193 mut address: Option<&'m mut Option<SocketAddress>>,
194 vectors: &'m mut [InputVector<'m>],
195 control_messages: Option<&'m mut SocketControlMessages>,
196 ) -> Self {
197 let address = address
198 .as_mut()
199 .map(|a| {
200 assert!(a.is_none());
201 *a as *mut _ as *mut _
202 })
203 .unwrap_or_else(ptr::null_mut);
204 let (control_messages, num_control_messages) = control_messages
205 .map(|c| (&mut c.ptr as *mut _, &mut c.len as *mut _))
206 .unwrap_or_else(|| (ptr::null_mut(), ptr::null_mut()));
207 Self {
208 message: ffi::GInputMessage {
209 address,
210 vectors: vectors.as_mut_ptr() as *mut ffi::GInputVector,
211 num_vectors: vectors.len().try_into().unwrap(),
212 bytes_received: 0,
213 flags: 0,
214 control_messages,
215 num_control_messages,
216 },
217 address: PhantomData,
218 vectors: PhantomData,
219 control_messages: PhantomData,
220 }
221 }
222 #[inline]
223 pub fn vectors(&mut self) -> &'m mut [InputVector<'m>] {
224 unsafe {
225 std::slice::from_raw_parts_mut(
226 self.message.vectors as *mut _,
227 self.message.num_vectors as usize,
228 )
229 }
230 }
231 #[inline]
232 pub const fn flags(&self) -> i32 {
233 self.message.flags
234 }
235 #[inline]
236 pub const fn bytes_received(&self) -> usize {
237 self.message.bytes_received
238 }
239}
240
241#[doc(alias = "GOutputVector")]
242#[repr(transparent)]
243#[derive(Debug)]
244pub struct OutputVector<'v> {
245 vector: ffi::GOutputVector,
246 buffer: PhantomData<&'v [u8]>,
247}
248
249impl<'v> OutputVector<'v> {
250 #[inline]
251 pub const fn new(buffer: &'v [u8]) -> Self {
252 Self {
253 vector: ffi::GOutputVector {
254 buffer: buffer.as_ptr() as *const _,
255 size: buffer.len(),
256 },
257 buffer: PhantomData,
258 }
259 }
260}
261
262unsafe impl Send for OutputVector<'_> {}
263unsafe impl Sync for OutputVector<'_> {}
264
265impl std::ops::Deref for OutputVector<'_> {
266 type Target = [u8];
267
268 #[inline]
269 fn deref(&self) -> &Self::Target {
270 unsafe { std::slice::from_raw_parts(self.vector.buffer as *const _, self.vector.size) }
271 }
272}
273
274#[doc(alias = "GOutputMessage")]
275#[repr(transparent)]
276#[derive(Debug)]
277pub struct OutputMessage<'m> {
278 message: ffi::GOutputMessage,
279 address: PhantomData<Option<&'m SocketAddress>>,
280 vectors: PhantomData<&'m [OutputVector<'m>]>,
281 control_messages: PhantomData<&'m [SocketControlMessage]>,
282}
283
284impl<'m> OutputMessage<'m> {
285 pub fn new<A: IsA<SocketAddress>>(
286 address: Option<&'m A>,
287 vectors: &'m [OutputVector<'m>],
288 control_messages: &'m [SocketControlMessage],
289 ) -> Self {
290 Self {
291 message: ffi::GOutputMessage {
292 address: address
293 .map(|a| a.upcast_ref::<SocketAddress>().as_ptr())
294 .unwrap_or_else(ptr::null_mut),
295 vectors: mut_override(vectors.as_ptr() as *const ffi::GOutputVector),
296 num_vectors: vectors.len().try_into().unwrap(),
297 bytes_sent: 0,
298 control_messages: control_messages.as_ptr() as *mut _,
299 num_control_messages: control_messages.len().try_into().unwrap(),
300 },
301 address: PhantomData,
302 vectors: PhantomData,
303 control_messages: PhantomData,
304 }
305 }
306 #[inline]
307 pub fn vectors(&self) -> &'m [OutputVector<'m>] {
308 unsafe {
309 std::slice::from_raw_parts(
310 self.message.vectors as *const _,
311 self.message.num_vectors as usize,
312 )
313 }
314 }
315 #[inline]
316 pub fn bytes_sent(&self) -> u32 {
317 self.message.bytes_sent
318 }
319}
320
321pub trait SocketExtManual: IsA<Socket> + Sized {
322 #[doc(alias = "g_socket_receive")]
323 fn receive<B: AsMut<[u8]>, C: IsA<Cancellable>>(
324 &self,
325 mut buffer: B,
326 cancellable: Option<&C>,
327 ) -> Result<usize, glib::Error> {
328 let cancellable = cancellable.map(|c| c.as_ref());
329 let gcancellable = cancellable.to_glib_none();
330 let buffer = buffer.as_mut();
331 let buffer_ptr = buffer.as_mut_ptr();
332 let count = buffer.len();
333 unsafe {
334 let mut error = ptr::null_mut();
335 let ret = ffi::g_socket_receive(
336 self.as_ref().to_glib_none().0,
337 buffer_ptr,
338 count,
339 gcancellable.0,
340 &mut error,
341 );
342 if error.is_null() {
343 Ok(ret as usize)
344 } else {
345 Err(from_glib_full(error))
346 }
347 }
348 }
349 #[doc(alias = "g_socket_receive_from")]
350 fn receive_from<B: AsMut<[u8]>, C: IsA<Cancellable>>(
351 &self,
352 mut buffer: B,
353 cancellable: Option<&C>,
354 ) -> Result<(usize, SocketAddress), glib::Error> {
355 let cancellable = cancellable.map(|c| c.as_ref());
356 let gcancellable = cancellable.to_glib_none();
357 let buffer = buffer.as_mut();
358 let buffer_ptr = buffer.as_mut_ptr();
359 let count = buffer.len();
360 unsafe {
361 let mut error = ptr::null_mut();
362 let mut addr_ptr = ptr::null_mut();
363
364 let ret = ffi::g_socket_receive_from(
365 self.as_ref().to_glib_none().0,
366 &mut addr_ptr,
367 buffer_ptr,
368 count,
369 gcancellable.0,
370 &mut error,
371 );
372 if error.is_null() {
373 Ok((ret as usize, from_glib_full(addr_ptr)))
374 } else {
375 Err(from_glib_full(error))
376 }
377 }
378 }
379 #[doc(alias = "g_socket_receive_message")]
380 fn receive_message<C: IsA<Cancellable>>(
381 &self,
382 mut address: Option<&mut Option<SocketAddress>>,
383 vectors: &mut [InputVector],
384 control_messages: Option<&mut SocketControlMessages>,
385 mut flags: i32,
386 cancellable: Option<&C>,
387 ) -> Result<(usize, i32), glib::Error> {
388 let cancellable = cancellable.map(|c| c.as_ref());
389 let address = address
390 .as_mut()
391 .map(|a| {
392 assert!(a.is_none());
393 *a as *mut _ as *mut _
394 })
395 .unwrap_or_else(ptr::null_mut);
396 let (control_messages, num_control_messages) = control_messages
397 .map(|c| (&mut c.ptr as *mut _, &mut c.len as *mut _ as *mut _))
398 .unwrap_or_else(|| (ptr::null_mut(), ptr::null_mut()));
399 unsafe {
400 let mut error = ptr::null_mut();
401
402 let received = ffi::g_socket_receive_message(
403 self.as_ref().to_glib_none().0,
404 address,
405 vectors.as_mut_ptr() as *mut ffi::GInputVector,
406 vectors.len().try_into().unwrap(),
407 control_messages,
408 num_control_messages,
409 &mut flags,
410 cancellable.to_glib_none().0,
411 &mut error,
412 );
413 if error.is_null() {
414 Ok((received as usize, flags))
415 } else {
416 Err(from_glib_full(error))
417 }
418 }
419 }
420 #[doc(alias = "g_socket_receive_messages")]
421 fn receive_messages<C: IsA<Cancellable>>(
422 &self,
423 messages: &mut [InputMessage],
424 flags: i32,
425 cancellable: Option<&C>,
426 ) -> Result<usize, glib::Error> {
427 let cancellable = cancellable.map(|c| c.as_ref());
428 unsafe {
429 let mut error = ptr::null_mut();
430
431 let count = ffi::g_socket_receive_messages(
432 self.as_ref().to_glib_none().0,
433 messages.as_mut_ptr() as *mut _,
434 messages.len().try_into().unwrap(),
435 flags,
436 cancellable.to_glib_none().0,
437 &mut error,
438 );
439 if error.is_null() {
440 Ok(count as usize)
441 } else {
442 Err(from_glib_full(error))
443 }
444 }
445 }
446 #[doc(alias = "g_socket_receive_with_blocking")]
447 fn receive_with_blocking<B: AsMut<[u8]>, C: IsA<Cancellable>>(
448 &self,
449 mut buffer: B,
450 blocking: bool,
451 cancellable: Option<&C>,
452 ) -> Result<usize, glib::Error> {
453 let cancellable = cancellable.map(|c| c.as_ref());
454 let gcancellable = cancellable.to_glib_none();
455 let buffer = buffer.as_mut();
456 let buffer_ptr = buffer.as_mut_ptr();
457 let count = buffer.len();
458 unsafe {
459 let mut error = ptr::null_mut();
460 let ret = ffi::g_socket_receive_with_blocking(
461 self.as_ref().to_glib_none().0,
462 buffer_ptr,
463 count,
464 blocking.into_glib(),
465 gcancellable.0,
466 &mut error,
467 );
468 if error.is_null() {
469 Ok(ret as usize)
470 } else {
471 Err(from_glib_full(error))
472 }
473 }
474 }
475
476 #[doc(alias = "g_socket_send")]
477 fn send<B: AsRef<[u8]>, C: IsA<Cancellable>>(
478 &self,
479 buffer: B,
480 cancellable: Option<&C>,
481 ) -> Result<usize, glib::Error> {
482 let cancellable = cancellable.map(|c| c.as_ref());
483 let gcancellable = cancellable.to_glib_none();
484 let (count, buffer_ptr) = {
485 let slice = buffer.as_ref();
486 (slice.len(), slice.as_ptr())
487 };
488 unsafe {
489 let mut error = ptr::null_mut();
490 let ret = ffi::g_socket_send(
491 self.as_ref().to_glib_none().0,
492 mut_override(buffer_ptr),
493 count,
494 gcancellable.0,
495 &mut error,
496 );
497 if error.is_null() {
498 Ok(ret as usize)
499 } else {
500 Err(from_glib_full(error))
501 }
502 }
503 }
504 #[doc(alias = "g_socket_send_message")]
505 fn send_message<P: IsA<SocketAddress>, C: IsA<Cancellable>>(
506 &self,
507 address: Option<&P>,
508 vectors: &[OutputVector],
509 messages: &[SocketControlMessage],
510 flags: i32,
511 cancellable: Option<&C>,
512 ) -> Result<usize, glib::Error> {
513 let cancellable = cancellable.map(|c| c.as_ref());
514 unsafe {
515 let mut error = ptr::null_mut();
516 let ret = ffi::g_socket_send_message(
517 self.as_ref().to_glib_none().0,
518 address.map(|p| p.as_ref()).to_glib_none().0,
519 vectors.as_ptr() as *mut ffi::GOutputVector,
520 vectors.len().try_into().unwrap(),
521 messages.as_ptr() as *mut _,
522 messages.len().try_into().unwrap(),
523 flags,
524 cancellable.to_glib_none().0,
525 &mut error,
526 );
527 if error.is_null() {
528 Ok(ret as usize)
529 } else {
530 Err(from_glib_full(error))
531 }
532 }
533 }
534 #[cfg(feature = "v2_60")]
535 #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
536 #[doc(alias = "g_socket_send_message_with_timeout")]
537 fn send_message_with_timeout<P: IsA<SocketAddress>, C: IsA<Cancellable>>(
538 &self,
539 address: Option<&P>,
540 vectors: &[OutputVector],
541 messages: &[SocketControlMessage],
542 flags: i32,
543 timeout: Option<Duration>,
544 cancellable: Option<&C>,
545 ) -> Result<(PollableReturn, usize), glib::Error> {
546 let cancellable = cancellable.map(|c| c.as_ref());
547 unsafe {
548 let mut error = ptr::null_mut();
549 let mut bytes_written = 0;
550
551 let ret = ffi::g_socket_send_message_with_timeout(
552 self.as_ref().to_glib_none().0,
553 address.map(|p| p.as_ref()).to_glib_none().0,
554 vectors.as_ptr() as *mut ffi::GOutputVector,
555 vectors.len().try_into().unwrap(),
556 messages.as_ptr() as *mut _,
557 messages.len().try_into().unwrap(),
558 flags,
559 timeout
560 .map(|t| t.as_micros().try_into().unwrap())
561 .unwrap_or(-1),
562 &mut bytes_written,
563 cancellable.to_glib_none().0,
564 &mut error,
565 );
566 if error.is_null() {
567 Ok((from_glib(ret), bytes_written))
568 } else {
569 Err(from_glib_full(error))
570 }
571 }
572 }
573 #[doc(alias = "g_socket_send_messages")]
574 fn send_messages<C: IsA<Cancellable>>(
575 &self,
576 messages: &mut [OutputMessage],
577 flags: i32,
578 cancellable: Option<&C>,
579 ) -> Result<usize, glib::Error> {
580 let cancellable = cancellable.map(|c| c.as_ref());
581 unsafe {
582 let mut error = ptr::null_mut();
583 let count = ffi::g_socket_send_messages(
584 self.as_ref().to_glib_none().0,
585 messages.as_mut_ptr() as *mut _,
586 messages.len().try_into().unwrap(),
587 flags,
588 cancellable.to_glib_none().0,
589 &mut error,
590 );
591 if error.is_null() {
592 Ok(count as usize)
593 } else {
594 Err(from_glib_full(error))
595 }
596 }
597 }
598 #[doc(alias = "g_socket_send_to")]
599 fn send_to<B: AsRef<[u8]>, P: IsA<SocketAddress>, C: IsA<Cancellable>>(
600 &self,
601 address: Option<&P>,
602 buffer: B,
603 cancellable: Option<&C>,
604 ) -> Result<usize, glib::Error> {
605 let cancellable = cancellable.map(|c| c.as_ref());
606 let gcancellable = cancellable.to_glib_none();
607 let (count, buffer_ptr) = {
608 let slice = buffer.as_ref();
609 (slice.len(), slice.as_ptr())
610 };
611 unsafe {
612 let mut error = ptr::null_mut();
613
614 let ret = ffi::g_socket_send_to(
615 self.as_ref().to_glib_none().0,
616 address.map(|p| p.as_ref()).to_glib_none().0,
617 mut_override(buffer_ptr),
618 count,
619 gcancellable.0,
620 &mut error,
621 );
622 if error.is_null() {
623 Ok(ret as usize)
624 } else {
625 Err(from_glib_full(error))
626 }
627 }
628 }
629 #[doc(alias = "g_socket_send_with_blocking")]
630 fn send_with_blocking<B: AsRef<[u8]>, C: IsA<Cancellable>>(
631 &self,
632 buffer: B,
633 blocking: bool,
634 cancellable: Option<&C>,
635 ) -> Result<usize, glib::Error> {
636 let cancellable = cancellable.map(|c| c.as_ref());
637 let gcancellable = cancellable.to_glib_none();
638 let (count, buffer_ptr) = {
639 let slice = buffer.as_ref();
640 (slice.len(), slice.as_ptr())
641 };
642 unsafe {
643 let mut error = ptr::null_mut();
644 let ret = ffi::g_socket_send_with_blocking(
645 self.as_ref().to_glib_none().0,
646 mut_override(buffer_ptr),
647 count,
648 blocking.into_glib(),
649 gcancellable.0,
650 &mut error,
651 );
652 if error.is_null() {
653 Ok(ret as usize)
654 } else {
655 Err(from_glib_full(error))
656 }
657 }
658 }
659
660 #[cfg(unix)]
661 #[cfg_attr(docsrs, doc(cfg(unix)))]
662 #[doc(alias = "get_fd")]
663 #[doc(alias = "g_socket_get_fd")]
664 fn fd(&self) -> BorrowedFd<'_> {
665 self.as_ref().as_fd()
666 }
667
668 #[cfg(windows)]
669 #[cfg_attr(docsrs, doc(cfg(windows)))]
670 #[doc(alias = "get_socket")]
671 #[doc(alias = "g_socket_get_fd")]
672 fn socket(&self) -> BorrowedSocket<'_> {
673 self.as_ref().as_socket()
674 }
675
676 #[doc(alias = "g_socket_create_source")]
677 fn create_source<F, C>(
678 &self,
679 condition: glib::IOCondition,
680 cancellable: Option<&C>,
681 name: Option<&str>,
682 priority: glib::Priority,
683 func: F,
684 ) -> glib::Source
685 where
686 F: FnMut(&Self, glib::IOCondition) -> glib::ControlFlow + 'static,
687 C: IsA<Cancellable>,
688 {
689 unsafe extern "C" fn trampoline<
690 O: IsA<Socket>,
691 F: FnMut(&O, glib::IOCondition) -> glib::ControlFlow + 'static,
692 >(
693 socket: *mut ffi::GSocket,
694 condition: glib::ffi::GIOCondition,
695 func: glib::ffi::gpointer,
696 ) -> glib::ffi::gboolean {
697 let func: &RefCell<F> = &*(func as *const RefCell<F>);
698 let mut func = func.borrow_mut();
699 (*func)(
700 Socket::from_glib_borrow(socket).unsafe_cast_ref(),
701 from_glib(condition),
702 )
703 .into_glib()
704 }
705 unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
706 let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
707 }
708 let cancellable = cancellable.map(|c| c.as_ref());
709 let gcancellable = cancellable.to_glib_none();
710 unsafe {
711 let source = ffi::g_socket_create_source(
712 self.as_ref().to_glib_none().0,
713 condition.into_glib(),
714 gcancellable.0,
715 );
716 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
717 glib::ffi::g_source_set_callback(
718 source,
719 Some(transmute::<
720 glib::ffi::gpointer,
721 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
722 >(trampoline)),
723 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
724 Some(destroy_closure::<F>),
725 );
726 glib::ffi::g_source_set_priority(source, priority.into_glib());
727
728 if let Some(name) = name {
729 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
730 }
731
732 from_glib_full(source)
733 }
734 }
735
736 fn create_source_future<C: IsA<Cancellable>>(
737 &self,
738 condition: glib::IOCondition,
739 cancellable: Option<&C>,
740 priority: glib::Priority,
741 ) -> Pin<Box<dyn std::future::Future<Output = glib::IOCondition> + 'static>> {
742 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
743
744 let obj = self.clone();
745 Box::pin(glib::SourceFuture::new(move |send| {
746 let mut send = Some(send);
747 obj.create_source(
748 condition,
749 cancellable.as_ref(),
750 None,
751 priority,
752 move |_, condition| {
753 let _ = send.take().unwrap().send(condition);
754 glib::ControlFlow::Break
755 },
756 )
757 }))
758 }
759
760 fn create_source_stream<C: IsA<Cancellable>>(
761 &self,
762 condition: glib::IOCondition,
763 cancellable: Option<&C>,
764 priority: glib::Priority,
765 ) -> Pin<Box<dyn Stream<Item = glib::IOCondition> + 'static>> {
766 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
767
768 let obj = self.clone();
769 Box::pin(glib::SourceStream::new(move |send| {
770 let send = Some(send);
771 obj.create_source(
772 condition,
773 cancellable.as_ref(),
774 None,
775 priority,
776 move |_, condition| {
777 if send.as_ref().unwrap().unbounded_send(condition).is_err() {
778 glib::ControlFlow::Break
779 } else {
780 glib::ControlFlow::Continue
781 }
782 },
783 )
784 }))
785 }
786}
787
788impl<O: IsA<Socket>> SocketExtManual for O {}
789
790#[cfg(all(docsrs, not(unix)))]
791pub trait IntoRawFd {
792 fn into_raw_fd(self) -> libc::c_int;
793}
794
795#[cfg(all(docsrs, not(unix)))]
796pub trait FromRawFd {
797 unsafe fn from_raw_fd(fd: libc::c_int) -> Self;
798}
799
800#[cfg(all(docsrs, not(unix)))]
801pub trait AsRawFd {
802 fn as_raw_fd(&self) -> RawFd;
803}
804
805#[cfg(all(docsrs, not(unix)))]
806pub type RawFd = libc::c_int;
807
808#[cfg(all(docsrs, not(windows)))]
809pub trait IntoRawSocket {
810 fn into_raw_socket(self) -> u64;
811}
812
813#[cfg(all(docsrs, not(windows)))]
814pub trait FromRawSocket {
815 unsafe fn from_raw_socket(sock: u64) -> Self;
816}
817
818#[cfg(all(docsrs, not(windows)))]
819pub trait AsRawSocket {
820 fn as_raw_socket(&self) -> RawSocket;
821}
822
823#[cfg(all(docsrs, not(windows)))]
824pub type RawSocket = *mut std::os::raw::c_void;
825
826#[cfg(test)]
827mod tests {
828 #[test]
829 #[cfg(unix)]
830 fn socket_messages() {
831 use std::{
832 io,
833 os::unix::io::{AsRawFd, FromRawFd, OwnedFd},
834 };
835
836 use super::Socket;
837 use crate::{prelude::*, Cancellable, UnixFDMessage};
838
839 let mut fds = [0 as libc::c_int; 2];
840 let (out_sock, in_sock) = unsafe {
841 let ret = libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr());
842 if ret != 0 {
843 panic!("{}", io::Error::last_os_error());
844 }
845 (
846 Socket::from_fd(OwnedFd::from_raw_fd(fds[0])).unwrap(),
847 Socket::from_fd(OwnedFd::from_raw_fd(fds[1])).unwrap(),
848 )
849 };
850
851 let fd_msg = UnixFDMessage::new();
852 fd_msg.append_fd(out_sock.as_raw_fd()).unwrap();
853 let vs = [super::OutputVector::new(&[0])];
854 let ctrl_msgs = [fd_msg.upcast()];
855 let mut out_msg = [super::OutputMessage::new(
856 crate::SocketAddress::NONE,
857 vs.as_slice(),
858 ctrl_msgs.as_slice(),
859 )];
860 let written = super::SocketExtManual::send_messages(
861 &out_sock,
862 out_msg.as_mut_slice(),
863 0,
864 Cancellable::NONE,
865 )
866 .unwrap();
867 assert_eq!(written, 1);
868 assert_eq!(out_msg[0].bytes_sent(), 1);
869
870 let mut v = [0u8];
871 let mut vs = [super::InputVector::new(v.as_mut_slice())];
872 let mut ctrl_msgs = super::SocketControlMessages::new();
873 let mut in_msg = [super::InputMessage::new(
874 None,
875 vs.as_mut_slice(),
876 Some(&mut ctrl_msgs),
877 )];
878 let received = super::SocketExtManual::receive_messages(
879 &in_sock,
880 in_msg.as_mut_slice(),
881 0,
882 Cancellable::NONE,
883 )
884 .unwrap();
885
886 assert_eq!(received, 1);
887 assert_eq!(in_msg[0].bytes_received(), 1);
888 assert_eq!(ctrl_msgs.len(), 1);
889 let fds = ctrl_msgs[0]
890 .downcast_ref::<UnixFDMessage>()
891 .unwrap()
892 .fd_list();
893 assert_eq!(fds.length(), 1);
894 }
895 #[test]
896 #[cfg(unix)]
897 fn dgram_socket_messages() {
898 use super::Socket;
899 use crate::{prelude::*, Cancellable};
900
901 let addr = crate::InetSocketAddress::from_string("127.0.0.1", 28351).unwrap();
902
903 let out_sock = Socket::new(
904 crate::SocketFamily::Ipv4,
905 crate::SocketType::Datagram,
906 crate::SocketProtocol::Udp,
907 )
908 .unwrap();
909 let in_sock = Socket::new(
910 crate::SocketFamily::Ipv4,
911 crate::SocketType::Datagram,
912 crate::SocketProtocol::Udp,
913 )
914 .unwrap();
915 in_sock.bind(&addr, true).unwrap();
916
917 const DATA: [u8; std::mem::size_of::<u64>()] = 1234u64.to_be_bytes();
918 let out_vec = DATA;
919 let out_vecs = [super::OutputVector::new(out_vec.as_slice())];
920 let mut out_msg = [super::OutputMessage::new(
921 Some(&addr),
922 out_vecs.as_slice(),
923 &[],
924 )];
925 let written = super::SocketExtManual::send_messages(
926 &out_sock,
927 out_msg.as_mut_slice(),
928 0,
929 Cancellable::NONE,
930 )
931 .unwrap();
932 assert_eq!(written, 1);
933 assert_eq!(out_msg[0].bytes_sent() as usize, out_vec.len());
934
935 let mut in_addr = None;
936 let mut in_vec = [0u8; DATA.len()];
937 let mut in_vecs = [super::InputVector::new(in_vec.as_mut_slice())];
938 let mut in_msg = [super::InputMessage::new(
939 Some(&mut in_addr),
940 in_vecs.as_mut_slice(),
941 None,
942 )];
943 let received = super::SocketExtManual::receive_messages(
944 &in_sock,
945 in_msg.as_mut_slice(),
946 0,
947 Cancellable::NONE,
948 )
949 .unwrap();
950
951 assert_eq!(received, 1);
952 assert_eq!(in_msg[0].bytes_received(), in_vec.len());
953 assert_eq!(in_vec, out_vec);
954 let in_addr = in_addr
955 .unwrap()
956 .downcast::<crate::InetSocketAddress>()
957 .unwrap();
958 assert_eq!(in_addr.address().to_str(), addr.address().to_str());
959 }
960}