ceph_async/
list_stream.rs

1use std::ffi::CStr;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::executor::ThreadPool;
6use futures::task::SpawnExt;
7use futures::{Future, Stream};
8
9use crate::ceph::CephObject;
10use crate::error::{RadosError, RadosResult};
11use crate::rados::{rados_list_ctx_t, rados_nobjects_list_close, rados_nobjects_list_next};
12
13/// Wrap rados_list_ctx_t to make it Send (hold across .await)
14#[derive(Copy, Clone)]
15struct ListCtxHandle(rados_list_ctx_t);
16unsafe impl Send for ListCtxHandle {}
17
18/// A high level Stream interface to the librados 'nobjects_list' functionality.
19///
20/// librados does not expose asynchronous calls for object listing, so we use
21/// a background helper thread.
22pub struct ListStream {
23    ctx: ListCtxHandle,
24    workers: ThreadPool,
25
26    // We only have a single call to nobjects_list_next outstanding at
27    // any time: rely on underlying librados/Objecter to do
28    // batching/readahead
29    next: Option<Pin<Box<dyn Future<Output = Option<RadosResult<CephObject>>>>>>,
30}
31
32unsafe impl Send for ListStream {}
33
34impl ListStream {
35    pub fn new(ctx: rados_list_ctx_t) -> Self {
36        Self {
37            ctx: ListCtxHandle(ctx),
38            workers: ThreadPool::builder()
39                .pool_size(1)
40                .create()
41                .expect("Could not spawn worker thread"),
42            next: None,
43        }
44    }
45}
46
47impl Stream for ListStream {
48    type Item = Result<CephObject, RadosError>;
49
50    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        if self.next.is_none() {
52            let list_ctx = self.ctx;
53            self.next = Some(Box::pin(
54                self.workers
55                    .spawn_with_handle(async move {
56                        let mut entry_ptr: *mut *const ::libc::c_char = std::ptr::null_mut();
57                        let mut key_ptr: *mut *const ::libc::c_char = std::ptr::null_mut();
58                        let mut nspace_ptr: *mut *const ::libc::c_char = std::ptr::null_mut();
59                        unsafe {
60                            let r = rados_nobjects_list_next(
61                                list_ctx.0,
62                                &mut entry_ptr,
63                                &mut key_ptr,
64                                &mut nspace_ptr,
65                            );
66
67                            if r == -libc::ENOENT {
68                                None
69                            } else if r < 0 {
70                                Some(Err(r.into()))
71                            } else {
72                                let object_name =
73                                    CStr::from_ptr(entry_ptr as *const ::libc::c_char);
74                                let mut object_locator = String::new();
75                                let mut namespace = String::new();
76                                if !key_ptr.is_null() {
77                                    object_locator.push_str(
78                                        &CStr::from_ptr(key_ptr as *const ::libc::c_char)
79                                            .to_string_lossy(),
80                                    );
81                                }
82                                if !nspace_ptr.is_null() {
83                                    namespace.push_str(
84                                        &CStr::from_ptr(nspace_ptr as *const ::libc::c_char)
85                                            .to_string_lossy(),
86                                    );
87                                }
88
89                                Some(Ok(CephObject {
90                                    name: object_name.to_string_lossy().into_owned(),
91                                    entry_locator: object_locator,
92                                    namespace,
93                                }))
94                            }
95                        }
96                    })
97                    .expect("Could not spawn background task"),
98            ));
99        }
100
101        let result = self.next.as_mut().unwrap().as_mut().poll(cx);
102        match &result {
103            Poll::Pending => Poll::Pending,
104            _ => {
105                self.next = None;
106                result
107            }
108        }
109
110        // match self.next.as_mut().unwrap().as_mut().poll(cx) {
111        //     Poll::Pending => Poll: Pending,
112        //     Poll::Ready(None) => Poll::Ready(None),
113        //     Poll::Ready(Some(Err(rados_error))) => Poll::Ready(Some(Err(rados_error))),
114        //     Poll::Ready(Some(Ok(ceph_object))) => Poll::Ready(Some(Err(rados_error))),
115        // }
116    }
117}
118
119impl Drop for ListStream {
120    fn drop(&mut self) {
121        unsafe {
122            rados_nobjects_list_close(self.ctx.0);
123        }
124    }
125}