use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use aerospike_rt::Mutex;
use async_channel::{Receiver, Sender};
use crate::errors::Result;
use crate::query::{PartitionFilter, PartitionTracker};
use crate::Record;
/// Wrapper around a shared [`Recordset`] handle (`Arc<Recordset>`).
///
/// Values are produced by `Recordset::into_stream`.
pub struct RecordStream(Arc<Recordset>);
/// Shared state between the tasks that produce records and the code that
/// consumes them: a bounded channel of results plus bookkeeping flags.
#[derive(Debug)]
pub struct Recordset {
/// Countdown decremented by `signal_end`; the recordset is closed when the
/// last outstanding instance signals its end. Initialised from `nodes`.
instances: AtomicUsize,
/// Receiving half of the record channel, read by the consumer side.
rx: Receiver<Result<Record>>,
/// Sending half of the record channel, used by `push` and `err`.
tx: Sender<Result<Record>>,
/// `true` from construction until `close` is called.
active: AtomicBool,
/// Random identifier, regenerated by `reset_task_id`.
/// NOTE(review): presumably identifies the server-side job — confirm against callers.
task_id: AtomicU64,
/// Partition tracker queried by `partition_filter` once the recordset is inactive.
pub(crate) tracker: Arc<Mutex<PartitionTracker>>,
}
impl Drop for Recordset {
fn drop(&mut self) {
self.close();
}
}
impl Recordset {
pub(crate) fn new(
rec_queue_size: usize,
nodes: usize,
tracker: Arc<Mutex<PartitionTracker>>,
) -> Self {
let task_id = rand::random::<u64>();
let (tx, rx) = async_channel::bounded(rec_queue_size);
Recordset {
instances: AtomicUsize::new(nodes),
rx,
tx,
active: AtomicBool::new(true),
task_id: AtomicU64::new(task_id),
tracker,
}
}
pub fn close(&self) {
self.active.store(false, Ordering::Relaxed);
}
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Relaxed)
}
pub(crate) fn set_instances(&self, count: usize) {
self.instances.store(count, Ordering::Relaxed);
}
pub(crate) fn reset_task_id(&self) {
let task_id = rand::random::<u64>();
self.task_id.store(task_id, Ordering::Relaxed);
}
pub(crate) async fn err(&self, e: crate::Error) {
let _ = self.tx.clone().send(Err(e)).await;
}
pub(crate) async fn push(&self, record: Result<Record>) -> Result<()> {
match record {
Err(crate::Error::StreamTerminatedError()) => Ok(()),
_ => match self.tx.send(record).await {
Ok(()) => Ok(()),
Err(_) => Err(crate::Error::StreamTerminatedError()),
},
}
}
pub(crate) fn task_id(&self) -> u64 {
self.task_id.load(Ordering::Relaxed) as u64
}
pub(crate) fn signal_end(&self) {
if self.instances.fetch_sub(1, Ordering::Relaxed) == 1 {
self.close();
}
}
pub async fn partition_filter(&self) -> Option<PartitionFilter> {
if !self.is_active() {
return self.tracker.lock().await.extract_partition_filter();
}
None
}
#[cfg(feature = "sync")]
pub fn next_record(&self) -> Option<Result<Record>> {
self.rx.try_recv().ok()
}
pub const fn into_stream(self: Arc<Self>) -> RecordStream {
RecordStream(self)
}
}
/// Blocking iteration over the queued records.
///
/// Iteration ends once the recordset was already inactive before a receive
/// attempt that found the queue empty.
#[cfg(feature = "sync")]
impl Iterator for &Recordset {
    type Item = Result<Record>;

    fn next(&mut self) -> Option<Result<Record>> {
        use futures::executor::block_on;

        loop {
            // Snapshot the flag BEFORE polling the queue. Reading it after an
            // empty poll would let a producer queue its last record and close
            // the recordset in between, and that record would be lost.
            let was_active = self.is_active();

            if let Some(record) = self.next_record() {
                return Some(record);
            }
            if !was_active {
                // Closed before an empty poll: nothing further to wait for.
                return None;
            }

            // Still active: give producers a chance to run, then retry.
            block_on(aerospike_rt::task::yield_now());
        }
    }
}
/// Asynchronous iteration over the queued records.
///
/// The stream ends once the recordset was already inactive before a receive
/// attempt that found the queue empty.
impl futures::Stream for RecordStream {
    type Item = Result<Record>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        // Snapshot the flag BEFORE polling the queue. Reading it after an
        // empty poll would let a producer queue its last record and close the
        // recordset in between, ending the stream with that record unread.
        let was_active = self.0.is_active();

        match self.0.rx.try_recv() {
            Ok(record) => Poll::Ready(Some(record)),
            Err(e) if !was_active && e.is_empty() => Poll::Ready(None),
            Err(_) => {
                // No waker is registered with the channel, so ask to be
                // polled again right away.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
impl AsRef<Recordset> for RecordStream {
fn as_ref(&self) -> &Recordset {
&self.0
}
}
impl RecordStream {
pub async fn partition_filter(&self) -> Option<PartitionFilter> {
self.0.partition_filter().await
}
}