ceph_async/
read_stream.rs1use futures::{FutureExt, Stream};
16use std::ffi::CString;
17use std::future::Future;
18use std::os::raw::c_char;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21
22use crate::ceph::IoCtx;
23use crate::completion::with_completion;
24use crate::error::RadosResult;
25use crate::rados::rados_aio_read;
26
27const DEFAULT_BUFFER_SIZE: usize = 4 * 1024 * 1024;
28const DEFAULT_CONCURRENCY: usize = 2;
29
30pub struct ReadStream<'a> {
31 ioctx: &'a IoCtx,
32
33 buffer_size: usize,
35
36 concurrency: usize,
38
39 size_hint: Option<u64>,
41
42 in_flight: Vec<IOSlot<'a>>,
43
44 next: u64,
46
47 yielded: u64,
50
51 object_name: String,
52
53 done: bool,
56}
57
58unsafe impl Send for ReadStream<'_> {}
59
60impl<'a> ReadStream<'a> {
61 pub fn new(
62 ioctx: &'a IoCtx,
63 object_name: &str,
64 buffer_size: Option<usize>,
65 concurrency: Option<usize>,
66 size_hint: Option<u64>,
67 ) -> Self {
68 let mut inst = Self {
69 ioctx,
70 buffer_size: buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
71 concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENCY),
72 size_hint,
73 in_flight: Vec::new(),
74 next: 0,
75 yielded: 0,
76 object_name: object_name.to_string(),
77 done: false,
78 };
79
80 inst.maybe_issue();
82
83 inst
84 }
85}
86
87enum IOSlot<'a> {
88 Pending(Pin<Box<dyn Future<Output = (Vec<u8>, RadosResult<u32>)> + 'a>>),
89 Complete((Vec<u8>, RadosResult<u32>)),
90}
91
92impl<'a> ReadStream<'a> {
93 fn maybe_issue(&mut self) {
94 while !self.done
101 && (self.in_flight.is_empty()
102 || (((self.size_hint.is_some()
103 && (self.next < self.size_hint.unwrap()
104 || self.yielded > self.size_hint.unwrap()))
105 || self.size_hint.is_none())
106 && (self.in_flight.len() < self.concurrency)))
107 {
108 let read_at = self.next;
109 self.next += self.buffer_size as u64;
110
111 let object_name_bg = self.object_name.clone();
113
114 let ioctx = self.ioctx;
116 let read_size = self.buffer_size;
117
118 let fut = async move {
120 let obj_name_str = CString::new(object_name_bg).expect("CString error");
121 let mut fill_buffer = Vec::with_capacity(read_size);
122 let completion = with_completion(ioctx, |c| unsafe {
123 rados_aio_read(
124 ioctx.ioctx,
125 obj_name_str.as_ptr(),
126 c,
127 fill_buffer.as_mut_ptr() as *mut c_char,
128 fill_buffer.capacity(),
129 read_at,
130 )
131 })
132 .expect("Can't issue read");
133
134 let result = completion.await;
135 if let Ok(rval) = &result {
136 unsafe {
137 let len = *rval as usize;
138 assert!(len <= fill_buffer.capacity());
139 fill_buffer.set_len(len);
140 }
141 }
142
143 (fill_buffer, result)
144 };
145
146 let mut fut = Box::pin(fut);
147
148 let slot = match fut.as_mut().now_or_never() {
149 Some(result) => IOSlot::Complete(result),
150 None => IOSlot::Pending(fut),
151 };
152
153 self.in_flight.push(slot);
154 }
155 }
156}
157
158impl<'a> Stream for ReadStream<'a> {
159 type Item = RadosResult<Vec<u8>>;
160
161 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
162 if self.done {
163 return Poll::Ready(None);
165 }
166
167 self.maybe_issue();
168
169 let next_op = &mut self.in_flight[0];
171 let (buffer, result) = match next_op {
172 IOSlot::Complete(_) => {
173 let complete = self.in_flight.remove(0);
174 if let IOSlot::Complete(c) = complete {
175 c
176 } else {
177 panic!("Cannot happen")
178 }
179 }
180 IOSlot::Pending(fut) => match fut.as_mut().poll(cx) {
181 Poll::Pending => return Poll::Pending,
182 Poll::Ready(r) => {
183 self.in_flight.remove(0);
184 r
185 }
186 },
187 };
188
189 let r = match result {
191 Ok(length) => {
192 if (length as usize) < self.buffer_size {
193 self.in_flight.clear();
195
196 self.done = true;
198 }
199 self.yielded += buffer.len() as u64;
200 Poll::Ready(Some(Ok(buffer)))
201 }
202 Err(e) => Poll::Ready(Some(Err(e))),
203 };
204
205 self.maybe_issue();
206
207 r
208 }
209}