use crate::{
    calls,
    safe_ref::{Lock, Mut, Ref},
    types::*,
    ContentType, Direction, DirectionImpl, In, Internal, IsTimestamp, MethodImpl, Out, Result,
};
use core::{
    marker::PhantomData,
    mem::{ManuallyDrop, MaybeUninit},
    num::NonZeroUsize,
};
use getset::CopyGetters;
use std::{
    collections::VecDeque,
    os::unix::io::RawFd,
    sync::atomic::{AtomicBool, Ordering},
};

impl Internal<RequestBuffers> {
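    /// Requests `count` buffers of the given type and memory kind from the
    /// driver (`VIDIOC_REQBUFS`).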
    pub fn request(fd: RawFd, type_: BufferType, memory: Memory, count: u32) -> Result<Self> {
        let req_bufs = MaybeUninit::<RequestBuffers>::zeroed();

        unsafe_call!({
            let mut req_bufs = req_bufs.assume_init();
            req_bufs.type_ = type_;
            req_bufs.memory = memory;
            req_bufs.count = count;
            calls::req_bufs(fd, &mut req_bufs).map(|_| req_bufs.into())
        })
    }
}

impl Buffer {
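    /// Gets the buffer timestamp converted into the requested representation.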
    pub fn timestamp<T: IsTimestamp>(&self) -> T {
        T::from_time_val(self.timestamp)
    }

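    /// Sets the buffer timestamp.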
    pub fn set_timestamp<T: IsTimestamp>(&mut self, time: T) {
        self.timestamp = time.into_time_val();
    }

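    /// Checks whether the buffer has a timecode.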
    pub fn has_timecode(&self) -> bool {
        self.flags.contains(BufferFlag::TimeCode)
    }

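    /// Gets the timecode, if present.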
    pub fn timecode(&self) -> Option<TimeCode> {
        if self.has_timecode() {
            Some(self.timecode)
        } else {
            None
        }
    }

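    /// Sets or clears the timecode, updating the corresponding flag.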
    pub fn set_timecode(&mut self, timecode: Option<TimeCode>) {
        if let Some(timecode) = timecode {
            self.timecode = timecode;
            self.flags |= BufferFlag::TimeCode;
        } else {
            self.timecode = unsafe { MaybeUninit::zeroed().assume_init() };
            self.flags &= !BufferFlag::TimeCode;
        }
    }

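    /// Checks whether the buffer is currently queued to the driver.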
    pub fn is_queued(&self) -> bool {
        self.flags.contains(BufferFlag::Queued)
    }
}

impl core::fmt::Display for Buffer {
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        '#'.fmt(f)?;
        self.sequence.fmt(f)?;
        " @".fmt(f)?;
        self.index.fmt(f)?;
        ' '.fmt(f)?;
        self.timestamp.fmt(f)?;
        if self.has_timecode() {
            ' '.fmt(f)?;
            self.timecode.fmt(f)?;
        }
        ' '.fmt(f)?;
        self.type_.fmt(f)?;
        ' '.fmt(f)?;
        self.memory.fmt(f)?;
        ' '.fmt(f)?;
        self.bytes_used.fmt(f)?;
        '/'.fmt(f)?;
        self.length.fmt(f)?;
        if !self.flags.is_none() {
            ' '.fmt(f)?;
            self.flags.fmt(f)?;
        }
        if self.field != Field::None {
            ' '.fmt(f)?;
            self.field.fmt(f)?;
        }
        Ok(())
    }
}

impl Internal<Buffer> {
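    /// Creates a zeroed buffer descriptor with the given type, memory kind and index.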
    pub fn new(type_: BufferType, memory: Memory, index: u32) -> Self {
        let buffer = MaybeUninit::<Buffer>::zeroed();
        let mut buffer = unsafe { buffer.assume_init() };

        buffer.type_ = type_;
        buffer.memory = memory;
        buffer.index = index;

        buffer.into()
    }

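    /// Queries the buffer state from the driver (`VIDIOC_QUERYBUF`).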
    pub fn query(&mut self, fd: RawFd) -> Result<()> {
        unsafe_call!(calls::query_buf(fd, self.as_mut()).map(|_| ()))
    }

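    /// Queues the buffer for I/O (`VIDIOC_QBUF`).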
    pub fn queue(&mut self, fd: RawFd) -> Result<()> {
        unsafe_call!(calls::q_buf(fd, self.as_mut()).map(|_| ()))
    }

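    /// Dequeues a finished buffer from the driver (`VIDIOC_DQBUF`).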
    pub fn dequeue(&mut self, fd: RawFd) -> Result<()> {
        unsafe_call!(calls::dq_buf(fd, self.as_mut()).map(|_| ()))
    }

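    /// Clears the queued flag locally, without calling into the driver.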
    pub fn mark_dequeued(&mut self) {
        self.flags &= !BufferFlag::Queued;
    }
}

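/// A buffer I/O method, determining how buffer memory is shared with the driver.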
pub trait Method: MethodImpl {
    /// The memory kind used by this I/O method.
    const MEMORY: Memory;
}

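/// Driver-allocated buffers mapped into userspace via `mmap`.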
#[derive(Debug, Clone, Copy)]
pub struct Mmap;

impl Method for Mmap {
    const MEMORY: Memory = Memory::Mmap;
}

impl MethodImpl for Mmap {
    fn init(buffer: &Buffer, fd: RawFd) -> Result<*mut u8> {
        use nix::sys::mman::{mmap, MapFlags, ProtFlags};

        // Map the driver-owned buffer memory into this process.
        unsafe_call!(mmap(
            None,
            NonZeroUsize::new(buffer.length as _).unwrap(),
            ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
            MapFlags::MAP_SHARED,
            fd,
            buffer.m.offset as _,
        ))
        .map(|pointer| pointer as _)
    }

    fn done(buffer: &Buffer, pointer: *mut u8) {
        use nix::sys::mman::munmap;

        let _ = unsafe_call!(munmap(pointer as *mut _, buffer.length as _));
    }
}

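/// Userspace-allocated buffers handed to the driver by raw pointer.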
#[derive(Debug, Clone, Copy)]
pub struct UserPtr;

impl Method for UserPtr {
    const MEMORY: Memory = Memory::UserPtr;
}

impl MethodImpl for UserPtr {
    fn init(buffer: &Buffer, _fd: RawFd) -> Result<*mut u8> {
        // Allocate the backing storage and leak it; `done` reclaims it.
        let mut buffer = Vec::<u8>::with_capacity(buffer.length as _);

        let pointer = buffer.as_mut_ptr();

        let _ = ManuallyDrop::new(buffer);

        Ok(pointer)
    }

    #[allow(clippy::not_unsafe_ptr_arg_deref)]
    fn done(buffer: &Buffer, pointer: *mut u8) {
        // Rebuild the Vec from the leaked pointer so it is dropped here.
        let _ = unsafe { Vec::<u8>::from_raw_parts(pointer, 0, buffer.length as _) };
    }

    fn update(buffer: &mut Buffer, pointer: *mut u8) {
        buffer.m.userptr = pointer as _;
    }
}

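/// Per-buffer state: the kernel-side descriptor together with the userspace
/// pointer managed by the I/O method `Met`.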
struct BufferState<Met: Method> {
    pointer: *mut u8,
    buffer: Internal<Buffer>,
    _phantom: PhantomData<Met>,
}

unsafe impl<Met: Method> Send for BufferState<Met> {}

impl<Met: Method> core::ops::Deref for BufferState<Met> {
    type Target = Buffer;

    fn deref(&self) -> &Self::Target {
        &self.buffer
    }
}

impl<Met: Method> core::ops::DerefMut for BufferState<Met> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.buffer
    }
}

impl<Met: Method> BufferState<Met> {
    fn new(fd: RawFd, buffer: Internal<Buffer>) -> Result<Self> {
        let pointer = Met::init(&buffer, fd)?;

        let data = Self {
            pointer,
            buffer,
            _phantom: PhantomData,
        };

        Ok(data)
    }

    fn enqueue(&mut self, fd: RawFd) -> Result<()> {
        // Let the I/O method update the descriptor (e.g. the userptr) before queueing.
        Met::update(&mut self.buffer, self.pointer);
        self.buffer.queue(fd)
    }

    fn reuse(&mut self, buffer: Internal<Buffer>) {
        self.buffer = buffer;
    }

    fn mark_dequeued(&mut self) {
        self.buffer.mark_dequeued()
    }
}

impl<Met: Method> Drop for BufferState<Met> {
    fn drop(&mut self) {
        Met::done(&self.buffer, self.pointer)
    }
}

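/// Queue state: the buffer pool, the indices of dequeued buffers and the
/// streaming flag.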
#[derive(CopyGetters)]
pub struct QueueData<Dir, Met: Method> {
    /// The buffer pool.
    buffers: Vec<Ref<Mut<BufferState<Met>>>>,

    /// Indices of buffers that are dequeued and owned by userspace.
    dequeued: Mut<VecDeque<u32>>,

    /// Whether streaming is currently on.
    on: AtomicBool,

    /// The buffer type of this queue.
    #[getset(get_copy = "pub")]
    buffer_type: Internal<BufferType>,

    _phantom: PhantomData<Dir>,
}

impl<Dir, Met: Method> QueueData<Dir, Met> {
    /// Checks whether the queue has no buffers.
    pub fn is_empty(&self) -> bool {
        self.buffers.is_empty()
    }

    /// Gets the number of buffers in the queue.
    pub fn len(&self) -> usize {
        self.buffers.len()
    }
}

impl<Dir, Met: Method> Internal<QueueData<Dir, Met>> {
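    /// Creates a queue by requesting buffers from the driver, then querying
    /// and initializing each of them.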
    pub fn new(fd: RawFd, content_type: ContentType, count: u32) -> Result<Self>
    where
        Dir: Direction,
    {
        let buffer_type = Dir::buffer_type(content_type);

        let request_buffers =
            Internal::<RequestBuffers>::request(fd, buffer_type, Met::MEMORY, count)?;

        // The driver may grant a different number of buffers than requested.
        let count = request_buffers.count;

        let mut buffers = Vec::with_capacity(count as _);

        for index in 0..count {
            let mut buffer = Internal::<Buffer>::new(buffer_type, Met::MEMORY, index);
            buffer.query(fd)?;
            let data = BufferState::new(fd, buffer)?;

            buffers.push(Ref::new(Mut::new(data)));
        }

        Ok(QueueData {
            buffers,
            dequeued: Mut::new(VecDeque::with_capacity(count as _)),
            on: AtomicBool::new(false),
            buffer_type: buffer_type.into(),
            _phantom: PhantomData,
        }
        .into())
    }

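    /// Stops streaming and releases the driver buffers by requesting a count
    /// of zero.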
    pub fn del(&mut self, fd: RawFd) -> Result<()> {
        self.off(fd)?;

        let _request_buffers =
            Internal::<RequestBuffers>::request(fd, *self.buffer_type, Met::MEMORY, 0)?;

        Ok(())
    }

    #[inline(always)]
    fn is_on(&self) -> bool {
        self.on.load(Ordering::SeqCst)
    }

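    /// Starts streaming (`VIDIOC_STREAMON`).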
    fn on(&self, fd: RawFd) -> Result<()> {
        let type_ = *self.buffer_type.as_ref() as i32;

        unsafe_call!(calls::stream_on(fd, &type_).map(|_| ()))?;

        self.on.store(true, Ordering::SeqCst);

        Ok(())
    }

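    /// Stops streaming (`VIDIOC_STREAMOFF`) and reclaims the buffers that
    /// were queued.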
    fn off(&self, fd: RawFd) -> Result<()> {
        let type_ = *self.buffer_type.as_ref() as i32;

        unsafe_call!(calls::stream_off(fd, &type_).map(|_| ()))?;

        self.on.store(false, Ordering::SeqCst);

        // Stopping the stream returns all queued buffers to userspace.
        self.dequeue_queued();

        Ok(())
    }

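    /// Marks every buffer that is not referenced elsewhere as dequeued and
    /// records its index.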
    fn dequeue_all(&self) {
        for index in 0..self.buffers.len() {
            let buffer_ref = &self.buffers[index];
            if Ref::strong_count(buffer_ref) == 1 {
                buffer_ref.lock().mark_dequeued();
                self.dequeued.lock().push_back(index as _);
            }
        }
    }

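    /// Marks queued buffers that are not referenced elsewhere as dequeued and
    /// records their indices.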
    fn dequeue_queued(&self) {
        for index in 0..self.buffers.len() {
            let buffer_ref = &self.buffers[index];
            if Ref::strong_count(buffer_ref) == 1 {
                let mut buffer_data = buffer_ref.lock();
                if buffer_data.is_queued() {
                    buffer_data.mark_dequeued();
                    self.dequeued.lock().push_back(index as _);
                }
            }
        }
    }

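    /// Hands out a buffer that is neither queued nor referenced elsewhere,
    /// if any.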
    fn dequeue_unused(&self) -> Option<BufferRef<Dir, Met>> {
        for index in 0..self.buffers.len() {
            let buffer_ref = &self.buffers[index];
            if Ref::strong_count(buffer_ref) == 1 && !buffer_ref.lock().is_queued() {
                self.dequeued.lock().push_back(index as _);
                return Some(BufferRef::new(buffer_ref));
            }
        }
        None
    }

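    /// Queues dequeued buffers back to the driver, stopping at the first one
    /// still referenced by a user.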
    fn enqueue_ready(&self, fd: RawFd) -> Result<()> {
        // Peek the front index in its own scope so the lock is not held
        // across the queueing call below.
        while let Some(first) = {
            let dequeued = self.dequeued.lock();
            dequeued.front().copied()
        } {
            let buffer_ref = &self.buffers[first as usize];
            if Ref::strong_count(buffer_ref) == 1 {
                let mut buffer_data = buffer_ref.lock();
                buffer_data.enqueue(fd)?;
                self.dequeued.lock().pop_front();
            } else {
                // This buffer is still in use; stop and retry later.
                break;
            }
        }

        Ok(())
    }

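    /// Dequeues the next finished buffer from the driver and returns a
    /// reference to it.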
    fn dequeue(&self, fd: RawFd) -> Result<BufferRef<Dir, Met>> {
        let mut buffer = Internal::<Buffer>::new(*self.buffer_type, Met::MEMORY, 0);
        buffer.dequeue(fd)?;
        let index = buffer.index as usize;
        let buffer_ref = &self.buffers[index];
        if Ref::strong_count(buffer_ref) == 1 {
            buffer_ref.lock().reuse(buffer);
            self.dequeued.lock().push_back(index as _);
            Ok(BufferRef::new(buffer_ref))
        } else {
            // A buffer handed back by the driver cannot still be borrowed by a user.
            unreachable!();
        }
    }

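    /// Gets the next buffer to process, according to the queue direction.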
    pub fn next(&self, fd: RawFd) -> Result<BufferRef<Dir, Met>>
    where
        Dir: Direction,
    {
        Dir::next(self, fd)
    }
}

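/// For the input direction the queue keeps the driver fed: free buffers are
/// (re)queued, streaming is started on first use, and a filled buffer is
/// dequeued.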
impl DirectionImpl for In {
    fn next<Met: Method>(
        queue: &Internal<QueueData<Self, Met>>,
        fd: RawFd,
    ) -> Result<BufferRef<Self, Met>> {
        if queue.is_on() {
            queue.enqueue_ready(fd)?;
        } else {
            // First use: reclaim all free buffers, queue them, then start streaming.
            queue.dequeue_all();
            queue.enqueue_ready(fd)?;
            queue.on(fd)?;
        }
        queue.dequeue(fd)
    }
}

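/// For the output direction prepared buffers are submitted first; streaming
/// only starts once no unused buffer is left to hand out.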
impl DirectionImpl for Out {
    fn next<Met: Method>(
        queue: &Internal<QueueData<Self, Met>>,
        fd: RawFd,
    ) -> Result<BufferRef<Self, Met>> {
        queue.enqueue_ready(fd)?;
        if queue.is_on() {
            queue.dequeue(fd)
        } else if let Some(buffer) = queue.dequeue_unused() {
            Ok(buffer)
        } else {
            queue.on(fd)?;
            queue.dequeue(fd)
        }
    }
}

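/// A reference-counted handle to a buffer of a queue.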
pub struct BufferRef<Dir, Met: Method> {
    data: Ref<Mut<BufferState<Met>>>,
    _phantom: PhantomData<Dir>,
}

impl<Dir, Met: Method> BufferRef<Dir, Met> {
    #[inline(always)]
    fn new(data: &Ref<Mut<BufferState<Met>>>) -> Self {
        Self {
            data: data.clone(),
            _phantom: PhantomData,
        }
    }

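    /// Locks the buffer data, blocking until it is available.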
    pub fn lock(&self) -> BufferData<'_, Dir, Met> {
        BufferData {
            data: self.data.lock(),
            _phantom: PhantomData,
        }
    }

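    /// Attempts to lock the buffer data without blocking.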
    pub fn try_lock(&self) -> Option<BufferData<'_, Dir, Met>> {
        Some(BufferData {
            data: self.data.try_lock()?,
            _phantom: PhantomData,
        })
    }
}

impl<Dir, Met: Method> core::fmt::Display for BufferRef<Dir, Met> {
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        self.data.lock().buffer.as_ref().fmt(f)
    }
}

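/// A locked view into a buffer's bytes and metadata.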
pub struct BufferData<'r, Dir, Met: Method> {
    data: Lock<'r, BufferState<Met>>,
    _phantom: PhantomData<Dir>,
}

impl<'r, Dir, Met: Method> core::ops::Deref for BufferData<'r, Dir, Met> {
    type Target = Buffer;

    fn deref(&self) -> &Self::Target {
        &self.data.buffer
    }
}

impl<'r, Met: Method> core::ops::DerefMut for BufferData<'r, Out, Met> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data.buffer
    }
}

impl<'r, Dir, Met: Method> AsRef<[u8]> for BufferData<'r, Dir, Met> {
    fn as_ref(&self) -> &[u8] {
        unsafe { core::slice::from_raw_parts(self.data.pointer, self.len()) }
    }
}

impl<'r, Met: Method> AsMut<[u8]> for BufferData<'r, Out, Met> {
    fn as_mut(&mut self) -> &mut [u8] {
        unsafe { core::slice::from_raw_parts_mut(self.data.pointer, self.len()) }
    }
}

impl<'r, Met: Method> BufferData<'r, Out, Met> {
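    /// Sets the number of used bytes, clamped to the buffer capacity.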
    pub fn set_len(&mut self, len: usize) {
        self.data.buffer.bytes_used = self.data.buffer.length.min(len as _);
    }
}

impl<'r, Dir, Met: Method> BufferData<'r, Dir, Met> {
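    /// Checks whether the buffer contains no used bytes.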
    pub fn is_empty(&self) -> bool {
        self.data.buffer.bytes_used == 0
    }

    /// Gets the number of used bytes.
    pub fn len(&self) -> usize {
        self.data.buffer.bytes_used as _
    }

    /// Gets the buffer capacity in bytes.
    pub fn capacity(&self) -> usize {
        self.data.buffer.length as _
    }
}

impl<'r, Dir, Met: Method> core::fmt::Display for BufferData<'r, Dir, Met> {
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        self.data.buffer.as_ref().fmt(f)
    }
}