s2n_quic_core/sync/
atomic_waker.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::sync::primitive::{Arc, AtomicBool, Ordering};
5use core::{
6    pin::Pin,
7    task::{Context, Poll, Waker},
8};
9use crossbeam_utils::CachePadded;
10
11pub use crate::sync::primitive::AtomicWaker;
12
13/// Creates a pair of attached atomic wakers
14pub fn pair() -> (Handle, Handle) {
15    let storage = Arc::pin(Storage::default());
16
17    let a_ptr = &storage.a as *const _;
18    let b_ptr = &storage.b as *const _;
19    let is_open = &*storage.is_open as *const _;
20
21    let a = Handle {
22        local: a_ptr,
23        remote: b_ptr,
24        is_open,
25        storage: storage.clone(),
26    };
27
28    let b = Handle {
29        local: b_ptr,
30        remote: a_ptr,
31        is_open,
32        storage: storage.clone(),
33    };
34
35    (a, b)
36}
37
38/// An attached atomic waker
39#[derive(Debug)]
40pub struct Handle {
41    // store pointers so we don't have to go through the Arc pointer first
42    local: *const AtomicWaker,
43    remote: *const AtomicWaker,
44    is_open: *const AtomicBool,
45    #[allow(dead_code)]
46    storage: Pin<Arc<Storage>>,
47}
48
49/// Safety: Pointers live as long as the storage
50unsafe impl Send for Handle {}
51/// Safety: Pointers live as long as the storage
52unsafe impl Sync for Handle {}
53
54impl Handle {
55    /// Registers the local task for notifications from the other handle
56    #[inline]
57    pub fn register(&self, waker: &Waker) {
58        unsafe { (*self.local).register(waker) }
59    }
60
61    /// Notifies the other handle that it should be woken up
62    #[inline]
63    pub fn wake(&self) {
64        unsafe { (*self.remote).wake() }
65    }
66
67    /// Returns if the handle is open
68    ///
69    /// If `false` is returned, the other handle has been dropped and not interested in
70    /// notifications anymore.
71    #[inline]
72    pub fn is_open(&self) -> bool {
73        unsafe { (*self.is_open).load(Ordering::Acquire) }
74    }
75
76    /// Polls the handle until the peer handle has been closed
77    #[inline]
78    pub fn poll_close(&mut self, cx: &mut Context) -> Poll<()> {
79        if !self.is_open() {
80            return Poll::Ready(());
81        }
82
83        self.register(cx.waker());
84
85        if !self.is_open() {
86            Poll::Ready(())
87        } else {
88            Poll::Pending
89        }
90    }
91}
92
93#[derive(Debug)]
94struct Storage {
95    a: AtomicWaker,
96    b: AtomicWaker,
97    is_open: CachePadded<AtomicBool>,
98}
99
100impl Default for Storage {
101    fn default() -> Self {
102        Self {
103            a: Default::default(),
104            b: Default::default(),
105            is_open: CachePadded::new(AtomicBool::new(true)),
106        }
107    }
108}
109
110impl Drop for Handle {
111    #[inline]
112    fn drop(&mut self) {
113        // set that we've closed our handle and notify the peer
114        unsafe {
115            (*self.is_open).store(false, Ordering::Release);
116        }
117        self.wake();
118    }
119}