1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
use std::future::poll_fn;
use std::{
cell::RefCell,
io,
os::unix::io::{BorrowedFd, FromRawFd, RawFd},
rc::Rc,
task::Waker,
};
use crate::runtime::driver::op::Op;
// Tracks in-flight operations on a file descriptor. Ensures all in-flight
// operations complete before submitting the close.
//
// Every clone of a `SharedFd` points at the same reference-counted `Inner`.
// `close()` only submits the close operation once its handle is the sole
// remaining clone.
//
// When the file descriptor is closed because the last handle goes out of scope
// without a completed `close()` call, a synchronous close is employed.
//
// The closed state is tracked so close calls after the first are ignored.
// Only the first close call returns the true result of closing the file descriptor.
#[derive(Clone)]
pub(crate) struct SharedFd {
// Reference-counted state (raw descriptor and sharing state) shared by all clones.
inner: Rc<Inner>,
}
// State shared by every clone of a `SharedFd`.
struct Inner {
// Open file descriptor
fd: RawFd,
// Track the sharing state of the file descriptor:
// normal, being waited on so that a pending `close()` can proceed once the
// handle is unique, or already closed.
state: RefCell<State>,
}
/// Sharing state of the file descriptor stored in `Inner`.
enum State {
/// Initial state
Init,
/// Waiting for the number of strong Rc pointers to drop to 1.
///
/// Holds the waker of the task that is waiting inside `SharedFd::close`;
/// dropping any `SharedFd` clone wakes it and resets the state to `Init`.
WaitingForUniqueness(Waker),
/// The close has been triggered by the parent owner.
///
/// Set by `Inner::async_close_op` before the close operation is submitted.
Closed,
}
impl SharedFd {
    /// Wraps `fd` in a new `SharedFd` whose sharing state starts as `State::Init`.
    pub(crate) fn new(fd: RawFd) -> SharedFd {
        SharedFd {
            inner: Rc::new(Inner {
                fd,
                state: RefCell::new(State::Init),
            }),
        }
    }

    /// Returns the RawFd
    pub(crate) fn raw_fd(&self) -> RawFd {
        self.inner.fd
    }

    /// Returns the descriptor as a `BorrowedFd` whose lifetime is tied to `&self`.
    pub(crate) fn fd(&self) -> BorrowedFd<'_> {
        // SAFETY: we're ensuring the fd stays open as long as SharedFd is
        // alive.
        // NOTE(review): once `close()` has completed the descriptor is no longer
        // open even though the `SharedFd` value still exists; callers are
        // presumably expected not to use this afterwards — confirm.
        unsafe { BorrowedFd::borrow_raw(self.inner.fd) }
    }

    /// Closes the file descriptor once this handle is the only remaining clone.
    ///
    /// An FD cannot be closed until all in-flight operations have completed.
    /// This prevents bugs where in-flight reads could operate on the incorrect
    /// file descriptor.
    ///
    /// Returns the result of `Inner::async_close_op` when this handle is unique.
    /// If the descriptor has already been closed, `Ok(())` is returned, whether
    /// or not other clones still exist.
    pub(crate) async fn close(&mut self) -> io::Result<()> {
        loop {
            // Get a mutable reference to Inner. This only succeeds when no other
            // clone of this SharedFd exists, which is taken to mean there are no
            // in-flight operations on the FD.
            if let Some(inner) = Rc::get_mut(&mut self.inner) {
                // Wait for the close operation.
                return inner.async_close_op().await;
            }

            // Other clones still exist. If the descriptor was already closed
            // there is nothing left to do. `sharedfd_is_unique` resolves
            // immediately in the `Closed` state, so without this early return
            // the loop would spin forever without yielding to the executor.
            // The `Ref` temporary is released at the end of the condition, so
            // no borrow is held across an await point.
            if matches!(*self.inner.state.borrow(), State::Closed) {
                return Ok(());
            }

            self.sharedfd_is_unique().await;
        }
    }

    /// Completes when the SharedFd's Inner Rc strong count is 1, or when the
    /// state is already `Closed`.
    ///
    /// While pending, the current task's waker is stored in
    /// `State::WaitingForUniqueness`; `Drop for SharedFd` wakes it, so this gets
    /// polled again any time a SharedFd is dropped.
    async fn sharedfd_is_unique(&self) {
        use std::task::Poll;

        poll_fn(|cx| {
            if Rc::<Inner>::strong_count(&self.inner) == 1 {
                return Poll::Ready(());
            }

            let mut state = self.inner.state.borrow_mut();

            match &mut *state {
                State::Init => {
                    // First poll while shared: register for a wake-up.
                    *state = State::WaitingForUniqueness(cx.waker().clone());
                    Poll::Pending
                }
                State::WaitingForUniqueness(waker) => {
                    // Only replace the stored waker if it would wake a different task.
                    if !waker.will_wake(cx.waker()) {
                        waker.clone_from(cx.waker());
                    }

                    Poll::Pending
                }
                State::Closed => Poll::Ready(()),
            }
        })
        .await;
    }
}
impl Inner {
async fn async_close_op(&mut self) -> io::Result<()> {
// &mut self implies there are no outstanding operations.
// If state already closed, the user closed multiple times; simply return Ok.
// Otherwise, set state to closed and then submit and await the uring close operation.
{
// Release state guard before await.
let state = RefCell::get_mut(&mut self.state);
if let State::Closed = *state {
return Ok(());
}
*state = State::Closed;
}
Op::close(self.fd)?.await
}
}
impl Drop for SharedFd {
    /// Wakes a task that is waiting for the reference count to go down, if any.
    ///
    /// When the state is `WaitingForUniqueness`, it is reset to `Init` and the
    /// stored waker is woken. Any other state is left untouched.
    fn drop(&mut self) {
        let mut guard = self.inner.state.borrow_mut();

        // Nothing to do unless some task registered interest in uniqueness.
        if !matches!(*guard, State::WaitingForUniqueness(_)) {
            return;
        }

        // Take the waker out, leaving `Init` behind, then wake the task wanting to
        // close this SharedFd so it can try again. If it finds there are no more
        // outstanding clones, it will succeed. Otherwise it will register a new
        // waker and wait for another SharedFd to be dropped.
        if let State::WaitingForUniqueness(waker) = std::mem::replace(&mut *guard, State::Init) {
            waker.wake();
        }
    }
}
impl Drop for Inner {
    /// Closes the descriptor synchronously unless the state is already `Closed`.
    ///
    /// A `Closed` state means `close().await` was used, so nothing is done here.
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so the state can be read directly.
        if matches!(self.state.get_mut(), State::Closed) {
            return;
        }

        // SAFETY: assumes `self.fd` is an open descriptor owned by this `Inner`
        // (TODO confirm against callers of `SharedFd::new`). The state is not
        // `Closed`, and this value is being destroyed, so the descriptor is not
        // used through it again.
        let file = unsafe { std::fs::File::from_raw_fd(self.fd) };

        // Dropping the `File` closes the descriptor; any error is ignored.
        drop(file);
    }
}