use super::current_thread::{process_thread_local_completions, submit_and_wait_thread_local};
use super::requests::IoRequest;
use bytes::Bytes;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
/// Future that resolves to the outcome of a single [`IoRequest`].
///
/// The `Future` implementation yields `object_store::Result<Bytes>` and must be
/// polled on the thread recorded in the request's `thread_id`; polling it from
/// any other thread panics.
pub struct UringCurrentThreadFuture {
/// Shared handle to the request whose state this future inspects when polled.
request: Arc<IoRequest>,
}
impl UringCurrentThreadFuture {
pub(super) fn new(request: Arc<IoRequest>) -> Self {
Self { request }
}
}
impl Future for UringCurrentThreadFuture {
type Output = object_store::Result<Bytes>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.request.thread_id != std::thread::current().id() {
panic!("Request thread ID does not match current thread ID");
}
let mut state = self.request.state.lock().unwrap();
if state.completed {
match state.err.take() {
Some(err) => {
return Poll::Ready(Err(object_store::Error::Generic {
store: "io_uring_ct",
source: Box::new(err),
}));
}
None => {
let br = state.bytes_read;
state.buffer.truncate(br);
let bytes = std::mem::take(&mut state.buffer).freeze();
return Poll::Ready(Ok(bytes));
}
}
}
drop(state);
if let Err(e) = submit_and_wait_thread_local() {
log::debug!("Submit and wait error: {:?}", e);
}
if let Err(e) = process_thread_local_completions() {
log::warn!("Error processing completions: {:?}", e);
}
let mut state = self.request.state.lock().unwrap();
if state.completed {
match state.err.take() {
Some(err) => {
return Poll::Ready(Err(object_store::Error::Generic {
store: "io_uring_ct",
source: Box::new(err),
}));
}
None => {
let br = state.bytes_read;
state.buffer.truncate(br);
let bytes = std::mem::take(&mut state.buffer).freeze();
return Poll::Ready(Ok(bytes));
}
}
}
drop(state);
cx.waker().wake_by_ref();
Poll::Pending
}
}