1use libc::timeval;
2use libpulse_binding::context::{self, Context};
3use libpulse_binding::def::{Retval, RetvalActual};
4use libpulse_binding::mainloop::api::DeferEventCb;
5use libpulse_binding::mainloop::api::DeferEventDestroyCb;
6use libpulse_binding::mainloop::api::IoEventCb;
7use libpulse_binding::mainloop::api::IoEventDestroyCb;
8use libpulse_binding::mainloop::api::Mainloop as MainloopTrait;
9use libpulse_binding::mainloop::api::MainloopApi;
10use libpulse_binding::mainloop::api::TimeEventCb;
11use libpulse_binding::mainloop::api::TimeEventDestroyCb;
12use libpulse_binding::mainloop::api::{MainloopInnerType, MainloopInternalType};
13use libpulse_binding::mainloop::events::deferred::DeferEventInternal;
14use libpulse_binding::mainloop::events::io::FlagSet as IoEventFlagSet;
15use libpulse_binding::mainloop::events::io::IoEventInternal;
16use libpulse_binding::mainloop::events::timer::TimeEventInternal;
17use std::cell::{Cell, UnsafeCell};
18use std::fmt;
19use std::future::{Future,poll_fn};
20use std::os::raw::c_void;
21use std::os::unix::io::{AsRawFd, RawFd};
22use std::pin::Pin;
23use std::rc::{Rc, Weak};
24use std::task;
25use std::time::{Duration, SystemTime, UNIX_EPOCH};
26use tokio::io::unix::AsyncFd;
27
/// Newtype over a raw file descriptor number.
///
/// It exists so that a bare descriptor can be wrapped in `AsyncFd`, which
/// requires its inner value to implement `AsRawFd`. The wrapper stores only
/// the descriptor number.
struct Fd(RawFd);

