linux_video_core/impls/
buffer.rs

1use crate::{
2    calls,
3    safe_ref::{Lock, Mut, Ref},
4    types::*,
5    ContentType, Direction, DirectionImpl, In, Internal, IsTimestamp, MethodImpl, Out, Result,
6};
7use core::{
8    marker::PhantomData,
9    mem::{ManuallyDrop, MaybeUninit},
10    num::NonZeroUsize,
11};
12use getset::CopyGetters;
13use std::{
14    collections::VecDeque,
15    os::unix::io::RawFd,
16    sync::atomic::{AtomicBool, Ordering},
17};
18
19impl Internal<RequestBuffers> {
20    /// Request buffers
21    pub fn request(fd: RawFd, type_: BufferType, memory: Memory, count: u32) -> Result<Self> {
22        let req_bufs = MaybeUninit::<RequestBuffers>::zeroed();
23
24        unsafe_call!({
25            let mut req_bufs = req_bufs.assume_init();
26            req_bufs.type_ = type_;
27            req_bufs.memory = memory;
28            req_bufs.count = count;
29            calls::req_bufs(fd, &mut req_bufs).map(|_| req_bufs.into())
30        })
31    }
32}
33
34impl Buffer {
35    /// Get timestamp
36    pub fn timestamp<T: IsTimestamp>(&self) -> T {
37        T::from_time_val(self.timestamp)
38    }
39
40    /// Set timestamp
41    pub fn set_timestamp<T: IsTimestamp>(&mut self, time: T) {
42        self.timestamp = time.into_time_val();
43    }
44
45    /// Buffer has time code
46    pub fn has_timecode(&self) -> bool {
47        self.flags.contains(BufferFlag::TimeCode)
48    }
49
50    /// Buffer time code
51    pub fn timecode(&self) -> Option<TimeCode> {
52        if self.has_timecode() {
53            Some(self.timecode)
54        } else {
55            None
56        }
57    }
58
59    /// Set time code
60    pub fn set_timecode(&mut self, timecode: Option<TimeCode>) {
61        if let Some(timecode) = timecode {
62            self.timecode = timecode;
63            self.flags |= BufferFlag::TimeCode;
64        } else {
65            self.timecode = unsafe { MaybeUninit::zeroed().assume_init() };
66            self.flags &= !BufferFlag::TimeCode;
67        }
68    }
69
70    /// Is buffer locked by driver
71    pub fn is_queued(&self) -> bool {
72        self.flags.contains(BufferFlag::Queued)
73    }
74}
75
/// Single-line, human-readable summary of a buffer.
///
/// Emits, in order: `#<sequence>`, ` @<index>`, the timestamp, the time code
/// (only when `has_timecode()` is true), the buffer type, the memory type,
/// `<bytes_used>/<length>`, the flags (only when `flags.is_none()` is false)
/// and the field (only when it differs from `Field::None`).
impl core::fmt::Display for Buffer {
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        // Every piece is written through the caller's formatter `f`.
        '#'.fmt(f)?;
        self.sequence.fmt(f)?;
        " @".fmt(f)?;
        self.index.fmt(f)?;
        ' '.fmt(f)?;
        self.timestamp.fmt(f)?;
        // the time code is printed only when its flag is set
        if self.has_timecode() {
            ' '.fmt(f)?;
            self.timecode.fmt(f)?;
        }
        ' '.fmt(f)?;
        self.type_.fmt(f)?;
        ' '.fmt(f)?;
        self.memory.fmt(f)?;
        // used bytes over total length
        ' '.fmt(f)?;
        self.bytes_used.fmt(f)?;
        '/'.fmt(f)?;
        self.length.fmt(f)?;
        // optional trailing parts
        if !self.flags.is_none() {
            ' '.fmt(f)?;
            self.flags.fmt(f)?;
        }
        if self.field != Field::None {
            ' '.fmt(f)?;
            self.field.fmt(f)?;
        }
        Ok(())
    }
}
107
108impl Internal<Buffer> {
109    /// Instantiate buffer
110    pub fn new(type_: BufferType, memory: Memory, index: u32) -> Self {
111        let buffer = MaybeUninit::<Buffer>::zeroed();
112        let mut buffer = unsafe { buffer.assume_init() };
113
114        buffer.type_ = type_;
115        buffer.memory = memory;
116        buffer.index = index;
117
118        buffer.into()
119    }
120
121    /// Query bufer by index
122    pub fn query(&mut self, fd: RawFd) -> Result<()> {
123        unsafe_call!(calls::query_buf(fd, self.as_mut()).map(|_| ()))
124    }
125
126    /// Queue buffer
127    pub fn queue(&mut self, fd: RawFd) -> Result<()> {
128        unsafe_call!(calls::q_buf(fd, self.as_mut()).map(|_| ()))
129    }
130
131    /// Dequeue buffer
132    pub fn dequeue(&mut self, fd: RawFd) -> Result<()> {
133        unsafe_call!(calls::dq_buf(fd, self.as_mut()).map(|_| ()))
134    }
135
136    pub fn mark_dequeued(&mut self) {
137        self.flags &= !BufferFlag::Queued;
138    }
139}
140
/// I/O method types
///
/// Implemented in this file by `Mmap` and `UserPtr`. The setup/teardown
/// behavior itself comes from the `MethodImpl` supertrait.
pub trait Method: MethodImpl {
    /// Corresponding memory type
    const MEMORY: Memory;
}
146
147/// Memory mapping
148#[derive(Debug, Clone, Copy)]
149pub struct Mmap;
150
151impl Method for Mmap {
152    const MEMORY: Memory = Memory::Mmap;
153}
154
155impl MethodImpl for Mmap {
156    fn init(buffer: &Buffer, fd: RawFd) -> Result<*mut u8> {
157        use nix::sys::mman::{mmap, MapFlags, ProtFlags};
158
159        unsafe_call!(mmap(
160            None,
161            NonZeroUsize::new(buffer.length as _).unwrap(),
162            ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
163            MapFlags::MAP_SHARED,
164            fd,
165            buffer.m.offset as _,
166        ))
167        .map(|pointer| pointer as _)
168    }
169
170    fn done(buffer: &Buffer, pointer: *mut u8) {
171        use nix::sys::mman::munmap;
172
173        let _ = unsafe_call!(munmap(pointer as *mut _, buffer.length as _));
174    }
175}
176
177/// Userspace pointer
178#[derive(Debug, Clone, Copy)]
179pub struct UserPtr;
180
181impl Method for UserPtr {
182    const MEMORY: Memory = Memory::UserPtr;
183}
184
185impl MethodImpl for UserPtr {
186    fn init(buffer: &Buffer, _fd: RawFd) -> Result<*mut u8> {
187        let mut buffer = Vec::<u8>::with_capacity(buffer.length as _);
188
189        let pointer = buffer.as_mut_ptr();
190
191        let _ = ManuallyDrop::new(buffer);
192
193        Ok(pointer)
194    }
195
196    #[allow(clippy::not_unsafe_ptr_arg_deref)]
197    fn done(buffer: &Buffer, pointer: *mut u8) {
198        let _ = unsafe { Vec::<u8>::from_raw_parts(pointer, 0, buffer.length as _) };
199    }
200
201    fn update(buffer: &mut Buffer, pointer: *mut u8) {
202        buffer.m.userptr = pointer as _;
203    }
204}
205
206struct BufferState<Met: Method> {
207    pointer: *mut u8,
208    buffer: Internal<Buffer>,
209    _phantom: PhantomData<Met>,
210}
211
212unsafe impl<Met: Method> Send for BufferState<Met> {}
213
214impl<Met: Method> core::ops::Deref for BufferState<Met> {
215    type Target = Buffer;
216
217    fn deref(&self) -> &Self::Target {
218        &self.buffer
219    }
220}
221
222impl<Met: Method> core::ops::DerefMut for BufferState<Met> {
223    fn deref_mut(&mut self) -> &mut Self::Target {
224        &mut self.buffer
225    }
226}
227
228impl<Met: Method> BufferState<Met> {
229    fn new(fd: RawFd, buffer: Internal<Buffer>) -> Result<Self> {
230        let pointer = Met::init(&buffer, fd)?;
231
232        let data = Self {
233            pointer,
234            buffer,
235            _phantom: PhantomData,
236        };
237
238        Ok(data)
239    }
240
241    fn enqueue(&mut self, fd: RawFd) -> Result<()> {
242        // update buffer data
243        Met::update(&mut self.buffer, self.pointer);
244        // add buffer to queue
245        self.buffer.queue(fd)
246    }
247
248    fn reuse(&mut self, buffer: Internal<Buffer>) {
249        self.buffer = buffer;
250    }
251
252    fn mark_dequeued(&mut self) {
253        self.buffer.mark_dequeued()
254    }
255}
256
257impl<Met: Method> Drop for BufferState<Met> {
258    fn drop(&mut self) {
259        Met::done(&self.buffer, self.pointer)
260    }
261}
262
/// State of a buffers queue for one direction (`Dir`) and I/O method (`Met`).
#[derive(CopyGetters)]
pub struct QueueData<Dir, Met: Method> {
    /// Requested buffers
    buffers: Vec<Ref<Mut<BufferState<Met>>>>,

