libpulse_tokio/
lib.rs

1use libc::timeval;
2use libpulse_binding::context::{self, Context};
3use libpulse_binding::def::{Retval, RetvalActual};
4use libpulse_binding::mainloop::api::DeferEventCb;
5use libpulse_binding::mainloop::api::DeferEventDestroyCb;
6use libpulse_binding::mainloop::api::IoEventCb;
7use libpulse_binding::mainloop::api::IoEventDestroyCb;
8use libpulse_binding::mainloop::api::Mainloop as MainloopTrait;
9use libpulse_binding::mainloop::api::MainloopApi;
10use libpulse_binding::mainloop::api::TimeEventCb;
11use libpulse_binding::mainloop::api::TimeEventDestroyCb;
12use libpulse_binding::mainloop::api::{MainloopInnerType, MainloopInternalType};
13use libpulse_binding::mainloop::events::deferred::DeferEventInternal;
14use libpulse_binding::mainloop::events::io::FlagSet as IoEventFlagSet;
15use libpulse_binding::mainloop::events::io::IoEventInternal;
16use libpulse_binding::mainloop::events::timer::TimeEventInternal;
17use std::cell::{Cell, UnsafeCell};
18use std::fmt;
19use std::future::{Future,poll_fn};
20use std::os::raw::c_void;
21use std::os::unix::io::{AsRawFd, RawFd};
22use std::pin::Pin;
23use std::rc::{Rc, Weak};
24use std::task;
25use std::time::{Duration, SystemTime, UNIX_EPOCH};
26use tokio::io::unix::AsyncFd;
27
/// Thin wrapper around a raw file descriptor so it can be registered with tokio's `AsyncFd`.
///
/// The wrapper defines no `Drop` behaviour of its own, so dropping it leaves the descriptor
/// open.
struct Fd(RawFd);

