1use futures::{FutureExt, Sink, Stream};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use crate::ceph::IoCtx;
7use crate::completion::with_completion;
8use crate::error::{RadosError, RadosResult};
9use crate::rados::rados_aio_write;
10use futures::stream::FuturesUnordered;
11use std::ffi::CString;
12use std::os::raw::c_char;
13
14const DEFAULT_CONCURRENCY: usize = 2;
15
16pub struct WriteSink<'a> {
17 ioctx: &'a IoCtx,
18 in_flight: Pin<Box<FuturesUnordered<Pin<Box<dyn Future<Output = RadosResult<u32>> + 'a>>>>>,
19 object_name: String,
20
21 next: u64,
23
24 concurrency: usize,
26}
27
28unsafe impl Send for WriteSink<'_> {}
29
30impl<'a> WriteSink<'a> {
31 pub fn new(ioctx: &'a IoCtx, object_name: &str, concurrency: Option<usize>) -> Self {
32 let concurrency = concurrency.unwrap_or(DEFAULT_CONCURRENCY);
33 assert!(concurrency > 0);
34
35 Self {
36 ioctx,
37 in_flight: Box::pin(FuturesUnordered::new()),
38 object_name: object_name.to_string(),
39 next: 0,
40 concurrency,
41 }
42 }
43
44 fn trim_in_flight(
45 mut self: Pin<&mut Self>,
46 cx: &mut Context<'_>,
47 target_len: usize,
48 ) -> Poll<Result<(), <Self as Sink<Vec<u8>>>::Error>> {
49 while self.in_flight.len() > target_len {
50 match self.in_flight.as_mut().poll_next(cx) {
51 Poll::Pending => return Poll::Pending,
52 Poll::Ready(None) => {
53 unreachable!()
55 }
56 Poll::Ready(Some(result)) => match result {
57 Err(e) => return Poll::Ready(Err(e)),
58 Ok(sz) => {
59 debug!("trim_in_flight: IO completed with r={}", sz);
60 }
61 },
62 };
63 }
64
65 Poll::Ready(Ok(()))
67 }
68}
69
70impl<'a> Sink<Vec<u8>> for WriteSink<'a> {
71 type Error = RadosError;
72
73 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
74 let target = self.as_ref().concurrency - 1;
76 if self.in_flight.len() > target {
77 self.trim_in_flight(cx, target)
78 } else {
79 Poll::Ready(Ok(()))
80 }
81 }
82
83 fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
84 let ioctx = self.ioctx;
85 let obj_name_str = CString::new(self.object_name.clone()).expect("CString error");
86 let write_at = self.next;
87 self.next += item.len() as u64;
88
89 let mut fut = Box::pin(async move {
90 let c = with_completion(ioctx, |c| unsafe {
91 rados_aio_write(
92 ioctx.ioctx,
93 obj_name_str.as_ptr(),
94 c,
95 item.as_ptr() as *mut c_char,
96 item.len(),
97 write_at,
98 )
99 })?;
100
101 c.await
102 });
103
104 match fut.as_mut().now_or_never() {
106 Some(Ok(_)) => Ok(()),
107 Some(Err(e)) => return Err(e),
108 None => {
109 self.in_flight.push(fut);
110 Ok(())
111 }
112 }
113 }
114
115 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
116 self.trim_in_flight(cx, 0)
117 }
118
119 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
120 self.poll_flush(cx)
122 }
123}