    /// Dequeued buffers indexes
    ///
    /// Indexes refer to positions in `buffers`.
    dequeued: Mut<VecDeque<u32>>,

    /// Stream on flag
    on: AtomicBool,

    /// Buffers type
    #[getset(get_copy = "pub")]
    buffer_type: Internal<BufferType>,

    _phantom: PhantomData<Dir>,
}

impl<Dir, Met: Method> QueueData<Dir, Met> {
    /// Queue is empty
    ///
    /// True when the queue holds no buffers at all.
    pub fn is_empty(&self) -> bool {
        self.buffers.is_empty()
    }

    /// Get actual number of buffers
    pub fn len(&self) -> usize {
        self.buffers.len()
    }
}
292
293impl<Dir, Met: Method> Internal<QueueData<Dir, Met>> {
294    /// Create buffers queue
295    pub fn new(fd: RawFd, content_type: ContentType, count: u32) -> Result<Self>
296    where
297        Dir: Direction,
298    {
299        let buffer_type = Dir::buffer_type(content_type);
300
301        let request_buffers =
302            Internal::<RequestBuffers>::request(fd, buffer_type, Met::MEMORY, count)?;
303
304        let count = request_buffers.count;
305
306        let mut buffers = Vec::with_capacity(count as _);
307
308        for index in 0..count {
309            let mut buffer = Internal::<Buffer>::new(buffer_type, Met::MEMORY, index);
310            buffer.query(fd)?;
311            let data = BufferState::new(fd, buffer)?;
312
313            buffers.push(Ref::new(Mut::new(data)));
314        }
315
316        Ok(QueueData {
317            buffers,
318            dequeued: Mut::new(VecDeque::with_capacity(count as _)),
319            on: AtomicBool::new(false),
320            buffer_type: buffer_type.into(),
321            _phantom: PhantomData,
322        }
323        .into())
324    }
325
326    /// Delete buffers queue
327    pub fn del(&mut self, fd: RawFd) -> Result<()> {
328        self.off(fd)?;
329
330        let _request_buffers =
331            Internal::<RequestBuffers>::request(fd, *self.buffer_type, Met::MEMORY, 0)?;
332
333        Ok(())
334    }
335
    /// Is queue started
    ///
    /// Reads the flag that `on` sets and `off` clears.
    #[inline(always)]
    fn is_on(&self) -> bool {
        self.on.load(Ordering::SeqCst)
    }
341
342    /// Start stream
343    fn on(&self, fd: RawFd) -> Result<()> {
344        let type_ = *self.buffer_type.as_ref() as int;
345
346        unsafe_call!(calls::stream_on(fd, &type_).map(|_| ()))?;
347
348        self.on.store(true, Ordering::SeqCst);
349
350        Ok(())
351    }
352
353    /// Stop stream and mark all buffers as dequeued
354    fn off(&self, fd: RawFd) -> Result<()> {
355        let type_ = *self.buffer_type.as_ref() as int;
356
357        unsafe_call!(calls::stream_off(fd, &type_).map(|_| ()))?;
358
359        self.on.store(false, Ordering::SeqCst);
360
361        // stream_off removes all buffers from both queues
362        // and unlocks all buffers as a side effect
363        self.dequeue_queued();
364
365        Ok(())
366    }
367
368    /// Dequeue all buffers
369    fn dequeue_all(&self) {
370        for index in 0..self.buffers.len() {
371            let buffer_ref = &self.buffers[index];
372            if Ref::strong_count(buffer_ref) == 1 {
373                buffer_ref.lock().mark_dequeued();
374                self.dequeued.lock().push_back(index as _);
375            }
376        }
377    }
378
379    /// Dequeue queued buffers
380    fn dequeue_queued(&self) {
381        for index in 0..self.buffers.len() {
382            let buffer_ref = &self.buffers[index];
383            if Ref::strong_count(buffer_ref) == 1 {
384                let mut buffer_data = buffer_ref.lock();
385                if buffer_data.is_queued() {
386                    buffer_data.mark_dequeued();
387                    self.dequeued.lock().push_back(index as _);
388                }
389            }
390        }
391    }
392
393    /// Dequeue single unused buffer
394    fn dequeue_unused(&self) -> Option<BufferRef<Dir, Met>> {
395        for index in 0..self.buffers.len() {
396            let buffer_ref = &self.buffers[index];
397            if Ref::strong_count(buffer_ref) == 1 && !buffer_ref.lock().is_queued() {
398                self.dequeued.lock().push_back(index as _);
399                return Some(BufferRef::new(buffer_ref));
400            }
401        }
402        None
403    }
404
405    /// Enqueue ready dequeued buffers
406    fn enqueue_ready(&self, fd: RawFd) -> Result<()> {
407        // we need enqueue only first N buffers which is ready
408        // (already processed by user)
409        while let Some(first) = {
410            let dequeued = self.dequeued.lock();
411            dequeued.front().copied()
412        } {
413            let buffer_ref = &self.buffers[first as usize];
414            if Ref::strong_count(buffer_ref) == 1 {
415                let mut buffer_data = buffer_ref.lock();
416                buffer_data.enqueue(fd)?;
417                self.dequeued.lock().pop_front();
418            } else {
419                // stop on first not ready buffer to preserve sequence
420                break;
421            }
422        }
423
424        Ok(())
425    }
426
427    /// Try dequeue buffer
428    fn dequeue(&self, fd: RawFd) -> Result<BufferRef<Dir, Met>> {
429        let mut buffer = Internal::<Buffer>::new(*self.buffer_type, Met::MEMORY, 0);
430        buffer.dequeue(fd)?;
431        let index = buffer.index as usize;
432        let buffer_ref = &self.buffers[index];
433        if Ref::strong_count(buffer_ref) == 1 {
434            buffer_ref.lock().reuse(buffer);
435            self.dequeued.lock().push_back(index as _);
436            Ok(BufferRef::new(buffer_ref))
437        } else {
438            unreachable!();
439        }
440    }
441
    /// Get next buffer to read or write
    ///
    /// The actual strategy is delegated to `Dir::next`.
    pub fn next(&self, fd: RawFd) -> Result<BufferRef<Dir, Met>>
    where
        Dir: Direction,
    {
        Dir::next(self, fd)
    }
449}
450
451impl DirectionImpl for In {
452    fn next<Met: Method>(
453        queue: &Internal<QueueData<Self, Met>>,
454        fd: RawFd,
455    ) -> Result<BufferRef<Self, Met>> {
456        if queue.is_on() {
457            queue.enqueue_ready(fd)?;
458        } else {
459            queue.dequeue_all();
460            queue.enqueue_ready(fd)?;
461            queue.on(fd)?;
462        }
463        queue.dequeue(fd)
464    }
465}
466
467impl DirectionImpl for Out {
468    fn next<Met: Method>(
469        queue: &Internal<QueueData<Self, Met>>,
470        fd: RawFd,
471    ) -> Result<BufferRef<Self, Met>> {
472        queue.enqueue_ready(fd)?;
473        if queue.is_on() {
474            queue.dequeue(fd)
475        } else {
476            if let Some(buffer) = queue.dequeue_unused() {
477                return Ok(buffer);
478            }
479            queue.on(fd)?;
480            queue.dequeue(fd)
481        }
482    }
483}
484
485pub struct BufferRef<Dir, Met: Method> {
486    data: Ref<Mut<BufferState<Met>>>,
487    _phantom: PhantomData<Dir>,
488}
489
490impl<Dir, Met: Method> BufferRef<Dir, Met> {
491    #[inline(always)]
492    fn new(data: &Ref<Mut<BufferState<Met>>>) -> Self {
493        Self {
494            data: data.clone(),
495            _phantom: PhantomData,
496        }
497    }
498
499    /// Get access to buffer data
500    pub fn lock(&self) -> BufferData<'_, Dir, Met> {
501        BufferData {
502            data: self.data.lock(),
503            _phantom: PhantomData,
504        }
505    }
506
507    /// Try get access to buffer data
508    pub fn try_lock(&self) -> Option<BufferData<'_, Dir, Met>> {
509        Some(BufferData {
510            data: self.data.try_lock()?,
511            _phantom: PhantomData,
512        })
513    }
514}
515
516impl<Dir, Met: Method> core::fmt::Display for BufferRef<Dir, Met> {
517    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
518        self.data.lock().buffer.as_ref().fmt(f)
519    }
520}
521
/// Locked view of a buffer, obtained from `BufferRef::lock` or
/// `BufferRef::try_lock`.
pub struct BufferData<'r, Dir, Met: Method> {
    /// Lock over the buffer state
    data: Lock<'r, BufferState<Met>>,
    _phantom: PhantomData<Dir>,
}

