use std::{
collections::VecDeque,
fmt::Debug,
sync::{Arc, RwLock},
};
use datafusion::arrow::record_batch::RecordBatch;
use futures::{future::BoxFuture, stream::FuturesOrdered, Future, FutureExt, StreamExt};
use tokio::sync::{
mpsc::{self, error::TrySendError},
Notify,
};
use tracing::Instrument;
/// Bookkeeping for the record batches buffered by a work queue.
///
/// Batches are stored newest-first: `push` inserts at the front of `items`,
/// so the oldest batch is always at the back.
#[derive(Debug)]
pub struct ValueTracker {
/// Buffered batches, newest at the front and oldest at the back.
items: VecDeque<RecordBatch>,
/// How many of the oldest batches (counted from the back of `items`) have
/// been handed out by `process` and not yet removed by `finish`.
pending: usize,
}
impl ValueTracker {
    /// Creates a tracker with nothing buffered and nothing in flight.
    fn new() -> Self {
        Self {
            items: VecDeque::new(),
            pending: 0,
        }
    }

    /// Returns how many buffered batches have not yet been handed out by
    /// [`process`](Self::process).
    fn new_count(&self) -> usize {
        let buffered = self.items.len();
        buffered - self.pending
    }

    /// Buffers `item` as the newest batch (it goes to the front of the deque).
    fn push(&mut self, item: RecordBatch) {
        self.items.push_front(item);
    }

    /// Marks the `count` oldest not-yet-pending batches as pending and returns
    /// clones of them, oldest first.
    ///
    /// `count` must not exceed [`new_count`](Self::new_count); this is only
    /// checked in debug builds.
    fn process(&mut self, count: usize) -> Vec<RecordBatch> {
        debug_assert!(count <= self.items.len() - self.pending);
        // Walk from the back (oldest) and step over what is already in flight.
        let oldest_unclaimed = self.items.iter().rev().skip(self.pending);
        let claimed: Vec<RecordBatch> = oldest_unclaimed.take(count).cloned().collect();
        self.pending += count;
        claimed
    }

    /// Drops the `count` oldest batches and reduces the pending total by the
    /// same amount.
    ///
    /// `count` must not exceed the number of pending batches; this is only
    /// checked in debug builds.
    fn finish(&mut self, count: usize) {
        debug_assert!(count <= self.pending);
        let remaining = self.items.len() - count;
        self.pending -= count;
        // `truncate` keeps the front (newest) entries, so the oldest ones go.
        self.items.truncate(remaining);
    }

    /// Returns clones of every buffered batch, pending or not, in deque order
    /// (newest first).
    fn values(&self) -> Vec<RecordBatch> {
        self.items.iter().cloned().collect()
    }
}
/// Producer half of a work queue: buffers record batches and submits jobs
/// over them.
#[derive(Debug)]
pub struct WorkQueueIn<T> {
/// Batch tracker shared with the matching `WorkQueueOut`.
values: Arc<RwLock<ValueTracker>>,
/// Channel carrying `(batch count, boxed job future)` pairs to the worker.
send: mpsc::Sender<(usize, BoxFuture<'static, crate::Result<T>>)>,
}
impl<T> WorkQueueIn<T>
where
T: Send + 'static,
{
pub fn push(&self, item: RecordBatch) {
self.values.write().unwrap().push(item);
}
pub fn values(&self) -> Vec<RecordBatch> {
self.values.read().unwrap().values()
}
pub fn try_process<F, Fut>(&self, f: F) -> crate::Result<()>
where
F: FnOnce(Vec<RecordBatch>) -> Fut,
Fut: Future<Output = crate::Result<T>> + Send + 'static,
{
let (count, values) = {
let mut v = self.values.write().unwrap();
let count = v.new_count();
(count, v.process(count))
};
match self.send.try_send((count, Box::pin(f(values)))) {
Ok(_) => Ok(()),
Err(TrySendError::Closed(_)) => Err(crate::EngineError::TableClosed.into()),
Err(TrySendError::Full(_)) => Err(crate::EngineError::TableQueueFull.into()),
}
}
pub fn process<F, Fut>(&self, f: F) -> crate::Result<()>
where
F: FnOnce(Vec<RecordBatch>) -> Fut,
Fut: Future<Output = T> + Send + 'static,
{
self.try_process(|values| f(values).map(crate::Result::Ok))
}
}
/// Consumer half of a work queue: yields job results and can ask the worker
/// to stop.
#[derive(Debug)]
pub struct WorkQueueOut<T> {
/// Batch tracker shared with the matching `WorkQueueIn`.
values: Arc<RwLock<ValueTracker>>,
/// Channel delivering `(batch count, job result)` pairs from the worker.
recv: mpsc::Receiver<(usize, crate::Result<T>)>,
/// Signalled by `close` to tell the worker to stop accepting jobs.
stop: Arc<Notify>,
}
impl<T> WorkQueueOut<T> {
    /// Waits for the next job result.
    ///
    /// Before the result is returned, the batches that job covered are
    /// removed from the shared tracker. Returns `None` once the result
    /// channel yields nothing more.
    ///
    /// # Panics
    ///
    /// Panics if the tracker lock is poisoned.
    pub async fn ready(&mut self) -> Option<crate::Result<T>> {
        let (count, item) = self.recv.recv().await?;
        self.values.write().unwrap().finish(count);
        Some(item)
    }

