Skip to main content

gstthreadshare/
socket.rs

1// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
2// Copyright (C) 2018 LEE Dongjun <redongjun@gmail.com>
3//
4// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
5// If a copy of the MPL was not distributed with this file, You can obtain one at
6// <https://mozilla.org/MPL/2.0/>.
7//
8// SPDX-License-Identifier: MPL-2.0
9
10use gst::glib;
11use gst::prelude::*;
12
13use std::sync::LazyLock;
14
15use std::error;
16use std::fmt;
17use std::future::Future;
18use std::io;
19use std::net::UdpSocket;
20
21use crate::runtime::Async;
22
23#[cfg(unix)]
24use std::os::fd::OwnedFd;
25
26#[cfg(windows)]
27use std::os::windows::io::OwnedSocket;
28
29static SOCKET_CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
30    gst::DebugCategory::new(
31        "ts-socket",
32        gst::DebugColorFlags::empty(),
33        Some("Thread-sharing Socket"),
34    )
35});
36
37pub trait SocketRead: Send + Unpin {
38    const DO_TIMESTAMP: bool;
39
40    fn read<'buf>(
41        &'buf mut self,
42        buffer: &'buf mut [u8],
43    ) -> impl Future<Output = io::Result<(usize, Option<std::net::SocketAddr>)>> + Send;
44}
45
46pub struct Socket<T: SocketRead> {
47    element: gst::Element,
48    buffer_pool: gst::BufferPool,
49    reader: T,
50    mapped_buffer: Option<gst::MappedBuffer<gst::buffer::Writable>>,
51    clock: Option<gst::Clock>,
52    base_time: Option<gst::ClockTime>,
53}
54
55impl<T: SocketRead> Socket<T> {
56    pub fn try_new(
57        element: gst::Element,
58        buffer_pool: gst::BufferPool,
59        reader: T,
60    ) -> Result<Self, glib::BoolError> {
61        // FIXME couldn't we just delegate this to caller?
62        buffer_pool.set_active(true).map_err(|err| {
63            gst::error!(
64                SOCKET_CAT,
65                obj = element,
66                "Failed to prepare socket: {}",
67                err
68            );
69
70            err
71        })?;
72
73        Ok(Socket::<T> {
74            buffer_pool,
75            element,
76            reader,
77            mapped_buffer: None,
78            clock: None,
79            base_time: None,
80        })
81    }
82
83    pub fn set_clock(&mut self, clock: Option<gst::Clock>, base_time: Option<gst::ClockTime>) {
84        self.clock = clock;
85        self.base_time = base_time;
86    }
87
88    pub fn get(&self) -> &T {
89        &self.reader
90    }
91}
92
93#[derive(Debug)]
94pub enum SocketError {
95    Gst(gst::FlowError),
96    Io(io::Error),
97}
98
99impl error::Error for SocketError {}
100
101impl fmt::Display for SocketError {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        match self {
104            SocketError::Gst(err) => write!(f, "flow error: {err}"),
105            SocketError::Io(err) => write!(f, "IO error: {err}"),
106        }
107    }
108}
109
110impl<T: SocketRead> Socket<T> {
111    // Can't implement this as a Stream trait because we end up using things like
112    // tokio::net::UdpSocket which don't implement pollable functions.
113    #[allow(clippy::should_implement_trait)]
114    pub async fn try_next(
115        &mut self,
116    ) -> Result<(gst::Buffer, Option<std::net::SocketAddr>), SocketError> {
117        gst::log!(SOCKET_CAT, obj = self.element, "Trying to read data");
118
119        if self.mapped_buffer.is_none() {
120            match self.buffer_pool.acquire_buffer(None) {
121                Ok(buffer) => {
122                    self.mapped_buffer = Some(buffer.into_mapped_buffer_writable().unwrap());
123                }
124                Err(err) => {
125                    gst::debug!(
126                        SOCKET_CAT,
127                        obj = self.element,
128                        "Failed to acquire buffer {:?}",
129                        err
130                    );
131                    return Err(SocketError::Gst(err));
132                }
133            }
134        }
135
136        match self
137            .reader
138            .read(self.mapped_buffer.as_mut().unwrap().as_mut_slice())
139            .await
140        {
141            Ok((len, saddr)) => {
142                let dts = if T::DO_TIMESTAMP {
143                    let time = self.clock.as_ref().unwrap().time();
144                    let running_time = time.opt_checked_sub(self.base_time).ok().flatten();
145                    // FIXME maybe we should check if running_time.is_none
146                    // so as to display another message
147                    gst::debug!(
148                        SOCKET_CAT,
149                        obj = self.element,
150                        "Read {} bytes at {} (clock {})",
151                        len,
152                        running_time.display(),
153                        time.display(),
154                    );
155                    running_time
156                } else {
157                    gst::debug!(SOCKET_CAT, obj = self.element, "Read {} bytes", len);
158                    gst::ClockTime::NONE
159                };
160
161                let mut buffer = self.mapped_buffer.take().unwrap().into_buffer();
162                {
163                    let buffer = buffer.get_mut().unwrap();
164                    if len < buffer.size() {
165                        buffer.set_size(len);
166                    }
167                    buffer.set_dts(dts);
168                }
169
170                Ok((buffer, saddr))
171            }
172            Err(err) => {
173                gst::debug!(SOCKET_CAT, obj = self.element, "Read error {:?}", err);
174
175                Err(SocketError::Io(err))
176            }
177        }
178    }
179}
180
181impl<T: SocketRead> Drop for Socket<T> {
182    fn drop(&mut self) {
183        if let Err(err) = self.buffer_pool.set_active(false) {
184            gst::error!(
185                SOCKET_CAT,
186                obj = self.element,
187                "Failed to unprepare socket: {}",
188                err
189            );
190        }
191    }
192}
193
194// Send/Sync struct for passing around a gio::Socket
195// and getting the fd from it
196//
197// gio::Socket is not Send/Sync as it's generally unsafe
198// to access it from multiple threads. Getting the underlying raw
199// fd is safe though, as is receiving/sending from two different threads
200#[derive(Debug, Clone)]
201pub struct GioSocketWrapper {
202    socket: gio::Socket,
203}
204
205unsafe impl Send for GioSocketWrapper {}
206unsafe impl Sync for GioSocketWrapper {}
207
208impl GioSocketWrapper {
209    pub fn new(socket: &gio::Socket) -> Self {
210        Self {
211            socket: socket.clone(),
212        }
213    }
214
215    pub fn as_socket(&self) -> &gio::Socket {
216        &self.socket
217    }
218
219    #[cfg(any(
220        target_os = "macos",
221        target_os = "ios",
222        target_os = "freebsd",
223        target_os = "dragonfly",
224        target_os = "openbsd",
225        target_os = "netbsd",
226        target_os = "linux",
227        target_os = "android",
228        target_os = "aix",
229        target_os = "fuchsia",
230        target_os = "haiku",
231        target_env = "newlib"
232    ))]
233    pub fn set_tos(&self, qos_dscp: i32) -> rustix::io::Result<()> {
234        use std::os::fd::AsFd;
235
236        use gio::prelude::*;
237        use rustix::net::sockopt;
238
239        let tos = (qos_dscp & 0x3f) << 2;
240
241        let socket = self.as_socket();
242
243        sockopt::set_ip_tos(socket.as_fd(), tos as u8)?;
244
245        if socket.family() == gio::SocketFamily::Ipv6 {
246            sockopt::set_ipv6_tclass(socket.as_fd(), tos as u32)?;
247        }
248
249        Ok(())
250    }
251
252    #[cfg(not(any(
253        target_os = "macos",
254        target_os = "ios",
255        target_os = "freebsd",
256        target_os = "dragonfly",
257        target_os = "openbsd",
258        target_os = "netbsd",
259        target_os = "linux",
260        target_os = "android",
261        target_os = "aix",
262        target_os = "fuchsia",
263        target_os = "haiku",
264        target_env = "newlib"
265    )))]
266    pub fn set_tos(&self, _qos_dscp: i32) -> rustix::io::Result<()> {
267        Ok(())
268    }
269
270    #[cfg(not(windows))]
271    pub fn get<T: From<OwnedFd>>(&self) -> T {
272        use std::os::fd::AsFd;
273
274        let fd = self.socket.as_fd();
275        let fd = fd.try_clone_to_owned().unwrap();
276        T::from(fd)
277    }
278
279    #[cfg(windows)]
280    pub fn get<T: From<OwnedSocket>>(&self) -> T {
281        unsafe {
282            use std::os::windows::io::{AsRawSocket, BorrowedSocket};
283
284            let socket = self.socket.as_raw_socket();
285            let socket = BorrowedSocket::borrow_raw(socket);
286            let socket = socket.try_clone_to_owned().unwrap();
287            T::from(socket)
288        }
289    }
290}
291
292pub fn wrap_socket(socket: &Async<UdpSocket>) -> Result<GioSocketWrapper, gst::ErrorMessage> {
293    #[cfg(unix)]
294    {
295        use std::os::fd::AsFd;
296
297        let fd = socket.as_fd();
298        let fd = fd.try_clone_to_owned().unwrap();
299
300        let gio_socket = gio::Socket::from_fd(fd);
301        let gio_socket = gio_socket.map_err(|err| {
302            gst::error_msg!(
303                gst::ResourceError::OpenWrite,
304                ["Failed to create wrapped GIO socket: {}", err]
305            )
306        })?;
307
308        Ok(GioSocketWrapper::new(&gio_socket))
309    }
310    #[cfg(windows)]
311    unsafe {
312        use std::os::windows::io::{AsRawSocket, BorrowedSocket};
313
314        let socket = socket.as_raw_socket();
315        let socket = BorrowedSocket::borrow_raw(socket);
316        let socket = socket.try_clone_to_owned().unwrap();
317
318        let gio_socket = gio::Socket::from_socket(socket).map_err(|err| {
319            gst::error_msg!(
320                gst::ResourceError::OpenWrite,
321                ["Failed to create wrapped GIO socket: {}", err]
322            )
323        })?;
324
325        Ok(GioSocketWrapper::new(&gio_socket))
326    }
327}