use std::{sync::Arc, time::Duration};
use arrow::array::RecordBatch;
use futures::stream::BoxStream;
use iceberg::table::Table;
use super_visor::{ManagedProc, ShutdownSignal};
use tokio::{
sync::{mpsc, oneshot},
time,
};
use tracing::{debug, error, info, trace, warn};
use crate::error::Result;
/// Seconds between catalog polls unless the builder's `poll_interval` overrides it.
const DEFAULT_POLL_INTERVAL_SECS: u64 = 30;
/// Capacity of the poller-to-consumer channel unless the builder's `channel_size` overrides it.
const DEFAULT_CHANNEL_SIZE: usize = 10;
/// How long a send may wait for channel capacity unless the builder's `send_timeout`
/// overrides it.
const DEFAULT_SEND_TIMEOUT: Duration = Duration::new(30, 0);
/// Result of a single successful poll iteration, telling the run loop whether to keep going.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PollOutcome {
    /// Keep polling on the next tick.
    Continue,
    /// The receiving side of the channel is gone; the poller should stop.
    ConsumerGone,
}
/// Record batches for one snapshot, handed from the poller to the consumer.
///
/// Call [`IcebergFileStream::ack`] once the batches have been processed. Until then the
/// poller will not send another stream. If the value is dropped without being acked, the
/// poller clears its pending entry and re-sends from the last acknowledged snapshot on a
/// later poll.
pub struct IcebergFileStream {
    /// Id of the table's current snapshot when this stream was produced.
    pub snapshot_id: i64,
    /// The source table's identifier, rendered as a string.
    pub table_name: String,
    /// The scanned record batches.
    pub batches: BoxStream<'static, iceberg::Result<RecordBatch>>,
    /// One-shot channel back to the poller; taken (and used) by `ack`.
    ack_tx: Option<oneshot::Sender<()>>,
}

impl IcebergFileStream {
    /// Tells the poller this snapshot has been fully handled, consuming the stream.
    pub fn ack(mut self) {
        let Some(sender) = self.ack_tx.take() else {
            return;
        };
        // If the poller's receiving end is already gone the send fails; that is ignored on purpose.
        let _ = sender.send(());
    }
}
/// Receiving half of the channel the poller delivers snapshot streams on; returned by
/// `IcebergPollerConfigBuilder::create`.
pub type IcebergStreamReceiver = mpsc::Receiver<IcebergFileStream>;
/// Settings the running poller works from, produced by `IcebergPollerConfigBuilder::create`.
pub struct IcebergPollerConfig {
    /// Table handle; replaced by the copy reloaded from `catalog` during polling.
    table: Table,
    /// Catalog used to reload `table`.
    catalog: Arc<dyn iceberg::Catalog>,
    /// Period of the poll timer.
    poll_interval: Duration,
    /// How long a send waits for channel capacity before giving up for this poll.
    send_timeout: Duration,
    /// Value attached to every log line as the `label` field.
    label: String,
}
/// By-value builder for an Iceberg snapshot poller; finish with `create`.
///
/// Every setter consumes and returns the builder, so ignoring a setter's return value
/// silently discards the setting — hence `#[must_use]`.
#[must_use = "builder methods return a new builder; call `create` to obtain the poller"]
pub struct IcebergPollerConfigBuilder {
    /// Table to poll.
    table: Table,
    /// Catalog the table is reloaded from on each poll.
    catalog: Arc<dyn iceberg::Catalog>,
    /// Period of the poll timer.
    poll_interval: Duration,
    /// Maximum wait for channel capacity per send.
    send_timeout: Duration,
    /// Requested capacity of the poller-to-consumer channel.
    channel_size: usize,
    /// Label attached to the poller's log lines.
    label: String,
    /// Snapshot to resume after; `None` starts from the table's current snapshot.
    start_after_snapshot: Option<i64>,
}
impl IcebergPollerConfigBuilder {
    /// Starts a builder that polls `table` through `catalog`, tagging logs with `label`.
    ///
    /// Defaults:
    /// - a poll every `DEFAULT_POLL_INTERVAL_SECS` seconds;
    /// - a send timeout of `DEFAULT_SEND_TIMEOUT`;
    /// - a channel capacity of `DEFAULT_CHANNEL_SIZE`;
    /// - no starting snapshot, so the first delivery is built with `scan_snapshot` for the
    ///   table's current snapshot.
    pub fn new(table: Table, catalog: Arc<dyn iceberg::Catalog>, label: impl Into<String>) -> Self {
        Self {
            table,
            catalog,
            poll_interval: Duration::from_secs(DEFAULT_POLL_INTERVAL_SECS),
            send_timeout: DEFAULT_SEND_TIMEOUT,
            channel_size: DEFAULT_CHANNEL_SIZE,
            label: label.into(),
            start_after_snapshot: None,
        }
    }

