1use gst::glib;
11use gst::prelude::*;
12
13use std::sync::LazyLock;
14
15use std::error;
16use std::fmt;
17use std::future::Future;
18use std::io;
19use std::net::UdpSocket;
20
21use crate::runtime::Async;
22
23#[cfg(unix)]
24use std::os::fd::OwnedFd;
25
26#[cfg(windows)]
27use std::os::windows::io::OwnedSocket;
28
29static SOCKET_CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
30 gst::DebugCategory::new(
31 "ts-socket",
32 gst::DebugColorFlags::empty(),
33 Some("Thread-sharing Socket"),
34 )
35});
36
37pub trait SocketRead: Send + Unpin {
38 const DO_TIMESTAMP: bool;
39
40 fn read<'buf>(
41 &'buf mut self,
42 buffer: &'buf mut [u8],
43 ) -> impl Future<Output = io::Result<(usize, Option<std::net::SocketAddr>)>> + Send;
44}
45
46pub struct Socket<T: SocketRead> {
47 element: gst::Element,
48 buffer_pool: gst::BufferPool,
49 reader: T,
50 mapped_buffer: Option<gst::MappedBuffer<gst::buffer::Writable>>,
51 clock: Option<gst::Clock>,
52 base_time: Option<gst::ClockTime>,
53}
54
55impl<T: SocketRead> Socket<T> {
56 pub fn try_new(
57 element: gst::Element,
58 buffer_pool: gst::BufferPool,
59 reader: T,
60 ) -> Result<Self, glib::BoolError> {
61 buffer_pool.set_active(true).map_err(|err| {
63 gst::error!(
64 SOCKET_CAT,
65 obj = element,
66 "Failed to prepare socket: {}",
67 err
68 );
69
70 err
71 })?;
72
73 Ok(Socket::<T> {
74 buffer_pool,
75 element,
76 reader,
77 mapped_buffer: None,
78 clock: None,
79 base_time: None,
80 })
81 }
82
83 pub fn set_clock(&mut self, clock: Option<gst::Clock>, base_time: Option<gst::ClockTime>) {
84 self.clock = clock;
85 self.base_time = base_time;
86 }
87
88 pub fn get(&self) -> &T {
89 &self.reader
90 }
91}
92
93#[derive(Debug)]
94pub enum SocketError {
95 Gst(gst::FlowError),
96 Io(io::Error),
97}
98
99impl error::Error for SocketError {}
100
101impl fmt::Display for SocketError {
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103 match self {
104 SocketError::Gst(err) => write!(f, "flow error: {err}"),
105 SocketError::Io(err) => write!(f, "IO error: {err}"),
106 }
107 }
108}
109
110impl<T: SocketRead> Socket<T> {
111 #[allow(clippy::should_implement_trait)]
114 pub async fn try_next(
115 &mut self,
116 ) -> Result<(gst::Buffer, Option<std::net::SocketAddr>), SocketError> {
117 gst::log!(SOCKET_CAT, obj = self.element, "Trying to read data");
118
119 if self.mapped_buffer.is_none() {
120 match self.buffer_pool.acquire_buffer(None) {
121 Ok(buffer) => {
122 self.mapped_buffer = Some(buffer.into_mapped_buffer_writable().unwrap());
123 }
124 Err(err) => {
125 gst::debug!(
126 SOCKET_CAT,
127 obj = self.element,
128 "Failed to acquire buffer {:?}",
129 err
130 );
131 return Err(SocketError::Gst(err));
132 }
133 }
134 }
135
136 match self
137 .reader
138 .read(self.mapped_buffer.as_mut().unwrap().as_mut_slice())
139 .await
140 {
141 Ok((len, saddr)) => {
142 let dts = if T::DO_TIMESTAMP {
143 let time = self.clock.as_ref().unwrap().time();
144 let running_time = time.opt_checked_sub(self.base_time).ok().flatten();
145 gst::debug!(
148 SOCKET_CAT,
149 obj = self.element,
150 "Read {} bytes at {} (clock {})",
151 len,
152 running_time.display(),
153 time.display(),
154 );
155 running_time
156 } else {
157 gst::debug!(SOCKET_CAT, obj = self.element, "Read {} bytes", len);
158 gst::ClockTime::NONE
159 };
160
161 let mut buffer = self.mapped_buffer.take().unwrap().into_buffer();
162 {
163 let buffer = buffer.get_mut().unwrap();
164 if len < buffer.size() {
165 buffer.set_size(len);
166 }
167 buffer.set_dts(dts);
168 }
169
170 Ok((buffer, saddr))
171 }
172 Err(err) => {
173 gst::debug!(SOCKET_CAT, obj = self.element, "Read error {:?}", err);
174
175 Err(SocketError::Io(err))
176 }
177 }
178 }
179}
180
181impl<T: SocketRead> Drop for Socket<T> {
182 fn drop(&mut self) {
183 if let Err(err) = self.buffer_pool.set_active(false) {
184 gst::error!(
185 SOCKET_CAT,
186 obj = self.element,
187 "Failed to unprepare socket: {}",
188 err
189 );
190 }
191 }
192}
193
194#[derive(Debug, Clone)]
201pub struct GioSocketWrapper {
202 socket: gio::Socket,
203}
204
205unsafe impl Send for GioSocketWrapper {}
206unsafe impl Sync for GioSocketWrapper {}
207
208impl GioSocketWrapper {
209 pub fn new(socket: &gio::Socket) -> Self {
210 Self {
211 socket: socket.clone(),
212 }
213 }
214
215 pub fn as_socket(&self) -> &gio::Socket {
216 &self.socket
217 }
218
219 #[cfg(any(
220 target_os = "macos",
221 target_os = "ios",
222 target_os = "freebsd",
223 target_os = "dragonfly",
224 target_os = "openbsd",
225 target_os = "netbsd",
226 target_os = "linux",
227 target_os = "android",
228 target_os = "aix",
229 target_os = "fuchsia",
230 target_os = "haiku",
231 target_env = "newlib"
232 ))]
233 pub fn set_tos(&self, qos_dscp: i32) -> rustix::io::Result<()> {
234 use std::os::fd::AsFd;
235
236 use gio::prelude::*;
237 use rustix::net::sockopt;
238
239 let tos = (qos_dscp & 0x3f) << 2;
240
241 let socket = self.as_socket();
242
243 sockopt::set_ip_tos(socket.as_fd(), tos as u8)?;
244
245 if socket.family() == gio::SocketFamily::Ipv6 {
246 sockopt::set_ipv6_tclass(socket.as_fd(), tos as u32)?;
247 }
248
249 Ok(())
250 }
251
252 #[cfg(not(any(
253 target_os = "macos",
254 target_os = "ios",
255 target_os = "freebsd",
256 target_os = "dragonfly",
257 target_os = "openbsd",
258 target_os = "netbsd",
259 target_os = "linux",
260 target_os = "android",
261 target_os = "aix",
262 target_os = "fuchsia",
263 target_os = "haiku",
264 target_env = "newlib"
265 )))]
266 pub fn set_tos(&self, _qos_dscp: i32) -> rustix::io::Result<()> {
267 Ok(())
268 }
269
270 #[cfg(not(windows))]
271 pub fn get<T: From<OwnedFd>>(&self) -> T {
272 use std::os::fd::AsFd;
273
274 let fd = self.socket.as_fd();
275 let fd = fd.try_clone_to_owned().unwrap();
276 T::from(fd)
277 }
278
279 #[cfg(windows)]
280 pub fn get<T: From<OwnedSocket>>(&self) -> T {
281 unsafe {
282 use std::os::windows::io::{AsRawSocket, BorrowedSocket};
283
284 let socket = self.socket.as_raw_socket();
285 let socket = BorrowedSocket::borrow_raw(socket);
286 let socket = socket.try_clone_to_owned().unwrap();
287 T::from(socket)
288 }
289 }
290}
291
292pub fn wrap_socket(socket: &Async<UdpSocket>) -> Result<GioSocketWrapper, gst::ErrorMessage> {
293 #[cfg(unix)]
294 {
295 use std::os::fd::AsFd;
296
297 let fd = socket.as_fd();
298 let fd = fd.try_clone_to_owned().unwrap();
299
300 let gio_socket = gio::Socket::from_fd(fd);
301 let gio_socket = gio_socket.map_err(|err| {
302 gst::error_msg!(
303 gst::ResourceError::OpenWrite,
304 ["Failed to create wrapped GIO socket: {}", err]
305 )
306 })?;
307
308 Ok(GioSocketWrapper::new(&gio_socket))
309 }
310 #[cfg(windows)]
311 unsafe {
312 use std::os::windows::io::{AsRawSocket, BorrowedSocket};
313
314 let socket = socket.as_raw_socket();
315 let socket = BorrowedSocket::borrow_raw(socket);
316 let socket = socket.try_clone_to_owned().unwrap();
317
318 let gio_socket = gio::Socket::from_socket(socket).map_err(|err| {
319 gst::error_msg!(
320 gst::ResourceError::OpenWrite,
321 ["Failed to create wrapped GIO socket: {}", err]
322 )
323 })?;
324
325 Ok(GioSocketWrapper::new(&gio_socket))
326 }
327}