/// Read access to the buffer description, available for any direction.
impl<'r, Dir, Met: Method> core::ops::Deref for BufferData<'r, Dir, Met> {
    type Target = Buffer;

    fn deref(&self) -> &Self::Target {
        &self.data.buffer
    }
}

/// Write access to the buffer description, available for `Out` only.
impl<'r, Met: Method> core::ops::DerefMut for BufferData<'r, Out, Met> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data.buffer
    }
}
540
541impl<'r, Dir, Met: Method> AsRef<[u8]> for BufferData<'r, Dir, Met> {
542    fn as_ref(&self) -> &[u8] {
543        unsafe { core::slice::from_raw_parts(self.data.pointer, self.len()) }
544    }
545}
546
547impl<'r, Met: Method> AsMut<[u8]> for BufferData<'r, Out, Met> {
548    fn as_mut(&mut self) -> &mut [u8] {
549        unsafe { core::slice::from_raw_parts_mut(self.data.pointer, self.len()) }
550    }
551}
552
553impl<'r, Met: Method> BufferData<'r, Out, Met> {
554    /// Set new size of buffer
555    ///
556    /// New size should be less than or equal to capacity.
557    /// If new size greater than capacity it will be set to be equal to capacity.
558    pub fn set_len(&mut self, len: usize) {
559        self.data.buffer.bytes_used = self.data.buffer.length.min(len as _);
560    }
561}
562
563impl<'r, Dir, Met: Method> BufferData<'r, Dir, Met> {
564    /// Check no used bytes in buffer
565    pub fn is_empty(&self) -> bool {
566        self.data.buffer.bytes_used == 0
567    }
568
569    /// Get used data of buffer in bytes
570    pub fn len(&self) -> usize {
571        self.data.buffer.bytes_used as _
572    }
573
574    /// Get available buffer capacity in bytes
575    pub fn capacity(&self) -> usize {
576        self.data.buffer.length as _
577    }
578}
579
580impl<'r, Dir, Met: Method> core::fmt::Display for BufferData<'r, Dir, Met> {
581    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
582        self.data.buffer.as_ref().fmt(f)
583    }
584}