    /// Sets how often the catalog is checked for a new snapshot.
    ///
    /// The interval must be non-zero. The poller builds a `tokio::time::interval` from it
    /// when it starts running, and that constructor panics on a zero period.
    pub fn poll_interval(self, interval: Duration) -> Self {
        Self {
            poll_interval: interval,
            ..self
        }
    }

    /// Sets the capacity of the channel between the poller and the consumer.
    ///
    /// `create` raises a capacity of 0 to 1, because tokio's bounded channel rejects a zero
    /// capacity.
    pub fn channel_size(self, size: usize) -> Self {
        Self {
            channel_size: size,
            ..self
        }
    }

    /// Sets how long the poller waits for channel capacity before abandoning a send.
    ///
    /// An abandoned send does not advance the poller's position, so the same changes are
    /// offered again on a later poll.
    pub fn send_timeout(self, timeout: Duration) -> Self {
        Self {
            send_timeout: timeout,
            ..self
        }
    }

    /// Resumes after `snapshot_id`.
    ///
    /// If `snapshot_id` is still the table's current snapshot, nothing is sent. Otherwise
    /// the first delivery is built with `scan_since_snapshot` starting from it.
    pub fn start_after_snapshot(self, snapshot_id: i64) -> Self {
        Self {
            start_after_snapshot: Some(snapshot_id),
            ..self
        }
    }

    /// Consumes the builder, returning the consumer's receiver and the poller server that
    /// feeds it.
    ///
    /// The server sends nothing until it is run, either via `IcebergPollerServer::run` or
    /// via `ManagedProc::run_proc`.
    #[must_use]
    pub fn create(self) -> (IcebergStreamReceiver, IcebergPollerServer) {
        // `tokio::sync::mpsc::channel` panics on a zero capacity; use the smallest valid one instead.
        let (tx, rx) = mpsc::channel(self.channel_size.max(1));
        let server = IcebergPollerServer {
            config: IcebergPollerConfig {
                table: self.table,
                catalog: self.catalog,
                poll_interval: self.poll_interval,
                send_timeout: self.send_timeout,
                label: self.label,
            },
            sender: tx,
            last_snapshot_id: self.start_after_snapshot,
            pending_ack: None,
        };
        (rx, server)
    }
}
/// Poller that watches an Iceberg table and pushes new snapshots to a consumer channel.
pub struct IcebergPollerServer {
    /// Table, catalog and timing settings.
    config: IcebergPollerConfig,
    /// Sending half of the consumer channel.
    sender: mpsc::Sender<IcebergFileStream>,
    /// Last snapshot the consumer acknowledged, or the configured starting snapshot.
    last_snapshot_id: Option<i64>,
    /// Snapshot that was sent but not yet acknowledged, paired with the receiver its ack
    /// arrives on.
    pending_ack: Option<(i64, oneshot::Receiver<()>)>,
}

