use std::pin::Pin;
use std::task::{Context, Poll};
use arrow::record_batch::RecordBatch;
use chrono::{DateTime, Utc};
use futures::Stream;
use crate::trigger::error::TriggerError;
use crate::trigger::ids::SubscriptionId;
use crate::trigger::offset::Offset;
/// One Arrow record batch together with the metadata carried alongside it.
///
/// This is the success payload of the items yielded by [`Subscription`].
#[derive(Debug, Clone)]
pub struct DeliveredBatch {
/// Offset attached to this batch.
// NOTE(review): presumably the batch's position in the source feed — confirm against `Offset`.
pub offset: Offset,
/// UTC timestamp attached to this batch.
// NOTE(review): the name suggests this is when the batch was produced — confirm with the producer.
pub produced_at: DateTime<Utc>,
/// The Arrow record batch payload.
pub batch: RecordBatch,
}
/// A stream of delivered batches tagged with a [`SubscriptionId`].
///
/// Wraps a boxed, pinned, `Send + 'static` stream whose items are
/// `Result<DeliveredBatch, TriggerError>`. The `Stream` impl for this type
/// polls that wrapped stream.
pub struct Subscription {
/// Identifier of this subscription.
pub id: SubscriptionId,
// Type-erased source of batches; polled by this type's `Stream` impl.
inner: Pin<Box<dyn Stream<Item = Result<DeliveredBatch, TriggerError>> + Send + 'static>>,
}
impl Subscription {
pub fn new(
id: SubscriptionId,
inner: Pin<Box<dyn Stream<Item = Result<DeliveredBatch, TriggerError>> + Send + 'static>>,
) -> Self {
Self { id, inner }
}
}
impl Stream for Subscription {
    type Item = Result<DeliveredBatch, TriggerError>;

    /// Polls the wrapped stream and returns its result unchanged.
    ///
    /// Items, errors, `Poll::Pending` and end-of-stream (`None`) all come
    /// straight from the inner stream; nothing is buffered or transformed here.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `get_mut` is available because `Subscription` is `Unpin`; the inner
        // stream is already pinned on the heap, so re-borrowing it with
        // `as_mut` is all that is needed to poll it.
        self.get_mut().inner.as_mut().poll_next(cx)
    }

    /// Forwards the wrapped stream's size hint.
    ///
    /// Without this override the trait default `(0, None)` would be reported,
    /// hiding any bounds the inner stream advertises from consumers.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}