impl AsRawFd for Fd {
    /// Returns the wrapped descriptor unchanged.
    fn as_raw_fd(&self) -> RawFd {
        let Fd(raw) = *self;
        raw
    }
}
35
/// One event source registered through the `MainloopApi` callbacks.
///
/// Items are boxed and the address of the box is what the C side receives as its opaque event
/// pointer.  Several fields are wrapped in `Cell` so that they can be updated through a shared
/// reference.
enum Item {
    /// A deferred event: `tick` calls `cb` on each pass while `enabled` is set.
    Defer {
        /// Back-reference used by `defer_enable` to wake the mainloop task.
        main: Weak<MainInner>,
        /// Set by `defer_free`; the item is removed the next time the list is walked.
        dead: bool,
        /// Whether `tick` should invoke `cb`; new events start enabled.
        enabled: bool,
        /// Callback supplied by pulse when the event was created.
        cb: Option<DeferEventCb>,
        /// Opaque pointer passed back to `cb` and `free`.
        userdata: *mut c_void,
        /// Destroy callback, run once when the dead item is removed from the list.
        free: Option<DeferEventDestroyCb>,
    },
    /// A one-shot timer event.
    Timer {
        /// Back-reference used by `time_restart` to wake the mainloop task.
        main: Weak<MainInner>,
        /// Set by `time_free`; the item is removed the next time the list is walked.
        dead: bool,
        /// Expiry time as a duration since the UNIX epoch (it is compared against
        /// `SystemTime::now()` in `tick`); `None` once the timer has fired and has not been
        /// restarted.
        ts: Cell<Option<Duration>>,
        /// Callback supplied by pulse when the event was created.
        cb: Option<TimeEventCb>,
        /// Opaque pointer passed back to `cb` and `free`.
        userdata: *mut c_void,
        /// Destroy callback, run once when the dead item is removed from the list.
        free: Option<TimeEventDestroyCb>,
    },
    /// An I/O readiness event on a file descriptor.
    Event {
        /// Back-reference used by `io_enable` to wake the mainloop task.
        main: Weak<MainInner>,
        /// Set by `io_free`; kept in a `Cell` because `tick` re-checks it right after
        /// invoking `cb`.
        dead: Cell<bool>,
        /// The descriptor to watch, as passed to `io_new`.
        fd: i32,
        /// tokio registration for `fd`; created lazily in `tick` and cleared by `io_free`.
        afd: Cell<Option<AsyncFd<Fd>>>,
        /// Callback supplied by pulse when the event was created.
        cb: Option<IoEventCb>,
        /// The set of events pulse currently wants to hear about; updated by `io_enable`.
        events: Cell<IoEventFlagSet>,
        /// Opaque pointer passed back to `cb` and `free`.
        userdata: *mut c_void,
        /// Destroy callback, run once when the dead item is removed from the list.
        free: Option<IoEventDestroyCb>,
    },
}
64
65impl Item {
66    fn is_dead(&self) -> bool {
67        match self {
68            Item::Defer { dead, .. } | Item::Timer { dead, .. } => *dead,
69            Item::Event { dead, .. } => dead.get(),
70        }
71    }
72
73    fn kill(&mut self) {
74        match self {
75            Item::Defer { dead, .. } | Item::Timer { dead, .. } => {
76                *dead = true;
77            }
78            Item::Event { .. } => unreachable!(),
79        }
80    }
81}
82
/// An implementation of the [pulse](libpulse_binding) [Mainloop](MainloopTrait) trait that
/// dispatches through tokio.
///
/// Callbacks registered by pulse are only invoked from [`TokioMain::tick`], either directly or
/// through [`TokioMain::run`] / [`TokioMain::wait_for_ready`], so one of those must be polled
/// for pulse to make progress.
#[derive(Debug)]
pub struct TokioMain {
    /// Shared mainloop state; clones of this `Rc` are handed out by `MainloopTrait::inner`.
    mi: Rc<MainInner>,
}
89
/// The state structure passed to pulse.
pub struct MainInner {
    /// Callback table handed to pulse.  Its `userdata` field is the pointer that `from_api`
    /// turns back into a reference to this struct.
    api: MainloopApi,
    /// Note: each item is a `Box` owned by this vector, but it is stored as a raw pointer
    /// because the same address is also held by the C side and handed out as a reference by
    /// iter_get_item.
    items: UnsafeCell<Vec<*mut Item>>,
    /// Note: only allow access following the rules for Pin
    sleep: UnsafeCell<Option<tokio::time::Sleep>>,
    /// Waker of the task that last called `tick`; taken and woken by `wake_real`.
    waker: Cell<Option<task::Waker>>,
    /// Return value stored by the `quit` callback until `tick` picks it up.
    quit: Cell<Option<RetvalActual>>,
}
101
102impl fmt::Debug for MainInner {
103    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
104        write!(fmt, "MainInner")
105    }
106}
107
108impl MainloopTrait for TokioMain {
109    type MI = MainInner;
110    fn inner(&self) -> Rc<MainInner> {
111        self.mi.clone()
112    }
113}
114
// Marker impl with no methods.  Presumably required because `MainloopInnerType for MainInner`
// names `Self` as its internal type (`type I = Self`) -- confirm against the trait bounds.
impl MainloopInternalType for MainInner {}
116
117impl MainloopInnerType for MainInner {
118    type I = Self;
119    fn get_ptr(&self) -> *mut Self {
120        panic!("This function is not well-defined and is never called")
121    }
122
123    fn get_api_ptr(&self) -> *const MainloopApi {
124        &self.api
125    }
126
127    fn get_api(&self) -> &MainloopApi {
128        &self.api
129    }
130
131    fn supports_rtclock(&self) -> bool {
132        false
133    }
134}
135
136impl Drop for MainInner {
137    fn drop(&mut self) {
138        unsafe {
139            // drop the cyclic weak reference set up by TokioMain::new
140            Weak::from_raw(self.api.userdata as *mut MainInner);
141            // drop any remaining items (they should all be dead anyway)
142            for item in self.items.get_mut().drain(..) {
143                drop(Box::from_raw(item));
144            }
145        }
146    }
147}
148
149impl TokioMain {
150    pub fn new() -> Self {
151        let mut mi = Rc::new(MainInner {
152            api: MainloopApi {
153                userdata: 0 as *mut _,
154                io_new: Some(MainInner::io_new),
155                io_enable: Some(MainInner::io_enable),
156                io_free: Some(MainInner::io_free),
157                io_set_destroy: Some(MainInner::io_set_destroy),
158                time_new: Some(MainInner::time_new),
159                time_restart: Some(MainInner::time_restart),
160                time_free: Some(MainInner::time_free),
161                time_set_destroy: Some(MainInner::time_set_destroy),
162                defer_new: Some(MainInner::defer_new),
163                defer_enable: Some(MainInner::defer_enable),
164                defer_free: Some(MainInner::defer_free),
165                defer_set_destroy: Some(MainInner::defer_set_destroy),
166                quit: Some(MainInner::quit),
167            },
168            items: UnsafeCell::new(Vec::new()),
169            sleep: UnsafeCell::new(None),
170            waker: Cell::new(None),
171            quit: Cell::new(None),
172        });
173        let v = Rc::get_mut(&mut mi).unwrap();
174        v.api.userdata = v as *mut MainInner as *mut _;
175        let _cyclic = Rc::downgrade(&mi).into_raw();
176        TokioMain { mi }
177    }
178
179    fn iter_get_item(&mut self, i: usize) -> Option<(&MainloopApi, &Item)> {
180        let api = &self.mi.api;
181        let items = unsafe { &mut *self.mi.items.get() };
182        loop {
183            if i >= items.len() {
184                return None;
185            }
186            if unsafe { (*items[i]).is_dead() } {
187                let mut dead = unsafe { Box::from_raw(items.swap_remove(i)) };
188                match &*dead {
189                    &Item::Defer {
190                        free: Some(cb),
191                        userdata,
192                        ..
193                    } => {
194                        let raw_item = &mut *dead as *mut Item;
195                        cb(api, raw_item as *mut _, userdata);
196                    }
197                    &Item::Timer {
198                        free: Some(cb),
199                        userdata,
200                        ..
201                    } => {
202                        let raw_item = &mut *dead as *mut Item;
203                        cb(api, raw_item as *mut _, userdata);
204                    }
205                    &Item::Event {
206                        free: Some(cb),
207                        userdata,
208                        ..
209                    } => {
210                        let raw_item = &mut *dead as *mut Item;
211                        cb(api, raw_item as *mut _, userdata);
212                    }
213                    _ => {}
214                }
215                drop(dead);
216                continue;
217            }
218            let item = unsafe { &*items[i] };
219            return Some((api, item));
220        }
221    }
222
223    /// Run callbacks and register wakers for the pulse mainloop.
224    ///
225    /// This returns Ready if a callback was invoked, or Pending if everything is waiting on timers
226    /// or I/O.  The async run() or wait_for_ready() functions call this function internally.
227    pub fn tick(&mut self, ctx: &mut task::Context) -> task::Poll<Option<Retval>> {
228        let inow = tokio::time::Instant::now();
229        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
230        let mut wake = None::<Duration>;
231        let mut rv = task::Poll::Pending;
232        let mut i = 0;
233        self.mi.waker.set(Some(ctx.waker().clone()));
234        while let Some((api, item)) = self.iter_get_item(i) {
235            let raw_item = item as *const Item;
236            i += 1;
237            match &*item {
238                &Item::Defer {
239                    enabled: true,
240                    cb: Some(cb),
241                    userdata,
242                    ..
243                } => {
244                    cb(api, raw_item as *mut _, userdata);
245                }
246                &Item::Defer { .. } => continue,
247                &Item::Timer { cb: None, .. } => continue,
248                &Item::Timer {
249                    cb: Some(cb),
250                    userdata,
251                    ref ts,
252                    ..
253                } => {
254                    match ts.replace(None) {
255                        Some(ts) if ts < now => {
256                            rv = task::Poll::Ready(None);
257                            let tv = timeval {
258                                tv_sec: ts.as_secs() as i64,
259                                tv_usec: ts.subsec_micros() as i64,
260                            };
261                            cb(api, raw_item as *mut _, &tv, userdata);
262                        }
263                        later => ts.set(later),
264                    }
265
266                    if let Some(ts) = ts.get() {
267                        if wake.is_some() {
268                            wake = std::cmp::min(wake, Some(ts));
269                        } else {
270                            wake = Some(ts);
271                        }
272                    }
273                }
274                &Item::Event { cb: None, .. } => continue,
275                &Item::Event {
276                    cb: Some(cb),
277                    userdata,
278                    fd,
279                    ref afd,
280                    ref events,
281                    ref dead,
282                    ..
283                } => {
284                    // Operate on a local, mutable version of the AsyncFd and restore it later
285                    let mut local_fd = afd.take();
286
287                    // this creation of AsyncFd is a bit lazy to allow deletions to happen
288                    // first, since tokio may reject attempts to add duplicate FDs
289                    let async_fd = local_fd
290                        .get_or_insert_with(|| AsyncFd::new(Fd(fd)).expect("Pulse fed a bad FD"));
291                    let mut ready = IoEventFlagSet::NULL;
292                    let mut rg = None;
293                    let mut wg = None;
294                    if events.get().contains(IoEventFlagSet::INPUT) {
295                        match async_fd.poll_read_ready(ctx) {
296                            task::Poll::Ready(Ok(g)) => {
297                                ready |= IoEventFlagSet::INPUT;
298                                rg = Some(g);
299                            }
300                            task::Poll::Ready(Err(_)) => ready |= IoEventFlagSet::ERROR,
301                            task::Poll::Pending => {}
302                        }
303                    }
304                    if events.get().contains(IoEventFlagSet::OUTPUT) {
305                        match async_fd.poll_write_ready(ctx) {
306                            task::Poll::Ready(Ok(g)) => {
307                                ready |= IoEventFlagSet::OUTPUT;
308                                wg = Some(g);
309                            }
310                            task::Poll::Ready(Err(_)) => ready |= IoEventFlagSet::ERROR,
311                            task::Poll::Pending => {}
312                        }
313                    }
314                    if ready == IoEventFlagSet::NULL {
315                        afd.set(local_fd);
316                        continue;
317                    }
318
319                    rv = task::Poll::Ready(None);
320                    cb(api, raw_item as *mut _, fd, ready, userdata);
321                    if dead.get() {
322                        // don't restore afd if it was supposed to be dead
323                        continue;
324                    }
325                    let wants = events.get();
326                    if wants.intersects(ready) {
327                        // pulse still wants an event that was reported as ready.  We might
328                        // need to inform tokio that the FD is not ready
329                        let mut pfd = libc::pollfd {
330                            fd: fd,
331                            events: 0,
332                            revents: 0,
333                        };
334                        if wants.contains(IoEventFlagSet::INPUT) && rg.is_some() {
335                            pfd.events |= libc::POLLIN;
336                        }
337                        if wants.contains(IoEventFlagSet::OUTPUT) && wg.is_some() {
338                            pfd.events |= libc::POLLOUT;
339                        }
340                        unsafe {
341                            libc::poll(&mut pfd, 1, 0);
342                        }
343                        if let Some(mut g) = rg {
344                            if (pfd.revents & libc::POLLIN) != 0 {
345                                g.retain_ready();
346                            } else {
347                                g.clear_ready();
348                            }
349                        }
350                        if let Some(mut g) = wg {
351                            if (pfd.revents & libc::POLLOUT) != 0 {
352                                g.retain_ready();
353                            } else {
354                                g.clear_ready();
355                            }
356                        }
357                    }
358                    afd.set(local_fd);
359                }
360            }
361        }
362        if let Some(ret) = self.mi.quit.replace(None) {
363            return task::Poll::Ready(Some(Retval(ret)));
364        }
365        if rv.is_pending() {
366            // Note: we can't pin the Rc normally because the Mainloop trait inner() returns an
367            // unpinned Rc.  However, the value is in fact pinned because the TokioMain::inner Rc
368            // is not directly exposed (preventing Rc::try_unwrap from succeeding), and the Drop
369            // impl for that struct overwrites the field with None.
370            let mut sleep = unsafe { Pin::new_unchecked(&mut *self.mi.sleep.get()) };
371            if let Some(d) = wake {
372                sleep.set(Some(tokio::time::sleep_until(inow + d)));
373                match sleep.as_mut().as_pin_mut().map(|f| f.poll(ctx)) {
374                    Some(task::Poll::Ready(())) => {
375                        sleep.set(None);
376                        rv = task::Poll::Ready(None);
377                    }
378                    _ => {}
379                }
380            } else {
381                sleep.set(None);
382            }
383        }
384        rv
385    }
386
387    /// Run the mainloop until the given context is either Ready or Failed/Terminated.
388    ///
389    /// When initializing a single Context object, this can be simpler use than registering a state
390    /// callback.  You will need to call run on another task to actually use the context.
391    pub async fn wait_for_ready(&mut self, ctx: &Context) -> Result<context::State, Retval> {
392        loop {
393            match poll_fn(|ctx| self.tick(ctx)).await {
394                Some(rv) => return Err(rv),
395                None => {}
396            }
397            let s = ctx.get_state();
398            match s {
399                context::State::Ready | context::State::Failed | context::State::Terminated => {
400                    return Ok(s);
401                }
402                _ => {}
403            }
404        }
405    }
406
407    /// Run the mainloop until a quit is requested through the pulse API
408    pub async fn run(&mut self) -> Retval {
409        loop {
410            match poll_fn(|ctx| self.tick(ctx)).await {
411                Some(rv) => return rv,
412                None => (),
413            }
414        }
415    }
416}
417
impl Drop for TokioMain {
    /// Drops any pending sleep future in place before the shared state can outlive this
    /// handle.
    fn drop(&mut self) {
        // SAFETY: the pinned slot is only overwritten; `Pin::set` drops the old value in
        // place, so the future is never moved.
        let mut sleep = unsafe { Pin::new_unchecked(&mut *self.mi.sleep.get()) };
        sleep.set(None);
        // It is now safe to move MainInner
    }
}
425
426impl MainInner {
427    unsafe fn from_api(api: *const MainloopApi) -> Rc<Self> {
428        let ptr = Weak::from_raw((*api).userdata as *const Self);
429        let rv = ptr.upgrade();
430        let _ = ptr.into_raw(); // we only want to borrow the Weak, not own it...
431        rv.expect("Called from_api on a dropped MainloopApi")
432    }
433
434    fn push(&self, item: Box<Item>) {
435        let items = unsafe { &mut *self.items.get() };
436        items.push(Box::into_raw(item));
437    }
438
439    fn wake(main: &Weak<MainInner>) {
440        main.upgrade().map(|inner| inner.wake_real());
441    }
442
443    fn wake_real(&self) {
444        self.waker.take().map(|waker| waker.wake());
445    }
446
447    extern "C" fn io_new(
448        a: *const MainloopApi,
449        fd: i32,
450        events: IoEventFlagSet,
451        cb: Option<IoEventCb>,
452        userdata: *mut c_void,
453    ) -> *mut IoEventInternal {
454        unsafe {
455            let inner = MainInner::from_api(a);
456            let events = Cell::new(events);
457            let mut item = Box::new(Item::Event {
458                fd,
459                cb,
460                events,
461                userdata,
462                free: None,
463                afd: Cell::new(None),
464                dead: Cell::new(false),
465                main: Rc::downgrade(&inner),
466            });
467            let rv = &mut *item as *mut Item as *mut _;
468            inner.push(item);
469            inner.wake_real();
470            rv
471        }
472    }
473    extern "C" fn io_enable(e: *mut IoEventInternal, new: IoEventFlagSet) {
474        unsafe {
475            let item: *mut Item = e.cast();
476            match &*item {
477                Item::Event { main, events, .. } => {
478                    events.set(new);
479                    MainInner::wake(main);
480                }
481                _ => panic!(),
482            }
483        }
484    }
485    extern "C" fn io_free(e: *mut IoEventInternal) {
486        unsafe {
487            let item: *mut Item = e.cast();
488            match &*item {
489                Item::Event { dead, afd, .. } => {
490                    dead.set(true);
491                    afd.set(None);
492                }
493                _ => panic!(),
494            }
495        }
496    }
497    extern "C" fn io_set_destroy(e: *mut IoEventInternal, cb: Option<IoEventDestroyCb>) {
498        unsafe {
499            let item: *mut Item = e.cast();
500            match &mut *item {
501                Item::Event { free, .. } => {
502                    *free = cb;
503                }
504                _ => panic!(),
505            }
506        }
507    }
508    extern "C" fn time_new(
509        a: *const MainloopApi,
510        tv: *const timeval,
511        cb: Option<TimeEventCb>,
512        userdata: *mut c_void,
513    ) -> *mut TimeEventInternal {
514        unsafe {
515            let inner = MainInner::from_api(a);
516            let tv = tv.read();
517            let ts = Cell::new(Some(
518                Duration::from_secs(tv.tv_sec as u64) + Duration::from_micros(tv.tv_usec as u64),
519            ));
520            let mut item = Box::new(Item::Timer {
521                main: Rc::downgrade(&inner),
522                ts,
523                cb,
524                userdata,
525                free: None,
526                dead: false,
527            });
528            let rv = &mut *item as *mut Item as *mut _;
529            inner.push(item);
530            inner.wake_real();
531            rv
532        }
533    }
534    extern "C" fn time_restart(e: *mut TimeEventInternal, tv: *const timeval) {
535        unsafe {
536            let item: *mut Item = e.cast();
537            match &*item {
538                Item::Timer { main, ts, .. } => {
539                    let tv = tv.read();
540                    ts.set(Some(
541                        Duration::from_secs(tv.tv_sec as u64)
542                            + Duration::from_micros(tv.tv_usec as u64),
543                    ));
544                    MainInner::wake(main);
545                }
546                _ => panic!(),
547            }
548        }
549    }
550    extern "C" fn time_free(e: *mut TimeEventInternal) {
551        unsafe {
552            let item: *mut Item = e.cast();
553            (*item).kill();
554        }
555    }
556    extern "C" fn time_set_destroy(e: *mut TimeEventInternal, cb: Option<TimeEventDestroyCb>) {
557        unsafe {
558            let item: *mut Item = e.cast();
559            match &mut *item {
560                Item::Timer { free, .. } => {
561                    *free = cb;
562                }
563                _ => panic!(),
564            }
565        }
566    }
567    extern "C" fn defer_new(
568        a: *const MainloopApi,
569        cb: Option<DeferEventCb>,
570        userdata: *mut c_void,
571    ) -> *mut DeferEventInternal {
572        unsafe {
573            let inner = MainInner::from_api(a);
574            let mut item = Box::new(Item::Defer {
575                main: Rc::downgrade(&inner),
576                cb,
577                userdata,
578                free: None,
579                dead: false,
580                enabled: true,
581            });
582            let rv = &mut *item as *mut Item as *mut _;
583            inner.push(item);
584            inner.wake_real();
585            rv
586        }
587    }
588    extern "C" fn defer_enable(e: *mut DeferEventInternal, b: i32) {
589        unsafe {
590            let item: *mut Item = e.cast();
591            match &mut *item {
592                Item::Defer { main, enabled, .. } => {
593                    *enabled = b != 0;
594                    if b != 0 {
595                        MainInner::wake(main);
596                    }
597                }
598                _ => panic!(),
599            }
600        }
601    }
602    extern "C" fn defer_free(e: *mut DeferEventInternal) {
603        unsafe {
604            let item: *mut Item = e.cast();
605            (*item).kill();
606        }
607    }
608    extern "C" fn defer_set_destroy(e: *mut DeferEventInternal, cb: Option<DeferEventDestroyCb>) {
609        unsafe {
610            let item: *mut Item = e.cast();
611            match &mut *item {
612                Item::Defer { free, .. } => {
613                    *free = cb;
614                }
615                _ => panic!(),
616            }
617        }
618    }
619    extern "C" fn quit(a: *const MainloopApi, retval: RetvalActual) {
620        unsafe {
621            let inner = MainInner::from_api(a);
622            inner.quit.set(Some(retval));
623            inner.wake_real();
624        }
625    }
626}