// ceph_async/write_sink.rs

1use futures::{FutureExt, Sink, Stream};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use crate::ceph::IoCtx;
7use crate::completion::with_completion;
8use crate::error::{RadosError, RadosResult};
9use crate::rados::rados_aio_write;
10use futures::stream::FuturesUnordered;
11use std::ffi::CString;
12use std::os::raw::c_char;
13
/// Number of writes allowed in flight at once when `WriteSink::new` is given
/// `None` for its `concurrency` argument.
const DEFAULT_CONCURRENCY: usize = 2;
15
16pub struct WriteSink<'a> {
17    ioctx: &'a IoCtx,
18    in_flight: Pin<Box<FuturesUnordered<Pin<Box<dyn Future<Output = RadosResult<u32>> + 'a>>>>>,
19    object_name: String,
20
21    // Offset into object where the next write will land
22    next: u64,
23
24    // How many RADOS ops in flight at same time?
25    concurrency: usize,
26}
27
// SAFETY: NOTE(review) — this is a manual assertion, not something the
// compiler checked. The boxed futures stored in `in_flight` carry no `Send`
// bound, and `&IoCtx` is only `Send` when `IoCtx: Sync`. This impl promises
// that moving a `WriteSink` (and everything it holds) to another thread is
// sound; confirm against the `IoCtx` and completion implementations.
unsafe impl Send for WriteSink<'_> {}
29
30impl<'a> WriteSink<'a> {
31    pub fn new(ioctx: &'a IoCtx, object_name: &str, concurrency: Option<usize>) -> Self {
32        let concurrency = concurrency.unwrap_or(DEFAULT_CONCURRENCY);
33        assert!(concurrency > 0);
34
35        Self {
36            ioctx,
37            in_flight: Box::pin(FuturesUnordered::new()),
38            object_name: object_name.to_string(),
39            next: 0,
40            concurrency,
41        }
42    }
43
44    fn trim_in_flight(
45        mut self: Pin<&mut Self>,
46        cx: &mut Context<'_>,
47        target_len: usize,
48    ) -> Poll<Result<(), <Self as Sink<Vec<u8>>>::Error>> {
49        while self.in_flight.len() > target_len {
50            match self.in_flight.as_mut().poll_next(cx) {
51                Poll::Pending => return Poll::Pending,
52                Poll::Ready(None) => {
53                    // (because we check for in_flight size first)
54                    unreachable!()
55                }
56                Poll::Ready(Some(result)) => match result {
57                    Err(e) => return Poll::Ready(Err(e)),
58                    Ok(sz) => {
59                        debug!("trim_in_flight: IO completed with r={}", sz);
60                    }
61                },
62            };
63        }
64
65        // Nothing left in flight, we're done
66        Poll::Ready(Ok(()))
67    }
68}
69
70impl<'a> Sink<Vec<u8>> for WriteSink<'a> {
71    type Error = RadosError;
72
73    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
74        // If we have fewer than 1 slots available, this will try to wait on some outstanding futures
75        let target = self.as_ref().concurrency - 1;
76        if self.in_flight.len() > target {
77            self.trim_in_flight(cx, target)
78        } else {
79            Poll::Ready(Ok(()))
80        }
81    }
82
83    fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
84        let ioctx = self.ioctx;
85        let obj_name_str = CString::new(self.object_name.clone()).expect("CString error");
86        let write_at = self.next;
87        self.next += item.len() as u64;
88
89        let mut fut = Box::pin(async move {
90            let c = with_completion(ioctx, |c| unsafe {
91                rados_aio_write(
92                    ioctx.ioctx,
93                    obj_name_str.as_ptr(),
94                    c,
95                    item.as_ptr() as *mut c_char,
96                    item.len(),
97                    write_at,
98                )
99            })?;
100
101            c.await
102        });
103
104        // Kick the async{} future to get the RADOS op sent
105        match fut.as_mut().now_or_never() {
106            Some(Ok(_)) => Ok(()),
107            Some(Err(e)) => return Err(e),
108            None => {
109                self.in_flight.push(fut);
110                Ok(())
111            }
112        }
113    }
114
115    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
116        self.trim_in_flight(cx, 0)
117    }
118
119    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
120        // There is no special work to be done on close
121        self.poll_flush(cx)
122    }
123}