impl ManagedProc for IcebergPollerServer {
    /// Hands the poll loop to the supervisor as a spawned task.
    fn run_proc(self: Box<Self>, shutdown: ShutdownSignal) -> super_visor::ManagedFuture {
        let poller = *self;
        super_visor::spawn(poller.run(shutdown))
    }
}
impl IcebergPollerServer {
pub fn last_snapshot_id(&self) -> Option<i64> {
self.last_snapshot_id
}
pub async fn run(mut self, mut shutdown: ShutdownSignal) -> Result {
info!(label = self.config.label, "starting iceberg poller");
let mut poll_timer = time::interval(self.config.poll_interval);
poll_timer.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
loop {
tokio::select! {
biased;
_ = &mut shutdown => break,
_ = poll_timer.tick() => {
match self.poll_once().await {
Ok(PollOutcome::Continue) => {}
Ok(PollOutcome::ConsumerGone) => {
info!(label = self.config.label, "consumer gone, exiting poller");
break;
}
Err(err) => {
warn!(
label = self.config.label,
?err,
"iceberg poll iteration failed"
);
}
}
}
}
}
info!(label = self.config.label, "stopping iceberg poller");
Ok(())
}
async fn poll_once(&mut self) -> Result<PollOutcome> {
if let Some((acked_id, ref mut ack_rx)) = self.pending_ack {
match ack_rx.try_recv() {
Ok(()) => {
debug!(
label = self.config.label,
snapshot_id = acked_id,
"consumer acknowledged snapshot"
);
self.last_snapshot_id = Some(acked_id);
self.pending_ack = None;
}
Err(oneshot::error::TryRecvError::Empty) => {
debug!(
label = self.config.label,
snapshot_id = acked_id,
"waiting for consumer to acknowledge previous snapshot"
);
return Ok(PollOutcome::Continue);
}
Err(oneshot::error::TryRecvError::Closed) => {
warn!(
label = self.config.label,
snapshot_id = acked_id,
"consumer dropped stream without acknowledging, will re-send"
);
self.pending_ack = None;
}
}
}
let table = self
.config
.catalog
.load_table(self.config.table.identifier())
.await?;
self.config.table = table;
let current_snapshot = self.config.table.metadata().current_snapshot();
let current_id = current_snapshot.map(|s| s.snapshot_id());
let Some(current_id) = current_id else {
debug!(label = self.config.label, "no snapshot found");
return Ok(PollOutcome::Continue);
};
if self.last_snapshot_id == Some(current_id) {
trace!(
label = self.config.label,
snapshot_id = current_id,
"no new snapshot"
);
return Ok(PollOutcome::Continue);
}
info!(
label = self.config.label,
snapshot_id = current_id,
previous = ?self.last_snapshot_id,
"new iceberg snapshot detected"
);
let stream = if let Some(after_id) = self.last_snapshot_id {
super::scanner::scan_since_snapshot(&self.config.table, after_id).await?
} else {
super::scanner::scan_snapshot(&self.config.table, current_id).await?
};
let table_name = self.config.table.identifier().to_string();
let (ack_tx, ack_rx) = oneshot::channel();
let file_stream = IcebergFileStream {
snapshot_id: current_id,
table_name,
batches: stream,
ack_tx: Some(ack_tx),
};
match self
.sender
.send_timeout(file_stream, self.config.send_timeout)
.await
{
Ok(()) => {
self.pending_ack = Some((current_id, ack_rx));
}
Err(mpsc::error::SendTimeoutError::Closed(_)) => {
error!(
label = self.config.label,
"iceberg poller channel closed, consumer is gone"
);
return Ok(PollOutcome::ConsumerGone);
}
Err(mpsc::error::SendTimeoutError::Timeout(_)) => {
warn!(
label = self.config.label,
timeout_secs = self.config.send_timeout.as_secs(),
"iceberg poller backpressure: consumer not draining fast enough, skipping snapshot"
);
}
}
Ok(PollOutcome::Continue)
}
}