use std::time::Duration;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use super::error::ApiError;
use crate::catalog::ArrowRecord;
/// Subscription handle whose receive methods yield Arrow `RecordBatch` values.
///
/// Pairs a `laminar_core` streaming subscription with the schema it was
/// created with and a local flag recording whether the handle is still in use.
pub struct ArrowSubscription {
/// Underlying streaming subscription that batches are received from.
inner: laminar_core::streaming::Subscription<ArrowRecord>,
/// Schema supplied at construction and handed back by `schema()`.
schema: SchemaRef,
/// `true` from construction until `cancel()` is called or a receive call
/// observes a disconnect.
active: bool,
}
impl ArrowSubscription {
pub(crate) fn new(
inner: laminar_core::streaming::Subscription<ArrowRecord>,
schema: SchemaRef,
) -> Self {
Self {
inner,
schema,
active: true,
}
}
#[must_use]
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
#[allow(clippy::should_implement_trait)] pub fn next(&mut self) -> Result<Option<RecordBatch>, ApiError> {
if !self.active {
return Ok(None);
}
match self.inner.recv() {
Ok(batch) => Ok(Some(batch)),
Err(laminar_core::streaming::RecvError::Disconnected) => {
self.active = false;
Ok(None)
}
Err(e) => Err(ApiError::subscription(e.to_string())),
}
}
pub fn next_timeout(&mut self, timeout: Duration) -> Result<Option<RecordBatch>, ApiError> {
if !self.active {
return Ok(None);
}
match self.inner.recv_timeout(timeout) {
Ok(batch) => Ok(Some(batch)),
Err(laminar_core::streaming::RecvError::Disconnected) => {
self.active = false;
Ok(None)
}
Err(laminar_core::streaming::RecvError::Timeout) => {
Err(ApiError::subscription_timeout())
}
}
}
pub fn try_next(&mut self) -> Result<Option<RecordBatch>, ApiError> {
if !self.active {
return Ok(None);
}
Ok(self.inner.poll())
}
#[must_use]
pub fn is_active(&self) -> bool {
self.active && !self.inner.is_disconnected()
}
pub fn cancel(&mut self) {
self.active = false;
}
}
// SAFETY: NOTE(review) — this impl asserts that an `ArrowSubscription`
// (its `inner`, `schema` and `active` fields) may be moved to another thread.
// That cannot be verified from this file: confirm that
// `laminar_core::streaming::Subscription<ArrowRecord>` is sound to send across
// threads. If that type is already `Send`, the auto trait applies and this
// manual impl is redundant; if it is not, document here the invariant that
// makes sending it sound.
unsafe impl Send for ArrowSubscription {}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check: fails to build if `ArrowSubscription` stops being `Send`.
    #[test]
    fn test_subscription_send() {
        // Only instantiable for types that satisfy the `Send` bound.
        fn require_send<S>()
        where
            S: Send,
        {
        }

        require_send::<ArrowSubscription>();
    }
}