librsados/
lib.rs

1#[allow(warnings)]
2mod librados;
3
4pub use librados::*;
5
6use std::{
7    ffi::{CStr, CString, c_void},
8    marker::PhantomData,
9    pin::Pin,
10    task::Poll,
11};
12
13use futures::FutureExt;
14
15pub struct IoCtx<'rados> {
16    inner: rados_ioctx_t,
17    rados: PhantomData<&'rados ()>,
18}
19
20impl<'rados> IoCtx<'rados> {
21    pub fn new(cluster: &'rados librados::rados_t, name: &str) -> Option<Self> {
22        let mut inner = std::ptr::null_mut();
23        let name = CString::new(name).unwrap();
24
25        if unsafe { rados_ioctx_create(*cluster, name.as_ptr(), &mut inner) } == 0 {
26            Some(Self {
27                inner,
28                rados: Default::default(),
29            })
30        } else {
31            None
32        }
33    }
34
35    pub fn get_xattr<'a>(
36        &'a mut self,
37        object: &'a CStr,
38        name: &'a CStr,
39    ) -> impl Future<Output = Result<Vec<u8>, ()>> + 'a {
40        GetXAttr::new(self, object, name)
41    }
42}
43
44impl Drop for IoCtx<'_> {
45    fn drop(&mut self) {
46        unsafe { rados_ioctx_destroy(self.inner) };
47    }
48}
49
50struct GetXAttr<'a> {
51    ctx: &'a IoCtx<'a>,
52    completion: Option<Result<RadosCompletion, ()>>,
53    object: &'a CStr,
54    name: &'a CStr,
55    output_buf: Vec<u8>,
56    completed: bool,
57}
58
59impl<'a> GetXAttr<'a> {
60    fn new(io: &'a IoCtx<'a>, object: &'a CStr, name: &'a CStr) -> Self {
61        let output_buf = vec![0u8; 128];
62
63        Self {
64            ctx: io,
65            completion: None,
66            object,
67            name,
68            output_buf,
69            completed: false,
70        }
71    }
72}
73
74impl Future for GetXAttr<'_> {
75    type Output = Result<Vec<u8>, ()>;
76
77    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
78        if self.completed {
79            return Poll::Ready(Ok(Vec::new()));
80        }
81
82        let (output_buf, output_buf_len) = (self.output_buf.as_mut_ptr(), self.output_buf.len());
83        let ctx = self.ctx.inner;
84        let object = self.object.as_ptr();
85        let name = self.name.as_ptr();
86
87        let completion = self.completion.get_or_insert_with(|| {
88            let completion = RadosCompletion::new();
89
90            let len = unsafe {
91                rados_aio_getxattr(
92                    ctx,
93                    object,
94                    completion.completion,
95                    name,
96                    output_buf as _,
97                    output_buf_len,
98                )
99            };
100
101            (len == 0).then_some(completion).ok_or(())
102        });
103
104        let completion = match completion {
105            Ok(v) => v,
106            Err(_) => return Poll::Ready(Err(())),
107        };
108
109        if let Poll::Ready(len) = completion.poll(cx) {
110            self.completed = true;
111            let len = usize::try_from(len).map_err(|_| ())?;
112            let mut data = core::mem::take(&mut self.output_buf);
113            data.truncate(len);
114            Poll::Ready(Ok(data))
115        } else {
116            Poll::Pending
117        }
118    }
119}
120
121struct RadosCompletion {
122    safe: bool,
123    completion: rados_completion_t,
124    rx: futures::channel::oneshot::Receiver<()>,
125}
126
127impl RadosCompletion {
128    pub fn new() -> Self {
129        let safe = false;
130
131        let (tx, rx) = futures::channel::oneshot::channel();
132
133        let tx = Box::leak(Box::new(tx));
134
135        unsafe extern "C" fn wake_waker_and_drop_box(_: rados_completion_t, arg: *mut c_void) {
136            let arg = unsafe { &mut *(arg as *mut futures::channel::oneshot::Sender<()>) };
137            let arg = *unsafe { Box::from_raw(arg) };
138            arg.send(()).ok();
139        }
140
141        let mut completion = std::ptr::null_mut();
142
143        let (complete, safe) = if safe {
144            (None, Some(wake_waker_and_drop_box as _))
145        } else {
146            (Some(wake_waker_and_drop_box as _), None)
147        };
148
149        let completion_created = unsafe {
150            rados_aio_create_completion(tx as *mut _ as _, complete, safe, &mut completion)
151        };
152
153        assert!(
154            completion_created == 0,
155            "rados_aio_create_completion returned undocumented return code"
156        );
157
158        Self {
159            safe: false,
160            completion,
161            rx,
162        }
163    }
164
165    fn poll(&mut self, cx: &mut std::task::Context<'_>) -> core::task::Poll<i32> {
166        if self.rx.poll_unpin(cx).is_ready() {
167            if self.safe && unsafe { rados_aio_is_safe(self.completion) } != 0 {
168                let value = unsafe { rados_aio_get_return_value(self.completion) };
169                Poll::Ready(value)
170            } else if unsafe { rados_aio_is_complete(self.completion) } != 0 {
171                let value = unsafe { rados_aio_get_return_value(self.completion) };
172                Poll::Ready(value)
173            } else {
174                Poll::Pending
175            }
176        } else {
177            Poll::Pending
178        }
179    }
180}
181
182impl Drop for RadosCompletion {
183    fn drop(&mut self) {
184        unsafe { rados_aio_release(self.completion) }
185    }
186}