1#[allow(warnings)]
2mod librados;
3
4pub use librados::*;
5
6use std::{
7 ffi::{CStr, CString, c_void},
8 marker::PhantomData,
9 pin::Pin,
10 task::Poll,
11};
12
13use futures::FutureExt;
14
15pub struct IoCtx<'rados> {
16 inner: rados_ioctx_t,
17 rados: PhantomData<&'rados ()>,
18}
19
20impl<'rados> IoCtx<'rados> {
21 pub fn new(cluster: &'rados librados::rados_t, name: &str) -> Option<Self> {
22 let mut inner = std::ptr::null_mut();
23 let name = CString::new(name).unwrap();
24
25 if unsafe { rados_ioctx_create(*cluster, name.as_ptr(), &mut inner) } == 0 {
26 Some(Self {
27 inner,
28 rados: Default::default(),
29 })
30 } else {
31 None
32 }
33 }
34
35 pub fn get_xattr<'a>(
36 &'a mut self,
37 object: &'a CStr,
38 name: &'a CStr,
39 ) -> impl Future<Output = Result<Vec<u8>, ()>> + 'a {
40 GetXAttr::new(self, object, name)
41 }
42}
43
44impl Drop for IoCtx<'_> {
45 fn drop(&mut self) {
46 unsafe { rados_ioctx_destroy(self.inner) };
47 }
48}
49
50struct GetXAttr<'a> {
51 ctx: &'a IoCtx<'a>,
52 completion: Option<Result<RadosCompletion, ()>>,
53 object: &'a CStr,
54 name: &'a CStr,
55 output_buf: Vec<u8>,
56 completed: bool,
57}
58
59impl<'a> GetXAttr<'a> {
60 fn new(io: &'a IoCtx<'a>, object: &'a CStr, name: &'a CStr) -> Self {
61 let output_buf = vec![0u8; 128];
62
63 Self {
64 ctx: io,
65 completion: None,
66 object,
67 name,
68 output_buf,
69 completed: false,
70 }
71 }
72}
73
74impl Future for GetXAttr<'_> {
75 type Output = Result<Vec<u8>, ()>;
76
77 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
78 if self.completed {
79 return Poll::Ready(Ok(Vec::new()));
80 }
81
82 let (output_buf, output_buf_len) = (self.output_buf.as_mut_ptr(), self.output_buf.len());
83 let ctx = self.ctx.inner;
84 let object = self.object.as_ptr();
85 let name = self.name.as_ptr();
86
87 let completion = self.completion.get_or_insert_with(|| {
88 let completion = RadosCompletion::new();
89
90 let len = unsafe {
91 rados_aio_getxattr(
92 ctx,
93 object,
94 completion.completion,
95 name,
96 output_buf as _,
97 output_buf_len,
98 )
99 };
100
101 (len == 0).then_some(completion).ok_or(())
102 });
103
104 let completion = match completion {
105 Ok(v) => v,
106 Err(_) => return Poll::Ready(Err(())),
107 };
108
109 if let Poll::Ready(len) = completion.poll(cx) {
110 self.completed = true;
111 let len = usize::try_from(len).map_err(|_| ())?;
112 let mut data = core::mem::take(&mut self.output_buf);
113 data.truncate(len);
114 Poll::Ready(Ok(data))
115 } else {
116 Poll::Pending
117 }
118 }
119}
120
121struct RadosCompletion {
122 safe: bool,
123 completion: rados_completion_t,
124 rx: futures::channel::oneshot::Receiver<()>,
125}
126
127impl RadosCompletion {
128 pub fn new() -> Self {
129 let safe = false;
130
131 let (tx, rx) = futures::channel::oneshot::channel();
132
133 let tx = Box::leak(Box::new(tx));
134
135 unsafe extern "C" fn wake_waker_and_drop_box(_: rados_completion_t, arg: *mut c_void) {
136 let arg = unsafe { &mut *(arg as *mut futures::channel::oneshot::Sender<()>) };
137 let arg = *unsafe { Box::from_raw(arg) };
138 arg.send(()).ok();
139 }
140
141 let mut completion = std::ptr::null_mut();
142
143 let (complete, safe) = if safe {
144 (None, Some(wake_waker_and_drop_box as _))
145 } else {
146 (Some(wake_waker_and_drop_box as _), None)
147 };
148
149 let completion_created = unsafe {
150 rados_aio_create_completion(tx as *mut _ as _, complete, safe, &mut completion)
151 };
152
153 assert!(
154 completion_created == 0,
155 "rados_aio_create_completion returned undocumented return code"
156 );
157
158 Self {
159 safe: false,
160 completion,
161 rx,
162 }
163 }
164
165 fn poll(&mut self, cx: &mut std::task::Context<'_>) -> core::task::Poll<i32> {
166 if self.rx.poll_unpin(cx).is_ready() {
167 if self.safe && unsafe { rados_aio_is_safe(self.completion) } != 0 {
168 let value = unsafe { rados_aio_get_return_value(self.completion) };
169 Poll::Ready(value)
170 } else if unsafe { rados_aio_is_complete(self.completion) } != 0 {
171 let value = unsafe { rados_aio_get_return_value(self.completion) };
172 Poll::Ready(value)
173 } else {
174 Poll::Pending
175 }
176 } else {
177 Poll::Pending
178 }
179 }
180}
181
182impl Drop for RadosCompletion {
183 fn drop(&mut self) {
184 unsafe { rados_aio_release(self.completion) }
185 }
186}