    /// Signals the worker, via the shared `Notify`, to stop accepting jobs.
    pub fn close(&mut self) {
        self.stop.notify_one();
    }
}
/// Builds a connected producer/consumer pair and spawns the worker task that
/// runs queued jobs.
///
/// `capacity` bounds both the job channel and the result channel. The worker
/// is started with `tokio::spawn` and instrumented with the current tracing
/// span.
pub fn work_queue<T>(capacity: usize) -> (WorkQueueIn<T>, WorkQueueOut<T>)
where
    T: Send + 'static,
{
    let stop = Arc::new(Notify::new());
    let (job_tx, job_rx) = mpsc::channel(capacity);
    let (result_tx, result_rx) = mpsc::channel(capacity);

    let worker = process_queue(job_rx, result_tx, stop.clone()).in_current_span();
    tokio::spawn(worker);

    // One tracker, shared by both halves.
    let tracker = Arc::new(RwLock::new(ValueTracker::new()));
    (
        WorkQueueIn {
            values: tracker.clone(),
            send: job_tx,
        },
        WorkQueueOut {
            values: tracker,
            recv: result_rx,
            stop,
        },
    )
}
/// Worker loop: accepts jobs from `recv`, runs them, and forwards each result
/// to `send` in submission order, paired with its batch count.
///
/// The loop ends when `recv` yields `None`. That happens after every sender is
/// dropped, or after `stop` is notified and the jobs already buffered in the
/// channel have been taken. Jobs still running at that point are driven to
/// completion and their results forwarded before the task returns.
///
/// Results are forwarded with an awaited `send`, so a full result channel
/// applies backpressure instead of discarding the result. Discarding it would
/// also lose its batch count, leaving those batches pending forever. While the
/// worker waits for room it neither accepts nor polls other jobs.
async fn process_queue<T>(
    mut recv: mpsc::Receiver<(usize, BoxFuture<'static, crate::Result<T>>)>,
    send: mpsc::Sender<(usize, crate::Result<T>)>,
    stop: Arc<Notify>,
) {
    // Fused so the branch below can stay in the `select!` after it has fired.
    let done = stop.notified().fuse();
    futures::pin_mut!(done);
    let mut pending = FuturesOrdered::new();
    // Batch counts, kept in lockstep with `pending`: one entry per queued job.
    let mut counts = VecDeque::new();
    loop {
        tokio::select! {
            item = recv.recv() => match item {
                Some((count, job)) => {
                    counts.push_back(count);
                    pending.push_back(job);
                },
                None => break,
            },
            res = pending.next(), if !pending.is_empty() => {
                let count = counts
                    .pop_front()
                    .expect("a batch count is queued for every pending job");
                let res = res.expect("branch is only enabled while jobs are pending");
                // Fails only if the receiving half is gone.
                if send.send((count, res)).await.is_err() {
                    tracing::error!("failed to send work queue result");
                }
            },
            _ = &mut done => {
                // Refuse new jobs; ones already buffered are still received.
                recv.close()
            },
        }
    }
    // Drain: finish whatever is still running, even if nobody is listening.
    while let Some(res) = pending.next().await {
        let count = counts
            .pop_front()
            .expect("a batch count is queued for every pending job");
        let _ = send.send((count, res)).await;
    }
}