use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::SchemaRef;
use chrono::Utc;
use parking_lot::Mutex;
use tokio::sync::Mutex as AsyncMutex;
use crate::catalog::backend::{BackendImpl, TxOptions};
use crate::source::mutable::MutableTableRegistry;
use crate::store::mutable::definition::MutableTableId;
use crate::tenant::TenantId;
use crate::trigger::broker::TriggerBroker;
use crate::trigger::error::TriggerError;
use crate::trigger::ids::TopicId;
use crate::trigger::offset::Offset;
use crate::trigger::topic::{augment_schema_for_backing, TopicDefinition};
/// Publishes record batches to trigger topics.
///
/// Each publish writes the batch to the topic's backing table inside a backend
/// transaction and then forwards it to the broker. Offsets are assigned from a
/// per-topic in-memory counter.
pub struct Publisher {
/// Broker that receives each batch after it has been written to the backing table.
broker: Arc<dyn TriggerBroker>,
/// Backend used to run the insert transaction and the read-only offset query.
backend: Arc<BackendImpl>,
/// Registry through which the augmented batch is inserted into the backing table.
mutable: Arc<MutableTableRegistry>,
/// Per-topic counters, created on first use by `counter_for`.
counters: Mutex<HashMap<TopicId, Arc<TopicCounter>>>,
}
/// Per-topic publish state: a write lock plus the next offset to assign.
struct TopicCounter {
/// Async lock that `publish_scoped` holds for the remainder of a publish to this topic.
write_lock: AsyncMutex<()>,
/// Next offset to hand out. Starts at `u64::MAX`, which `publish_scoped` treats as
/// "not loaded yet" and replaces with the value read from the backing table.
next: AtomicU64,
}
impl TopicCounter {
    /// Builds a counter whose offset has not been loaded yet.
    ///
    /// `next` starts at `u64::MAX`; `publish_scoped` compares against that value to
    /// decide whether the next offset still has to be read from the backing table.
    fn new() -> Self {
        let write_lock = AsyncMutex::new(());
        let next = AtomicU64::new(u64::MAX);
        Self { write_lock, next }
    }
}
impl Publisher {
/// Creates a publisher over the given broker, backend and mutable-table registry.
///
/// The per-topic counter map starts empty; entries are added the first time a
/// topic is published to.
pub fn new(
    broker: Arc<dyn TriggerBroker>,
    backend: Arc<BackendImpl>,
    mutable: Arc<MutableTableRegistry>,
) -> Self {
    let counters = Mutex::new(HashMap::new());
    Self {
        broker,
        backend,
        mutable,
        counters,
    }
}
/// Publishes `user_batch` to `topic` and returns the offset assigned to it.
///
/// Steps, in order:
/// 1. Reject the batch if its schema differs from the topic schema, or if the
///    topic is bound to a tenant and `tenant` is not that tenant.
/// 2. Take the topic's async write lock, so publishes to the same topic through
///    this `Publisher` run one at a time.
/// 3. On the first publish for the topic, load the next offset from the backing
///    table.
/// 4. Insert the batch, augmented with bookkeeping columns, in one backend
///    transaction.
/// 5. Advance the in-memory counter and hand the original batch to the broker.
///
/// # Errors
///
/// - [`TriggerError::BatchSchemaMismatch`] if the schemas differ or the augmented
///   batch cannot be built.
/// - [`TriggerError::PublishTenantMismatch`] if the topic is tenant-bound and
///   `tenant` does not match.
/// - [`TriggerError::Catalog`] if the backing table name is rejected by
///   `MutableTableId::new`.
/// - The converted backend error if reading the offset or the insert transaction
///   fails. In that case the counter is not advanced.
///
/// A broker failure is not reported as an error: it is logged at `warn` level and
/// the offset of the already-inserted batch is returned.
pub async fn publish_scoped(
    &self,
    topic: &TopicDefinition,
    tenant: Option<TenantId>,
    user_batch: RecordBatch,
) -> Result<Offset, TriggerError> {
    if user_batch.schema().as_ref() != topic.schema.as_ref() {
        return Err(TriggerError::BatchSchemaMismatch(format!(
            "topic '{}' expected {} columns, got {}",
            topic.name,
            topic.schema.fields().len(),
            user_batch.schema().fields().len()
        )));
    }
    if let Some(topic_tenant) = topic.tenant {
        if tenant != Some(topic_tenant) {
            return Err(TriggerError::PublishTenantMismatch {
                topic: topic.name.clone(),
                topic_tenant: Some(topic_tenant),
                publish_tenant: tenant,
            });
        }
    }

    let counter = self.counter_for(topic.id);
    // Held until this function returns, covering offset assignment, insert and fan-out.
    let _guard = counter.write_lock.lock().await;

    let backing_table_id = MutableTableId::new(topic.backing_table_name())
        .map_err(|e| TriggerError::Catalog(e.to_string()))?;

    // `u64::MAX` is the "not loaded yet" marker set by `TopicCounter::new`.
    if counter.next.load(Ordering::Acquire) == u64::MAX {
        let next = self.read_next_offset(backing_table_id.as_str()).await?;
        counter.next.store(next, Ordering::Release);
    }
    let offset_value = counter.next.load(Ordering::Acquire);

    let produced_at = Utc::now();
    let produced_at_micros = produced_at.timestamp_micros();
    let augmented = augment_batch_for_backing(
        &topic.schema,
        &user_batch,
        offset_value,
        produced_at_micros,
    )?;

    let registry = Arc::clone(&self.mutable);
    self.backend
        .transaction(TxOptions::default(), move |tx| {
            // Captures are cloned on every invocation so the closure never consumes
            // them (presumably the backend may call it more than once — confirm
            // against `BackendImpl::transaction`).
            let registry = Arc::clone(&registry);
            let id = backing_table_id.clone();
            let augmented = augmented.clone();
            Box::pin(async move {
                tx.set_tenant(tenant);
                registry
                    .insert_batch(tx, &id, &augmented)
                    .await
                    .map_err(|e| {
                        crate::catalog::backend::BackendError::Execution(e.to_string())
                    })?;
                Ok::<(), crate::catalog::backend::BackendError>(())
            })
        })
        .await?;

    // Only advance once the insert has succeeded, so a failed publish can be retried
    // with the same offset.
    counter.next.store(offset_value + 1, Ordering::Release);

    match self
        .broker
        .publish(topic.id, user_batch, produced_at, offset_value)
        .await
    {
        Ok(offset) => Ok(offset),
        Err(err) => {
            tracing::warn!(
                topic = %topic.name,
                offset = offset_value,
                error = %err,
                "broker fan-out failed; backing table is authoritative"
            );
            Ok(Offset::new(offset_value, produced_at))
        }
    }
}
/// Returns the shared counter for `topic_id`, creating it on first use.
///
/// The map lock is held only for the lookup/insert; callers receive their own
/// `Arc` handle to the counter.
fn counter_for(&self, topic_id: TopicId) -> Arc<TopicCounter> {
    let mut counters = self.counters.lock();
    let slot = counters
        .entry(topic_id)
        .or_insert_with(|| Arc::new(TopicCounter::new()));
    Arc::clone(slot)
}
/// Reads the next offset to assign for `backing_table`.
///
/// The query returns one more than the largest stored `_offset`, or 0 when the
/// table has no offsets (`MAX` is coalesced to -1 before adding 1). It runs in a
/// read-only backend transaction.
///
/// Double quotes inside `backing_table` are doubled before the name is placed in
/// the quoted identifier. If the query yields no row the result is 0, and a
/// negative value is clamped to 0.
///
/// # Errors
///
/// Returns the converted backend error if the transaction or query fails.
async fn read_next_offset(&self, backing_table: &str) -> Result<u64, TriggerError> {
    let quoted_table = backing_table.replace('"', "\"\"");
    let query = format!(
        "SELECT COALESCE(MAX(\"_offset\"), -1) + 1 AS next FROM \"{}\"",
        quoted_table
    );
    let read_only = TxOptions {
        read_only: true,
        ..Default::default()
    };

    let next = self
        .backend
        .transaction(read_only, move |tx| {
            let query = query.clone();
            Box::pin(async move {
                let rows: Vec<i64> =
                    tx.query(&query, &[], |row| row.get::<i64>("next")).await?;
                let first = rows.into_iter().next().unwrap_or(0);
                Ok::<i64, crate::catalog::backend::BackendError>(first)
            })
        })
        .await?;

    Ok(next.max(0) as u64)
}
}
/// Builds the batch that is written to a topic's backing table.
///
/// Three `Int64` columns are placed ahead of the user's columns, in this order:
/// `offset_value` repeated for every row, the row index `0..n`, and
/// `produced_at_micros` repeated for every row. The schema comes from
/// `augment_schema_for_backing(user_schema)` (presumably it declares the same
/// three leading fields — confirm against that helper).
///
/// # Errors
///
/// Returns [`TriggerError::BatchSchemaMismatch`] carrying the arrow error text if
/// the columns do not fit the augmented schema.
fn augment_batch_for_backing(
    user_schema: &SchemaRef,
    user_batch: &RecordBatch,
    offset_value: u64,
    produced_at_micros: i64,
) -> Result<RecordBatch, TriggerError> {
    let row_count = user_batch.num_rows();
    let backing_schema = Arc::new(augment_schema_for_backing(user_schema));

    let offsets: ArrayRef = Arc::new(Int64Array::from(vec![offset_value as i64; row_count]));
    let row_numbers: ArrayRef =
        Arc::new(Int64Array::from((0..row_count as i64).collect::<Vec<i64>>()));
    let timestamps: ArrayRef = Arc::new(Int64Array::from(vec![produced_at_micros; row_count]));

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(user_batch.num_columns() + 3);
    columns.extend([offsets, row_numbers, timestamps]);
    columns.extend(user_batch.columns().iter().cloned());

    RecordBatch::try_new(backing_schema, columns)
        .map_err(|err| TriggerError::BatchSchemaMismatch(err.to_string()))
}