impl AsRawFd for Fd {
    /// Returns the wrapped descriptor number unchanged.
    fn as_raw_fd(&self) -> RawFd {
        let Fd(raw) = *self;
        raw
    }
}
35
36enum Item {
37 Defer {
38 main: Weak<MainInner>,
39 dead: bool,
40 enabled: bool,
41 cb: Option<DeferEventCb>,
42 userdata: *mut c_void,
43 free: Option<DeferEventDestroyCb>,
44 },
45 Timer {
46 main: Weak<MainInner>,
47 dead: bool,
48 ts: Cell<Option<Duration>>,
49 cb: Option<TimeEventCb>,
50 userdata: *mut c_void,
51 free: Option<TimeEventDestroyCb>,
52 },
53 Event {
54 main: Weak<MainInner>,
55 dead: Cell<bool>,
56 fd: i32,
57 afd: Cell<Option<AsyncFd<Fd>>>,
58 cb: Option<IoEventCb>,
59 events: Cell<IoEventFlagSet>,
60 userdata: *mut c_void,
61 free: Option<IoEventDestroyCb>,
62 },
63}
64
65impl Item {
66 fn is_dead(&self) -> bool {
67 match self {
68 Item::Defer { dead, .. } | Item::Timer { dead, .. } => *dead,
69 Item::Event { dead, .. } => dead.get(),
70 }
71 }
72
73 fn kill(&mut self) {
74 match self {
75 Item::Defer { dead, .. } | Item::Timer { dead, .. } => {
76 *dead = true;
77 }
78 Item::Event { .. } => unreachable!(),
79 }
80 }
81}
82
83#[derive(Debug)]
86pub struct TokioMain {
87 mi: Rc<MainInner>,
88}
89
90pub struct MainInner {
92 api: MainloopApi,
93 items: UnsafeCell<Vec<*mut Item>>,
96 sleep: UnsafeCell<Option<tokio::time::Sleep>>,
98 waker: Cell<Option<task::Waker>>,
99 quit: Cell<Option<RetvalActual>>,
100}
101
102impl fmt::Debug for MainInner {
103 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
104 write!(fmt, "MainInner")
105 }
106}
107
108impl MainloopTrait for TokioMain {
109 type MI = MainInner;
110 fn inner(&self) -> Rc<MainInner> {
111 self.mi.clone()
112 }
113}
114
115impl MainloopInternalType for MainInner {}
116
117impl MainloopInnerType for MainInner {
118 type I = Self;
119 fn get_ptr(&self) -> *mut Self {
120 panic!("This function is not well-defined and is never called")
121 }
122
123 fn get_api_ptr(&self) -> *const MainloopApi {
124 &self.api
125 }
126
127 fn get_api(&self) -> &MainloopApi {
128 &self.api
129 }
130
131 fn supports_rtclock(&self) -> bool {
132 false
133 }
134}
135
136impl Drop for MainInner {
137 fn drop(&mut self) {
138 unsafe {
139 Weak::from_raw(self.api.userdata as *mut MainInner);
141 for item in self.items.get_mut().drain(..) {
143 drop(Box::from_raw(item));
144 }
145 }
146 }
147}
148
149impl TokioMain {
150 pub fn new() -> Self {
151 let mut mi = Rc::new(MainInner {
152 api: MainloopApi {
153 userdata: 0 as *mut _,
154 io_new: Some(MainInner::io_new),
155 io_enable: Some(MainInner::io_enable),
156 io_free: Some(MainInner::io_free),
157 io_set_destroy: Some(MainInner::io_set_destroy),
158 time_new: Some(MainInner::time_new),
159 time_restart: Some(MainInner::time_restart),
160 time_free: Some(MainInner::time_free),
161 time_set_destroy: Some(MainInner::time_set_destroy),
162 defer_new: Some(MainInner::defer_new),
163 defer_enable: Some(MainInner::defer_enable),
164 defer_free: Some(MainInner::defer_free),
165 defer_set_destroy: Some(MainInner::defer_set_destroy),
166 quit: Some(MainInner::quit),
167 },
168 items: UnsafeCell::new(Vec::new()),
169 sleep: UnsafeCell::new(None),
170 waker: Cell::new(None),
171 quit: Cell::new(None),
172 });
173 let v = Rc::get_mut(&mut mi).unwrap();
174 v.api.userdata = v as *mut MainInner as *mut _;
175 let _cyclic = Rc::downgrade(&mi).into_raw();
176 TokioMain { mi }
177 }
178
179 fn iter_get_item(&mut self, i: usize) -> Option<(&MainloopApi, &Item)> {
180 let api = &self.mi.api;
181 let items = unsafe { &mut *self.mi.items.get() };
182 loop {
183 if i >= items.len() {
184 return None;
185 }
186 if unsafe { (*items[i]).is_dead() } {
187 let mut dead = unsafe { Box::from_raw(items.swap_remove(i)) };
188 match &*dead {
189 &Item::Defer {
190 free: Some(cb),
191 userdata,
192 ..
193 } => {
194 let raw_item = &mut *dead as *mut Item;
195 cb(api, raw_item as *mut _, userdata);
196 }
197 &Item::Timer {
198 free: Some(cb),
199 userdata,
200 ..
201 } => {
202 let raw_item = &mut *dead as *mut Item;
203 cb(api, raw_item as *mut _, userdata);
204 }
205 &Item::Event {
206 free: Some(cb),
207 userdata,
208 ..
209 } => {
210 let raw_item = &mut *dead as *mut Item;
211 cb(api, raw_item as *mut _, userdata);
212 }
213 _ => {}
214 }
215 drop(dead);
216 continue;
217 }
218 let item = unsafe { &*items[i] };
219 return Some((api, item));
220 }
221 }
222
223 pub fn tick(&mut self, ctx: &mut task::Context) -> task::Poll<Option<Retval>> {
228 let inow = tokio::time::Instant::now();
229 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
230 let mut wake = None::<Duration>;
231 let mut rv = task::Poll::Pending;
232 let mut i = 0;
233 self.mi.waker.set(Some(ctx.waker().clone()));
234 while let Some((api, item)) = self.iter_get_item(i) {
235 let raw_item = item as *const Item;
236 i += 1;
237 match &*item {
238 &Item::Defer {
239 enabled: true,
240 cb: Some(cb),
241 userdata,
242 ..
243 } => {
244 cb(api, raw_item as *mut _, userdata);
245 }
246 &Item::Defer { .. } => continue,
247 &Item::Timer { cb: None, .. } => continue,
248 &Item::Timer {
249 cb: Some(cb),
250 userdata,
251 ref ts,
252 ..
253 } => {
254 match ts.replace(None) {
255 Some(ts) if ts < now => {
256 rv = task::Poll::Ready(None);
257 let tv = timeval {
258 tv_sec: ts.as_secs() as i64,
259 tv_usec: ts.subsec_micros() as i64,
260 };
261 cb(api, raw_item as *mut _, &tv, userdata);
262 }
263 later => ts.set(later),
264 }
265
266 if let Some(ts) = ts.get() {
267 if wake.is_some() {
268 wake = std::cmp::min(wake, Some(ts));
269 } else {
270 wake = Some(ts);
271 }
272 }
273 }
274 &Item::Event { cb: None, .. } => continue,
275 &Item::Event {
276 cb: Some(cb),
277 userdata,
278 fd,
279 ref afd,
280 ref events,
281 ref dead,
282 ..
283 } => {
284 let mut local_fd = afd.take();
286
287 let async_fd = local_fd
290 .get_or_insert_with(|| AsyncFd::new(Fd(fd)).expect("Pulse fed a bad FD"));
291 let mut ready = IoEventFlagSet::NULL;
292 let mut rg = None;
293 let mut wg = None;
294 if events.get().contains(IoEventFlagSet::INPUT) {
295 match async_fd.poll_read_ready(ctx) {
296 task::Poll::Ready(Ok(g)) => {
297 ready |= IoEventFlagSet::INPUT;
298 rg = Some(g);
299 }
300 task::Poll::Ready(Err(_)) => ready |= IoEventFlagSet::ERROR,
301 task::Poll::Pending => {}
302 }
303 }
304 if events.get().contains(IoEventFlagSet::OUTPUT) {
305 match async_fd.poll_write_ready(ctx) {
306 task::Poll::Ready(Ok(g)) => {
307 ready |= IoEventFlagSet::OUTPUT;
308 wg = Some(g);
309 }
310 task::Poll::Ready(Err(_)) => ready |= IoEventFlagSet::ERROR,
311 task::Poll::Pending => {}
312 }
313 }
314 if ready == IoEventFlagSet::NULL {
315 afd.set(local_fd);
316 continue;
317 }
318
319 rv = task::Poll::Ready(None);
320 cb(api, raw_item as *mut _, fd, ready, userdata);
321 if dead.get() {
322 continue;
324 }
325 let wants = events.get();
326 if wants.intersects(ready) {
327 let mut pfd = libc::pollfd {
330 fd: fd,
331 events: 0,
332 revents: 0,
333 };
334 if wants.contains(IoEventFlagSet::INPUT) && rg.is_some() {
335 pfd.events |= libc::POLLIN;
336 }
337 if wants.contains(IoEventFlagSet::OUTPUT) && wg.is_some() {
338 pfd.events |= libc::POLLOUT;
339 }
340 unsafe {
341 libc::poll(&mut pfd, 1, 0);
342 }
343 if let Some(mut g) = rg {
344 if (pfd.revents & libc::POLLIN) != 0 {
345 g.retain_ready();
346 } else {
347 g.clear_ready();
348 }
349 }
350 if let Some(mut g) = wg {
351 if (pfd.revents & libc::POLLOUT) != 0 {
352 g.retain_ready();
353 } else {
354 g.clear_ready();
355 }
356 }
357 }
358 afd.set(local_fd);
359 }
360 }
361 }
362 if let Some(ret) = self.mi.quit.replace(None) {
363 return task::Poll::Ready(Some(Retval(ret)));
364 }
365 if rv.is_pending() {
366 let mut sleep = unsafe { Pin::new_unchecked(&mut *self.mi.sleep.get()) };
371 if let Some(d) = wake {
372 sleep.set(Some(tokio::time::sleep_until(inow + d)));
373 match sleep.as_mut().as_pin_mut().map(|f| f.poll(ctx)) {
374 Some(task::Poll::Ready(())) => {
375 sleep.set(None);
376 rv = task::Poll::Ready(None);
377 }
378 _ => {}
379 }
380 } else {
381 sleep.set(None);
382 }
383 }
384 rv
385 }
386
387 pub async fn wait_for_ready(&mut self, ctx: &Context) -> Result<context::State, Retval> {
392 loop {
393 match poll_fn(|ctx| self.tick(ctx)).await {
394 Some(rv) => return Err(rv),
395 None => {}
396 }
397 let s = ctx.get_state();
398 match s {
399 context::State::Ready | context::State::Failed | context::State::Terminated => {
400 return Ok(s);
401 }
402 _ => {}
403 }
404 }
405 }
406
407 pub async fn run(&mut self) -> Retval {
409 loop {
410 match poll_fn(|ctx| self.tick(ctx)).await {
411 Some(rv) => return rv,
412 None => (),
413 }
414 }
415 }
416}
417
418impl Drop for TokioMain {
419 fn drop(&mut self) {
420 let mut sleep = unsafe { Pin::new_unchecked(&mut *self.mi.sleep.get()) };
421 sleep.set(None);
422 }
424}
425
426impl MainInner {
427 unsafe fn from_api(api: *const MainloopApi) -> Rc<Self> {
428 let ptr = Weak::from_raw((*api).userdata as *const Self);
429 let rv = ptr.upgrade();
430 let _ = ptr.into_raw(); rv.expect("Called from_api on a dropped MainloopApi")
432 }
433
434 fn push(&self, item: Box<Item>) {
435 let items = unsafe { &mut *self.items.get() };
436 items.push(Box::into_raw(item));
437 }
438
439 fn wake(main: &Weak<MainInner>) {
440 main.upgrade().map(|inner| inner.wake_real());
441 }
442
443 fn wake_real(&self) {
444 self.waker.take().map(|waker| waker.wake());
445 }
446
447 extern "C" fn io_new(
448 a: *const MainloopApi,
449 fd: i32,
450 events: IoEventFlagSet,
451 cb: Option<IoEventCb>,
452 userdata: *mut c_void,
453 ) -> *mut IoEventInternal {
454 unsafe {
455 let inner = MainInner::from_api(a);
456 let events = Cell::new(events);
457 let mut item = Box::new(Item::Event {
458 fd,
459 cb,
460 events,
461 userdata,
462 free: None,
463 afd: Cell::new(None),
464 dead: Cell::new(false),
465 main: Rc::downgrade(&inner),
466 });
467 let rv = &mut *item as *mut Item as *mut _;
468 inner.push(item);
469 inner.wake_real();
470 rv
471 }
472 }
473 extern "C" fn io_enable(e: *mut IoEventInternal, new: IoEventFlagSet) {
474 unsafe {
475 let item: *mut Item = e.cast();
476 match &*item {
477 Item::Event { main, events, .. } => {
478 events.set(new);
479 MainInner::wake(main);
480 }
481 _ => panic!(),
482 }
483 }
484 }
485 extern "C" fn io_free(e: *mut IoEventInternal) {
486 unsafe {
487 let item: *mut Item = e.cast();
488 match &*item {
489 Item::Event { dead, afd, .. } => {
490 dead.set(true);
491 afd.set(None);
492 }
493 _ => panic!(),
494 }
495 }
496 }
497 extern "C" fn io_set_destroy(e: *mut IoEventInternal, cb: Option<IoEventDestroyCb>) {
498 unsafe {
499 let item: *mut Item = e.cast();
500 match &mut *item {
501 Item::Event { free, .. } => {
502 *free = cb;
503 }
504 _ => panic!(),
505 }
506 }
507 }
508 extern "C" fn time_new(
509 a: *const MainloopApi,
510 tv: *const timeval,
511 cb: Option<TimeEventCb>,
512 userdata: *mut c_void,
513 ) -> *mut TimeEventInternal {
514 unsafe {
515 let inner = MainInner::from_api(a);
516 let tv = tv.read();
517 let ts = Cell::new(Some(
518 Duration::from_secs(tv.tv_sec as u64) + Duration::from_micros(tv.tv_usec as u64),
519 ));
520 let mut item = Box::new(Item::Timer {
521 main: Rc::downgrade(&inner),
522 ts,
523 cb,
524 userdata,
525 free: None,
526 dead: false,
527 });
528 let rv = &mut *item as *mut Item as *mut _;
529 inner.push(item);
530 inner.wake_real();
531 rv
532 }
533 }
534 extern "C" fn time_restart(e: *mut TimeEventInternal, tv: *const timeval) {
535 unsafe {
536 let item: *mut Item = e.cast();
537 match &*item {
538 Item::Timer { main, ts, .. } => {
539 let tv = tv.read();
540 ts.set(Some(
541 Duration::from_secs(tv.tv_sec as u64)
542 + Duration::from_micros(tv.tv_usec as u64),
543 ));
544 MainInner::wake(main);
545 }
546 _ => panic!(),
547 }
548 }
549 }
550 extern "C" fn time_free(e: *mut TimeEventInternal) {
551 unsafe {
552 let item: *mut Item = e.cast();
553 (*item).kill();
554 }
555 }
556 extern "C" fn time_set_destroy(e: *mut TimeEventInternal, cb: Option<TimeEventDestroyCb>) {
557 unsafe {
558 let item: *mut Item = e.cast();
559 match &mut *item {
560 Item::Timer { free, .. } => {
561 *free = cb;
562 }
563 _ => panic!(),
564 }
565 }
566 }
567 extern "C" fn defer_new(
568 a: *const MainloopApi,
569 cb: Option<DeferEventCb>,
570 userdata: *mut c_void,
571 ) -> *mut DeferEventInternal {
572 unsafe {
573 let inner = MainInner::from_api(a);
574 let mut item = Box::new(Item::Defer {
575 main: Rc::downgrade(&inner),
576 cb,
577 userdata,
578 free: None,
579 dead: false,
580 enabled: true,
581 });
582 let rv = &mut *item as *mut Item as *mut _;
583 inner.push(item);
584 inner.wake_real();
585 rv
586 }
587 }
588 extern "C" fn defer_enable(e: *mut DeferEventInternal, b: i32) {
589 unsafe {
590 let item: *mut Item = e.cast();
591 match &mut *item {
592 Item::Defer { main, enabled, .. } => {
593 *enabled = b != 0;
594 if b != 0 {
595 MainInner::wake(main);
596 }
597 }
598 _ => panic!(),
599 }
600 }
601 }
602 extern "C" fn defer_free(e: *mut DeferEventInternal) {
603 unsafe {
604 let item: *mut Item = e.cast();
605 (*item).kill();
606 }
607 }
608 extern "C" fn defer_set_destroy(e: *mut DeferEventInternal, cb: Option<DeferEventDestroyCb>) {
609 unsafe {
610 let item: *mut Item = e.cast();
611 match &mut *item {
612 Item::Defer { free, .. } => {
613 *free = cb;
614 }
615 _ => panic!(),
616 }
617 }
618 }
619 extern "C" fn quit(a: *const MainloopApi, retval: RetvalActual) {
620 unsafe {
621 let inner = MainInner::from_api(a);
622 inner.quit.set(Some(retval));
623 inner.wake_real();
624 }
625 }
626}