use futures::{FutureExt, Stream};
use std::ffi::CString;
use std::future::Future;
use std::os::raw::c_char;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::ceph::IoCtx;
use crate::completion::with_completion;
use crate::error::RadosResult;
use crate::rados::rados_aio_read;
/// Bytes requested per read when the caller does not choose a buffer size (4 MiB).
const DEFAULT_BUFFER_SIZE: usize = 4 << 20;
/// Upper bound on reads kept in flight when the caller does not choose a concurrency.
const DEFAULT_CONCURRENCY: usize = 2;
/// Stream of byte buffers read from one named object, in increasing offset
/// order starting at offset 0.
///
/// Reads are issued ahead of consumption and their results are handed out in
/// the order the reads were issued.
pub struct ReadStream<'a> {
/// I/O context that every read is issued against.
ioctx: &'a IoCtx,
/// Bytes requested by each read; also the stride by which `next` advances.
/// A read that returns fewer bytes than this marks the stream as finished.
buffer_size: usize,
/// Upper bound on `in_flight.len()` when issuing additional reads. One read
/// is always issued when nothing is in flight, regardless of this value.
concurrency: usize,
/// Optional size used to throttle read-ahead: while it is set, additional
/// concurrent reads are only issued if `next` is below it or `yielded` has
/// already exceeded it. Presumably the expected object size in bytes
/// (TODO confirm with callers).
size_hint: Option<u64>,
/// Issued reads in issue order; index 0 is the next one to be yielded.
in_flight: Vec<IOSlot<'a>>,
/// Object offset at which the next read will be issued.
next: u64,
/// Total number of bytes handed out to the consumer so far.
yielded: u64,
/// Name of the object being read.
object_name: String,
/// Set once a short read has been seen; the stream then yields `None`.
done: bool,
}
// Manually asserts that a `ReadStream` may be moved to another thread. The
// compiler cannot derive this on its own: the boxed futures stored in
// `IOSlot::Pending` are `dyn Future` trait objects without a `Send` bound.
// NOTE(review): soundness depends on `IoCtx` and the completion futures being
// safe to use and drop from a thread other than the one that created them.
// That is not visible from this file; confirm before relying on it.
unsafe impl Send for ReadStream<'_> {}
impl<'a> ReadStream<'a> {
    /// Creates a stream over `object_name`, starting at offset 0, and issues
    /// the initial read(s) before returning.
    ///
    /// * `ioctx` - I/O context the reads are issued against; it must outlive
    ///   the stream.
    /// * `object_name` - name of the object to read.
    /// * `buffer_size` - bytes requested per read. `None` and `Some(0)` both
    ///   select `DEFAULT_BUFFER_SIZE`. Zero is rejected because the stream
    ///   only detects the end of the data when a read comes back shorter than
    ///   the request, which a zero-length request can never do.
    /// * `concurrency` - maximum number of reads kept in flight; `None`
    ///   selects `DEFAULT_CONCURRENCY`. One read is always kept in flight, so
    ///   `Some(0)` behaves like `Some(1)`.
    /// * `size_hint` - optional size used to limit read-ahead (presumably the
    ///   expected object size in bytes; TODO confirm with callers).
    ///
    /// # Panics
    ///
    /// The first read is issued and polled once here, so any panic raised by
    /// `maybe_issue` (for example an object name containing a NUL byte)
    /// surfaces from this constructor.
    pub fn new(
        ioctx: &'a IoCtx,
        object_name: &str,
        buffer_size: Option<usize>,
        concurrency: Option<usize>,
        size_hint: Option<u64>,
    ) -> Self {
        // A zero buffer size would produce an endless stream of empty chunks.
        let buffer_size = buffer_size
            .filter(|&size| size > 0)
            .unwrap_or(DEFAULT_BUFFER_SIZE);

        let mut stream = Self {
            ioctx,
            buffer_size,
            concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENCY),
            size_hint,
            in_flight: Vec::new(),
            next: 0,
            yielded: 0,
            object_name: object_name.to_string(),
            done: false,
        };
        // Start reading right away so data is already on its way by the time
        // the stream is first polled.
        stream.maybe_issue();
        stream
    }
}
/// One issued read, as stored in `ReadStream::in_flight`.
enum IOSlot<'a> {
/// The read had not finished when it was polled at issue time. The future
/// resolves to the read's buffer together with the read result.
Pending(Pin<Box<dyn Future<Output = (Vec<u8>, RadosResult<u32>)> + 'a>>),
/// The read had already finished when it was polled at issue time; holds
/// its buffer and result until the consumer reaches this slot.
Complete((Vec<u8>, RadosResult<u32>)),
}
impl<'a> ReadStream<'a> {
    /// Reports whether another read should be issued right now.
    ///
    /// Nothing is issued once the stream is done. Otherwise one read is
    /// always kept in flight. Further reads are added up to `concurrency`,
    /// but when a size hint is set only while the next offset is still below
    /// the hint or the data already yielded has exceeded it.
    fn should_issue_read(&self) -> bool {
        if self.done {
            return false;
        }
        if self.in_flight.is_empty() {
            return true;
        }
        let hint_allows_more = match self.size_hint {
            None => true,
            Some(hint) => self.next < hint || self.yielded > hint,
        };
        hint_allows_more && self.in_flight.len() < self.concurrency
    }

