ceph_async/
read_stream.rs

1// Copyright 2021 John Spray All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License
14
15use futures::{FutureExt, Stream};
16use std::ffi::CString;
17use std::future::Future;
18use std::os::raw::c_char;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21
22use crate::ceph::IoCtx;
23use crate::completion::with_completion;
24use crate::error::RadosResult;
25use crate::rados::rados_aio_read;
26
/// Size in bytes of each RADOS read op when the caller does not choose one (4 MiB).
const DEFAULT_BUFFER_SIZE: usize = 4 << 20;
/// Number of read ops kept in flight when the caller does not choose one.
const DEFAULT_CONCURRENCY: usize = 2;
29
30pub struct ReadStream<'a> {
31    ioctx: &'a IoCtx,
32
33    // Size of each RADOS read op
34    buffer_size: usize,
35
36    // Number of concurrent RADOS read ops to issue
37    concurrency: usize,
38
39    // Caller's hint as to the object size (not required to be accurate)
40    size_hint: Option<u64>,
41
42    in_flight: Vec<IOSlot<'a>>,
43
44    // Counter for how many bytes we have issued reads for
45    next: u64,
46
47    // Counter for how many bytes we have yielded from poll_next()
48    // (i.e. the size of the object so far)
49    yielded: u64,
50
51    object_name: String,
52
53    // Flag is set when we see a short read - means do not issue any more IOs,
54    // and return Poll::Ready(None) on next poll
55    done: bool,
56}
57
58unsafe impl Send for ReadStream<'_> {}
59
60impl<'a> ReadStream<'a> {
61    pub fn new(
62        ioctx: &'a IoCtx,
63        object_name: &str,
64        buffer_size: Option<usize>,
65        concurrency: Option<usize>,
66        size_hint: Option<u64>,
67    ) -> Self {
68        let mut inst = Self {
69            ioctx,
70            buffer_size: buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
71            concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENCY),
72            size_hint,
73            in_flight: Vec::new(),
74            next: 0,
75            yielded: 0,
76            object_name: object_name.to_string(),
77            done: false,
78        };
79
80        // Start IOs early, don't wait for the first poll.
81        inst.maybe_issue();
82
83        inst
84    }
85}
86
87enum IOSlot<'a> {
88    Pending(Pin<Box<dyn Future<Output = (Vec<u8>, RadosResult<u32>)> + 'a>>),
89    Complete((Vec<u8>, RadosResult<u32>)),
90}
91
92impl<'a> ReadStream<'a> {
93    fn maybe_issue(&mut self) {
94        // Issue reads if any of these are true:
95        // - Nothing is in flight
96        // - No size bound, and in flight < concurrency
97        // - A size bound, and we're within it, and in flight < concurrency
98        // - A size bound, and it has been disproved, and in flight < concurrency
99
100        while !self.done
101            && (self.in_flight.is_empty()
102                || (((self.size_hint.is_some()
103                    && (self.next < self.size_hint.unwrap()
104                        || self.yielded > self.size_hint.unwrap()))
105                    || self.size_hint.is_none())
106                    && (self.in_flight.len() < self.concurrency)))
107        {
108            let read_at = self.next;
109            self.next += self.buffer_size as u64;
110
111            // Inefficient: copying out string to dodge ownership issues for the moment
112            let object_name_bg = self.object_name.clone();
113
114            // Grab items for use inside async{} block to avoid referencing self from in there.
115            let ioctx = self.ioctx;
116            let read_size = self.buffer_size;
117
118            // Use an async block to tie together the lifetime of a Vec and the Completion that uses it
119            let fut = async move {
120                let obj_name_str = CString::new(object_name_bg).expect("CString error");
121                let mut fill_buffer = Vec::with_capacity(read_size);
122                let completion = with_completion(ioctx, |c| unsafe {
123                    rados_aio_read(
124                        ioctx.ioctx,
125                        obj_name_str.as_ptr(),
126                        c,
127                        fill_buffer.as_mut_ptr() as *mut c_char,
128                        fill_buffer.capacity(),
129                        read_at,
130                    )
131                })
132                .expect("Can't issue read");
133
134                let result = completion.await;
135                if let Ok(rval) = &result {
136                    unsafe {
137                        let len = *rval as usize;
138                        assert!(len <= fill_buffer.capacity());
139                        fill_buffer.set_len(len);
140                    }
141                }
142
143                (fill_buffer, result)
144            };
145
146            let mut fut = Box::pin(fut);
147
148            let slot = match fut.as_mut().now_or_never() {
149                Some(result) => IOSlot::Complete(result),
150                None => IOSlot::Pending(fut),
151            };
152
153            self.in_flight.push(slot);
154        }
155    }
156}
157
158impl<'a> Stream for ReadStream<'a> {
159    type Item = RadosResult<Vec<u8>>;
160
161    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
162        if self.done {
163            // Our last read result was a short one: we know nothing else needs doing.
164            return Poll::Ready(None);
165        }
166
167        self.maybe_issue();
168
169        // Poll next read: maybe return pending if none is ready
170        let next_op = &mut self.in_flight[0];
171        let (buffer, result) = match next_op {
172            IOSlot::Complete(_) => {
173                let complete = self.in_flight.remove(0);
174                if let IOSlot::Complete(c) = complete {
175                    c
176                } else {
177                    panic!("Cannot happen")
178                }
179            }
180            IOSlot::Pending(fut) => match fut.as_mut().poll(cx) {
181                Poll::Pending => return Poll::Pending,
182                Poll::Ready(r) => {
183                    self.in_flight.remove(0);
184                    r
185                }
186            },
187        };
188
189        // A result is ready, handle it.
190        let r = match result {
191            Ok(length) => {
192                if (length as usize) < self.buffer_size {
193                    // Cancel outstanding ops
194                    self.in_flight.clear();
195
196                    // Flag to return Ready(None) on next call to poll.
197                    self.done = true;
198                }
199                self.yielded += buffer.len() as u64;
200                Poll::Ready(Some(Ok(buffer)))
201            }
202            Err(e) => Poll::Ready(Some(Err(e))),
203        };
204
205        self.maybe_issue();
206
207        r
208    }
209}