Skip to main content

compio_driver/
key.rs

1#![allow(dead_code)]
2
3use std::{
4    fmt::{self, Debug},
5    hash::Hash,
6    io,
7    mem::{self, ManuallyDrop},
8    ops::{Deref, DerefMut},
9    ptr,
10    task::Waker,
11};
12
13use compio_buf::{BufResult, IntoInner};
14use compio_send_wrapper::SendWrapper;
15use thin_cell::unsync::{Inner, Ref, ThinCell, Weak};
16
17use crate::{Carry, DriverType, Extra, OpCode, PushEntry, control::Carrier};
18
19/// An operation with other needed information.
20///
21/// You should not use `RawOp` directly. Instead, use [`Key`] to manage the
22/// reference-counted pointer to it.
23#[repr(C)]
24pub(crate) struct RawOp<M: ?Sized> {
25    // Platform-specific extra data.
26    //
27    // - On Windows, it holds the `OVERLAPPED` buffer and a pointer to the driver.
28    // - On Linux with `io_uring`, it holds the flags returned by kernel.
29    // - On other platforms, it stores tracker for multi-fd `OpCode`s.
30    //
31    // Extra MUST be the first field to guarantee the layout for casting on windows. An invariant
32    // on IOCP driver is that `RawOp` pointer is the same as `OVERLAPPED` pointer.
33    extra: Extra,
34    // The cancelled flag indicates the op has been cancelled.
35    cancelled: bool,
36    result: PushEntry<Option<Waker>, io::Result<usize>>,
37    pub(crate) carrier: M,
38}
39
40impl<C: ?Sized> RawOp<C> {
41    pub fn extra(&self) -> &Extra {
42        &self.extra
43    }
44
45    pub fn extra_mut(&mut self) -> &mut Extra {
46        &mut self.extra
47    }
48
49    #[cfg(io_uring)]
50    pub fn wake_by_ref(&mut self) {
51        if let PushEntry::Pending(Some(w)) = &self.result {
52            w.wake_by_ref();
53        }
54    }
55}
56
#[cfg(io_uring)]
impl<C: crate::Carry + ?Sized> RawOp<C> {
    /// Build the submission entry for this op and attach the op's [`Extra`] to it.
    ///
    /// With `FALLBACK == true` the entry comes from `create_entry_fallback`,
    /// otherwise from `create_entry`.
    pub fn create_entry<const FALLBACK: bool>(&mut self) -> crate::OpEntry {
        match FALLBACK {
            true => self.carrier.create_entry_fallback().with_extra(&self.extra),
            false => self.carrier.create_entry().with_extra(&self.extra),
        }
    }
}
67
#[cfg(windows)]
impl<C: crate::Carry + ?Sized> RawOp<C> {
    /// Run [`OpCode::operate`] synchronously, assuming the op is not
    /// overlapped and therefore never yields [`Poll::Pending`].
    ///
    /// The call goes through `catch_unwind_io`, whose result is returned as-is.
    ///
    /// # Panics
    ///
    /// Hits `unreachable!` if the operation reports [`Poll::Pending`]
    /// (NOTE(review): whether that panic reaches the caller depends on
    /// `catch_unwind_io` — confirm).
    ///
    /// [`Poll::Pending`]: std::task::Poll::Pending
    pub fn operate_blocking(&mut self) -> io::Result<usize> {
        use std::{panic::AssertUnwindSafe, task::Poll};

        use crate::panic::catch_unwind_io;

        let optr = self.extra_mut().optr();
        let run = AssertUnwindSafe(|| {
            // SAFETY: NOTE(review): relies on `optr` coming from this op's own `Extra`;
            // confirm against the contract of `Carry::operate`.
            match unsafe { self.carrier.operate(optr.cast()) } {
                Poll::Ready(res) => res,
                Poll::Pending => unreachable!("this operation is not overlapped"),
            }
        });
        catch_unwind_io(run)
    }
}
88
89impl<C: ?Sized> Debug for RawOp<C> {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.debug_struct("RawOp")
92            .field("extra", &self.extra)
93            .field("cancelled", &self.cancelled)
94            .field("result", &self.result)
95            .field("Carrier", &"<...>")
96            .finish()
97    }
98}
99
100/// A typed wrapper for key of Ops submitted into driver.
101#[repr(transparent)]
102pub struct Key<T> {
103    erased: ErasedKey,
104    _p: std::marker::PhantomData<T>,
105}
106
107/// A type-erased reference-counted pointer to an operation.
108///
109/// Internally, it uses [`ThinCell`] to manage the reference count and borrowing
110/// state. It provides methods to manipulate the underlying operation, such as
111/// setting results, checking completion status, and cancelling the operation.
112#[derive(Clone)]
113#[repr(transparent)]
114pub struct ErasedKey {
115    inner: ThinCell<RawOp<dyn Carry>>,
116}
117
118/// A weak reference of [`ErasedKey`].
119#[derive(Clone)]
120#[repr(transparent)]
121pub(crate) struct WeakKey {
122    inner: Weak<RawOp<dyn Carry>>,
123}
124
125impl<T> Clone for Key<T> {
126    fn clone(&self) -> Self {
127        Self {
128            erased: self.erased.clone(),
129            _p: std::marker::PhantomData,
130        }
131    }
132}
133
134impl<T> Debug for Key<T> {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        write!(f, "Key({})", self.erased.inner.as_ptr() as usize)
137    }
138}
139
140impl<T> Key<T> {
141    pub(crate) fn into_raw(self) -> usize {
142        self.erased.into_raw()
143    }
144
145    pub(crate) fn erase(self) -> ErasedKey {
146        self.erased
147    }
148}
149
150impl<T: OpCode> Key<T> {
151    /// Take the inner result if it is completed.
152    ///
153    /// # Panics
154    ///
155    /// Panics if the result is not ready or the `Key` is not unique (multiple
156    /// references or borrowed).
157    pub(crate) fn take_result(self) -> BufResult<usize, T> {
158        // SAFETY: `Key` invariant guarantees that `T` is the actual concrete type.
159        unsafe { self.erased.take_result::<T>() }
160    }
161}
162
163impl<T: OpCode + 'static> Key<T> {
164    /// Create [`RawOp`] and get the [`Key`] to it.
165    pub(crate) fn new(op: T, extra: impl Into<Extra>, driver_ty: DriverType) -> Self {
166        let erased = ErasedKey::new(op, extra.into(), driver_ty);
167
168        Self {
169            erased,
170            _p: std::marker::PhantomData,
171        }
172    }
173
174    pub(crate) fn set_extra(&self, extra: impl Into<Extra>) {
175        self.borrow().extra = extra.into();
176    }
177}
178
179impl<T> Deref for Key<T> {
180    type Target = ErasedKey;
181
182    fn deref(&self) -> &Self::Target {
183        &self.erased
184    }
185}
186
187impl<T> DerefMut for Key<T> {
188    fn deref_mut(&mut self) -> &mut Self::Target {
189        &mut self.erased
190    }
191}
192
193impl PartialEq for ErasedKey {
194    fn eq(&self, other: &Self) -> bool {
195        self.inner.ptr_eq(&other.inner)
196    }
197}
198
199impl Eq for ErasedKey {}
200
201impl Hash for ErasedKey {
202    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
203        (self.inner.as_ptr() as usize).hash(state)
204    }
205}
206
207impl Unpin for ErasedKey {}
208
209impl ErasedKey {
210    /// Create [`RawOp`] and get the [`ErasedKey`] to it.
211    pub(crate) fn new<T: OpCode + 'static>(op: T, extra: Extra, driver_ty: DriverType) -> Self {
212        let raw_op = RawOp {
213            extra,
214            cancelled: false,
215            result: PushEntry::Pending(None),
216            carrier: Carrier::new(op, driver_ty),
217        };
218        let mut inner = ThinCell::new(raw_op);
219        // SAFETY:
220        // - ThinCell is just created, there will be no shared owner or borrower
221        // - Carrier is being pinned by ThinCell, it will have a stable address until
222        //   move out
223        unsafe { inner.borrow_unchecked().carrier.init() };
224        Self {
225            inner: unsafe { inner.unsize(|p| p as *const Inner<RawOp<dyn Carry>>) },
226        }
227    }
228
229    /// Create from `user_data` pointer.
230    ///
231    /// # Safety
232    ///
233    /// `user_data` must be a valid pointer to `RawOp<dyn OpCode>` previously
234    /// created by [`Key::into_raw`].
235    pub(crate) unsafe fn from_raw(user_data: usize) -> Self {
236        let inner = unsafe { ThinCell::from_raw(user_data as *mut ()) };
237        Self { inner }
238    }
239
240    /// Create from `Overlapped` pointer.
241    ///
242    /// # Safety
243    ///
244    /// `optr` must be a valid pointer to `Overlapped` stored in `Extra` of
245    /// `RawOp<dyn OpCode>`.
246    #[cfg(windows)]
247    pub(crate) unsafe fn from_optr(optr: *mut crate::sys::Overlapped) -> Self {
248        let ptr = unsafe { optr.cast::<usize>().offset(-2).cast() };
249        let inner = unsafe { ThinCell::from_raw(ptr) };
250        Self { inner }
251    }
252
253    /// Leak self into a pointer to `Overlapped`.
254    #[cfg(windows)]
255    pub(crate) fn into_optr(self) -> *mut crate::sys::Overlapped {
256        unsafe { self.inner.leak().cast::<usize>().add(2).cast() }
257    }
258
259    /// Get a weak reference to the allocation.
260    ///
261    /// It will not prevent dropping [`RawOp`] or cause panic when calling
262    /// [`take_result`], and can be upgrade back when needed.
263    ///
264    /// [`take_result`]: Self::take_result
265    pub(crate) fn downgrade(&self) -> WeakKey {
266        WeakKey {
267            inner: self.inner.downgrade(),
268        }
269    }
270
271    /// Get the pointer as `user_data`.
272    ///
273    /// **Do not** call [`from_raw`](Self::from_raw) on the returned value of
274    /// this method.
275    pub(crate) fn as_raw(&self) -> usize {
276        self.inner.as_ptr() as _
277    }
278
279    /// Leak self and get the pointer as `user_data`.
280    pub(crate) fn into_raw(self) -> usize {
281        self.inner.leak() as _
282    }
283
284    #[inline]
285    pub(crate) fn borrow(&self) -> Ref<'_, RawOp<dyn Carry>> {
286        self.inner.borrow()
287    }
288
289    /// Set the `cancelled` flag, returning whether it was already cancelled.
290    pub(crate) fn set_cancelled(&self) -> bool {
291        let mut op = self.borrow();
292        mem::replace(&mut op.cancelled, true)
293    }
294
295    /// Whether the op is completed.
296    pub(crate) fn has_result(&self) -> bool {
297        self.borrow().result.is_ready()
298    }
299
300    /// Whether the key is uniquely owned.
301    pub(crate) fn is_unique(&self) -> bool {
302        ThinCell::count(&self.inner) == 1
303    }
304
305    /// Complete the op and wake up the future if a waker is set.
306    pub(crate) fn set_result(&self, res: io::Result<usize>) {
307        let mut this = self.borrow();
308        {
309            let RawOp { extra, carrier, .. } = &mut *this;
310            unsafe { crate::sys::Carry::set_result(carrier, &res, extra) };
311        }
312        if let PushEntry::Pending(Some(w)) =
313            std::mem::replace(&mut this.result, PushEntry::Ready(res))
314        {
315            w.wake();
316        }
317    }
318
319    /// Swap the inner [`Extra`] with the provided one, returning the previous
320    /// value.
321    pub(crate) fn swap_extra(&self, extra: Extra) -> Extra {
322        std::mem::replace(&mut self.borrow().extra, extra)
323    }
324
325    /// Set waker of the current future.
326    pub(crate) fn set_waker(&self, waker: &Waker) {
327        let PushEntry::Pending(w) = &mut self.borrow().result else {
328            return;
329        };
330
331        if w.as_ref().is_some_and(|w| w.will_wake(waker)) {
332            return;
333        }
334
335        *w = Some(waker.clone());
336    }
337
338    /// Take the inner result if it is completed.
339    ///
340    /// # Safety
341    ///
342    /// `T` must be the actual concrete type of the `Key`.
343    ///
344    /// # Panics
345    ///
346    /// Panics if the result is not ready or the `Key` is not unique (multiple
347    /// references or borrowed).
348    unsafe fn take_result<T: OpCode>(self) -> BufResult<usize, T> {
349        // SAFETY: Caller guarantees that `T` is the actual concrete type.
350        let this = unsafe { self.inner.downcast_unchecked::<RawOp<Carrier<T>>>() };
351        let op = this.try_unwrap().map_err(|_| ()).expect("Key not unique");
352        let res = op.result.take_ready().expect("Result not ready");
353        BufResult(res, op.carrier.into_inner())
354    }
355
356    /// Unsafely freeze the `Key` by bypassing borrow flag of [`ThinCell`],
357    /// preventing it from being dropped and unconditionally expose the
358    /// underlying `RawOp<dyn OpCode>`.
359    ///
360    /// # Safety
361    /// - During the time the [`FrozenKey`] is alive, no other references to the
362    ///   underlying `RawOp<dyn OpCode>` is used.
363    /// - One must not touch [`ThinCell`]'s internal state at all, as `Cell` is
364    ///   strictly single-threaded. This means no borrowing, no cloning, no
365    ///   dropping, etc.
366    pub(crate) unsafe fn freeze(self) -> FrozenKey {
367        FrozenKey {
368            inner: ManuallyDrop::new(self),
369            thread_id: SendWrapper::new(()),
370        }
371    }
372}
373
374impl Debug for ErasedKey {
375    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376        write!(f, "ErasedKey({})", self.inner.as_ptr() as usize)
377    }
378}
379
380impl WeakKey {
381    pub(crate) fn upgrade(&self) -> Option<ErasedKey> {
382        Some(ErasedKey {
383            inner: self.inner.upgrade()?,
384        })
385    }
386
387    pub(crate) fn as_ptr(&self) -> *const () {
388        self.inner.as_ptr()
389    }
390}
391
392impl PartialEq for WeakKey {
393    fn eq(&self, other: &Self) -> bool {
394        ptr::eq(self.inner.as_ptr(), other.inner.as_ptr())
395    }
396}
397
398impl Eq for WeakKey {}
399
400impl Hash for WeakKey {
401    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
402        (self.inner.as_ptr() as usize).hash(state)
403    }
404}
405
406impl Debug for WeakKey {
407    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
408        if let Some(upgraded) = self.inner.upgrade() {
409            Debug::fmt(&upgraded, f)
410        } else {
411            write!(f, "(Dropped)")
412        }
413    }
414}
415
416/// A frozen view into a [`Key`].
417///
418/// It's guaranteed to have [`ErasedKey`] as the first field.
419#[repr(C)]
420pub(crate) struct FrozenKey {
421    inner: ManuallyDrop<ErasedKey>,
422    thread_id: SendWrapper<()>,
423}
424
425impl FrozenKey {
426    pub fn as_mut(&mut self) -> &mut RawOp<dyn Carry> {
427        unsafe { self.inner.inner.borrow_unchecked() }
428    }
429
430    pub fn into_inner(self) -> ErasedKey {
431        let mut this = ManuallyDrop::new(self);
432        unsafe { ManuallyDrop::take(&mut this.inner) }
433    }
434}
435
436impl Drop for FrozenKey {
437    fn drop(&mut self) {
438        if self.thread_id.valid() {
439            unsafe { ManuallyDrop::drop(&mut self.inner) }
440        }
441    }
442}
443
444unsafe impl Send for FrozenKey {}
445unsafe impl Sync for FrozenKey {}
446
447/// A temporary view into a [`Key`].
448///
449/// It is mainly used in the driver to avoid accidentally decreasing the
450/// reference count of the `Key` when the driver is not completed and may still
451/// emit event with the `user_data`.
452pub(crate) struct BorrowedKey(ManuallyDrop<ErasedKey>);
453
454impl BorrowedKey {
455    pub unsafe fn from_raw(user_data: usize) -> Self {
456        let key = unsafe { ErasedKey::from_raw(user_data) };
457        Self(ManuallyDrop::new(key))
458    }
459
460    pub fn upgrade(self) -> ErasedKey {
461        ManuallyDrop::into_inner(self.0)
462    }
463}
464
465impl Deref for BorrowedKey {
466    type Target = ErasedKey;
467
468    fn deref(&self) -> &Self::Target {
469        &self.0
470    }
471}