use std::time::{Duration, Instant};
use crate::cache::CacheEntry;
use crate::downstream::DownstreamRequest;
use crate::execute::NextRequest;
use crate::object_store::{KvStoreError, ObjectValue};
use crate::{body::Body, error::Error, streaming_body::StreamingBody};
use anyhow::anyhow;
use futures::Future;
use futures::FutureExt;
use http::Response;
use tokio::sync::oneshot;
#[derive(Debug)]
pub struct PendingKvLookupTask(PeekableTask<Result<Option<ObjectValue>, KvStoreError>>);
impl PendingKvLookupTask {
pub fn new(t: PeekableTask<Result<Option<ObjectValue>, KvStoreError>>) -> PendingKvLookupTask {
PendingKvLookupTask(t)
}
pub fn task(self) -> PeekableTask<Result<Option<ObjectValue>, KvStoreError>> {
self.0
}
}
#[derive(Debug)]
pub struct PendingKvInsertTask(PeekableTask<Result<(), KvStoreError>>);
impl PendingKvInsertTask {
pub fn new(t: PeekableTask<Result<(), KvStoreError>>) -> PendingKvInsertTask {
PendingKvInsertTask(t)
}
pub fn task(self) -> PeekableTask<Result<(), KvStoreError>> {
self.0
}
}
#[derive(Debug)]
pub struct PendingKvDeleteTask(PeekableTask<Result<bool, KvStoreError>>);
impl PendingKvDeleteTask {
pub fn new(t: PeekableTask<Result<bool, KvStoreError>>) -> PendingKvDeleteTask {
PendingKvDeleteTask(t)
}
pub fn task(self) -> PeekableTask<Result<bool, KvStoreError>> {
self.0
}
}
#[derive(Debug)]
pub struct PendingKvListTask(PeekableTask<Result<Vec<u8>, KvStoreError>>);
impl PendingKvListTask {
pub fn new(t: PeekableTask<Result<Vec<u8>, KvStoreError>>) -> PendingKvListTask {
PendingKvListTask(t)
}
pub fn task(self) -> PeekableTask<Result<Vec<u8>, KvStoreError>> {
self.0
}
}
#[derive(Debug)]
pub enum PendingDownstreamReqTask {
Complete(Result<Option<NextRequest>, Error>),
Waiting(oneshot::Receiver<NextRequest>, Instant),
}
impl PendingDownstreamReqTask {
pub fn new(
rx: Option<oneshot::Receiver<NextRequest>>,
timeout: Duration,
) -> PendingDownstreamReqTask {
if let Some(rx) = rx {
PendingDownstreamReqTask::Waiting(rx, Instant::now() + timeout)
} else {
PendingDownstreamReqTask::Complete(Ok(None))
}
}
pub async fn recv(mut self) -> Result<Option<DownstreamRequest>, Error> {
self.await_ready().await;
let Self::Complete(res) = self else {
return Ok(None);
};
let Some(res) = res? else {
return Ok(None);
};
Ok(res.into_request())
}
pub async fn await_ready(&mut self) {
if let Self::Waiting(rx, deadline) = self {
let v = if Instant::now() > *deadline {
rx.close();
rx.try_recv().ok()
} else {
tokio::time::timeout_at((*deadline).into(), rx)
.await
.ok()
.and_then(|r| r.ok())
};
*self = Self::Complete(Ok(v));
}
}
}
#[derive(Debug)]
pub struct PendingCacheTask(PeekableTask<CacheEntry>);
impl PendingCacheTask {
pub fn new(t: PeekableTask<CacheEntry>) -> PendingCacheTask {
PendingCacheTask(t)
}
pub fn task(self) -> PeekableTask<CacheEntry> {
self.0
}
pub async fn as_mut(&mut self) -> &mut Result<CacheEntry, Error> {
self.0.await_ready().await;
self.0
.get_mut()
.expect("internal error: PeekableTask was not ready after AwaitReady")
}
}
#[derive(Debug)]
pub enum AsyncItem {
Body(Body),
StreamingBody(StreamingBody),
PendingReq(PeekableTask<Response<Body>>),
PendingDownstream(PendingDownstreamReqTask),
PendingKvLookup(PendingKvLookupTask),
PendingKvInsert(PendingKvInsertTask),
PendingKvDelete(PendingKvDeleteTask),
PendingKvList(PendingKvListTask),
PendingCache(PendingCacheTask),
Ready,
}
impl AsyncItem {
pub fn is_streaming(&self) -> bool {
matches!(self, Self::StreamingBody(_))
}
pub fn as_body(&self) -> Option<&Body> {
match self {
Self::Body(body) => Some(body),
_ => None,
}
}
pub fn as_body_mut(&mut self) -> Option<&mut Body> {
match self {
Self::Body(body) => Some(body),
_ => None,
}
}
pub fn into_body(self) -> Option<Body> {
match self {
Self::Body(body) => Some(body),
_ => None,
}
}
pub fn as_streaming_mut(&mut self) -> Option<&mut StreamingBody> {
match self {
Self::StreamingBody(sender) => Some(sender),
_ => None,
}
}
pub fn into_streaming(self) -> Option<StreamingBody> {
match self {
Self::StreamingBody(streaming) => Some(streaming),
_ => None,
}
}
pub fn begin_streaming(&mut self) -> Option<Body> {
if self.is_streaming() {
return None;
}
let (streaming, receiver) = StreamingBody::new();
match std::mem::replace(self, Self::StreamingBody(streaming)) {
Self::Body(mut body) => {
body.push_back(receiver);
Some(body)
}
_ => {
unreachable!("!self.is_streaming, but was actually streaming");
}
}
}
pub fn as_pending_kv_lookup(&self) -> Option<&PendingKvLookupTask> {
match self {
Self::PendingKvLookup(req) => Some(req),
_ => None,
}
}
pub fn into_pending_kv_lookup(self) -> Option<PendingKvLookupTask> {
match self {
Self::PendingKvLookup(req) => Some(req),
_ => None,
}
}
pub fn as_pending_kv_insert(&self) -> Option<&PendingKvInsertTask> {
match self {
Self::PendingKvInsert(req) => Some(req),
_ => None,
}
}
pub fn into_pending_kv_insert(self) -> Option<PendingKvInsertTask> {
match self {
Self::PendingKvInsert(req) => Some(req),
_ => None,
}
}
pub fn as_pending_kv_delete(&self) -> Option<&PendingKvDeleteTask> {
match self {
Self::PendingKvDelete(req) => Some(req),
_ => None,
}
}
pub fn into_pending_kv_delete(self) -> Option<PendingKvDeleteTask> {
match self {
Self::PendingKvDelete(req) => Some(req),
_ => None,
}
}
pub fn as_pending_kv_list(&self) -> Option<&PendingKvListTask> {
match self {
Self::PendingKvList(req) => Some(req),
_ => None,
}
}
pub fn into_pending_kv_list(self) -> Option<PendingKvListTask> {
match self {
Self::PendingKvList(req) => Some(req),
_ => None,
}
}
pub fn as_pending_req(&self) -> Option<&PeekableTask<Response<Body>>> {
match self {
Self::PendingReq(req) => Some(req),
_ => None,
}
}
pub fn as_pending_req_mut(&mut self) -> Option<&mut PeekableTask<Response<Body>>> {
match self {
Self::PendingReq(req) => Some(req),
_ => None,
}
}
pub fn as_pending_cache(&self) -> Option<&PendingCacheTask> {
match self {
Self::PendingCache(op) => Some(op),
_ => None,
}
}
pub fn as_pending_cache_mut(&mut self) -> Option<&mut PendingCacheTask> {
match self {
Self::PendingCache(op) => Some(op),
_ => None,
}
}
pub fn into_pending_cache(self) -> Option<PendingCacheTask> {
match self {
Self::PendingCache(op) => Some(op),
_ => None,
}
}
pub fn into_pending_req(self) -> Option<PeekableTask<Response<Body>>> {
match self {
Self::PendingReq(req) => Some(req),
_ => None,
}
}
pub fn as_pending_downstream_req_mut(&mut self) -> Option<&mut PendingDownstreamReqTask> {
match self {
Self::PendingDownstream(req) => Some(req),
_ => None,
}
}
pub fn into_pending_downstream_req(self) -> Option<PendingDownstreamReqTask> {
match self {
Self::PendingDownstream(req) => Some(req),
_ => None,
}
}
pub async fn await_ready(&mut self) {
match self {
Self::StreamingBody(body) => body.await_ready().await,
Self::Body(body) => body.await_ready().await,
Self::PendingReq(req) => req.await_ready().await,
Self::PendingDownstream(req) => req.await_ready().await,
Self::PendingKvLookup(req) => req.0.await_ready().await,
Self::PendingKvInsert(req) => req.0.await_ready().await,
Self::PendingKvDelete(req) => req.0.await_ready().await,
Self::PendingKvList(req) => req.0.await_ready().await,
Self::PendingCache(req) => req.0.await_ready().await,
Self::Ready => (),
}
}
pub fn is_ready(&mut self) -> bool {
self.await_ready().now_or_never().is_some()
}
}
impl From<PeekableTask<Response<Body>>> for AsyncItem {
fn from(req: PeekableTask<Response<Body>>) -> Self {
Self::PendingReq(req)
}
}
impl From<PendingKvLookupTask> for AsyncItem {
fn from(task: PendingKvLookupTask) -> Self {
Self::PendingKvLookup(task)
}
}
impl From<PendingKvInsertTask> for AsyncItem {
fn from(task: PendingKvInsertTask) -> Self {
Self::PendingKvInsert(task)
}
}
impl From<PendingKvDeleteTask> for AsyncItem {
fn from(task: PendingKvDeleteTask) -> Self {
Self::PendingKvDelete(task)
}
}
impl From<PendingKvListTask> for AsyncItem {
fn from(task: PendingKvListTask) -> Self {
Self::PendingKvList(task)
}
}
impl From<PendingCacheTask> for AsyncItem {
fn from(task: PendingCacheTask) -> Self {
Self::PendingCache(task)
}
}
impl From<PendingDownstreamReqTask> for AsyncItem {
fn from(task: PendingDownstreamReqTask) -> Self {
Self::PendingDownstream(task)
}
}
#[derive(Debug)]
pub enum PeekableTask<T> {
Waiting(oneshot::Receiver<Result<T, Error>>),
Complete(Result<T, Error>),
}
impl<T: Send + 'static> PeekableTask<T> {
pub async fn spawn(fut: impl Future<Output = Result<T, Error>> + 'static + Send) -> Self {
let (sender, receiver) = oneshot::channel();
tokio::task::spawn(async move { sender.send(fut.await) });
Self::Waiting(receiver)
}
pub fn complete(t: T) -> Self {
PeekableTask::Complete(Ok(t))
}
pub async fn await_ready(&mut self) {
if let PeekableTask::Waiting(rx) = self {
match rx.await {
Ok(v) => *self = PeekableTask::Complete(v),
_ => {
*self = PeekableTask::Complete(Err(anyhow!(
"peekable task sender unexpectedly dropped"
)
.into()));
}
}
}
}
pub async fn recv(self) -> Result<T, Error> {
match self {
PeekableTask::Waiting(rx) => rx
.await
.map_err(|_| anyhow!("peekable task sender unexpectedly dropped"))?,
PeekableTask::Complete(res) => res,
}
}
pub fn get_mut(&mut self) -> Option<&mut Result<T, Error>> {
if let PeekableTask::Complete(res) = self {
Some(res)
} else {
None
}
}
}