    /// Issues reads at successive offsets until `should_issue_read` says
    /// to stop, appending one slot per read to `in_flight`.
    ///
    /// Each read is polled once immediately so the request is actually
    /// submitted; a read that is already finished is stored as
    /// `IOSlot::Complete`, anything else as `IOSlot::Pending`.
    ///
    /// # Panics
    ///
    /// Panics if the object name contains an interior NUL byte, if
    /// `with_completion` reports a failure to issue the read, or if a read
    /// reports more bytes than were requested.
    fn maybe_issue(&mut self) {
        while self.should_issue_read() {
            let read_at = self.next;
            self.next += self.buffer_size as u64;

            // The future must own everything it touches besides `ioctx`.
            let object_name = self.object_name.clone();
            let ioctx = self.ioctx;
            let read_size = self.buffer_size;

            let fut = async move {
                let c_name = CString::new(object_name).expect("CString error");
                let mut fill_buffer = Vec::with_capacity(read_size);

                // Request exactly `read_size` bytes rather than the vector's
                // capacity: `Vec::with_capacity` only promises *at least*
                // that much, while offsets advance by exactly `read_size`, so
                // a larger request would overlap the following read.
                //
                // SAFETY: the destination pointer addresses an allocation of
                // at least `read_size` bytes, and both `c_name` and
                // `fill_buffer` are owned by this future, so they stay
                // allocated for as long as it is awaiting the completion.
                // NOTE(review): assumes dropping the completion before it
                // resolves cancels or waits for the read; confirm in
                // `crate::completion`.
                let completion = with_completion(ioctx, |c| unsafe {
                    rados_aio_read(
                        ioctx.ioctx,
                        c_name.as_ptr(),
                        c,
                        fill_buffer.as_mut_ptr() as *mut c_char,
                        read_size,
                        read_at,
                    )
                })
                .expect("Can't issue read");

                let result = completion.await;
                if let Ok(rval) = &result {
                    let len = *rval as usize;
                    assert!(len <= read_size);
                    // SAFETY: `len <= read_size <= capacity` was just checked.
                    // NOTE(review): relies on the successful result being the
                    // number of bytes written into the buffer.
                    unsafe { fill_buffer.set_len(len) };
                }
                (fill_buffer, result)
            };

            // Async blocks do nothing until polled, so poll once now to
            // submit the request.
            let mut fut = Box::pin(fut);
            let slot = match fut.as_mut().now_or_never() {
                Some(outcome) => IOSlot::Complete(outcome),
                None => IOSlot::Pending(fut),
            };
            self.in_flight.push(slot);
        }
    }
}
impl<'a> Stream for ReadStream<'a> {
    type Item = RadosResult<Vec<u8>>;

    /// Yields the result of the oldest outstanding read.
    ///
    /// * Returns `Poll::Pending` while the oldest read has not finished.
    /// * Returns `Some(Ok(buffer))` for each finished read. A read shorter
    ///   than `buffer_size` is taken as the end of the data: it is still
    ///   yielded (possibly as an empty buffer), the remaining reads are
    ///   dropped, and the next call returns `None`.
    /// * Returns `Some(Err(e))` for a failed read and then ends the stream.
    ///   Later offsets have already been requested past the failed one, so
    ///   continuing would hand the consumer data with a hole in it, and a
    ///   persistently failing read would produce errors forever.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        if this.done {
            return Poll::Ready(None);
        }

        // Top up the queue; while not done this leaves at least one slot in
        // `in_flight`, so indexing the head below cannot go out of bounds.
        this.maybe_issue();

        let head = &mut this.in_flight[0];
        let (buffer, result) = match head {
            IOSlot::Pending(fut) => match fut.as_mut().poll(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(outcome) => {
                    this.in_flight.remove(0);
                    outcome
                }
            },
            IOSlot::Complete(_) => match this.in_flight.remove(0) {
                IOSlot::Complete(outcome) => outcome,
                IOSlot::Pending(_) => {
                    unreachable!("head slot was just observed to be Complete")
                }
            },
        };

        let item = match result {
            Ok(length) => {
                if (length as usize) < this.buffer_size {
                    // Short read: nothing lies beyond this chunk, so drop
                    // the read-ahead and finish on the next poll.
                    this.in_flight.clear();
                    this.done = true;
                }
                this.yielded += buffer.len() as u64;
                Ok(buffer)
            }
            Err(e) => {
                // Report the failure once, then terminate the stream.
                this.in_flight.clear();
                this.done = true;
                Err(e)
            }
        };

        // Replace the slot that was just consumed (no-op once done).
        this.maybe_issue();
        Poll::Ready(Some(item))
    }
}