// krishiv_sql/kafka_table.rs

1use arrow::datatypes::SchemaRef;
2use arrow::record_batch::RecordBatch;
3use datafusion::catalog::TableProvider;
4use datafusion::catalog::streaming::StreamingTable;
5use std::sync::Arc;
6
7use datafusion::error::{DataFusionError, Result as DataFusionResult};
8use datafusion::physical_plan::SendableRecordBatchStream;
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::streaming::PartitionStream;
11use krishiv_connectors::Source;
12use krishiv_connectors::kafka::{KafkaConfig, KafkaSource};
13
14// Auto-commit interval for dev-local streaming SQL (at-least-once). Durable profiles
15// use manual commit aligned with checkpoint barriers.
16const STREAMING_AUTO_COMMIT_MS: u64 = 1_000;
17
18pub(crate) fn kafka_auto_commit_interval_ms() -> Option<u64> {
19    let profile = std::env::var("KRISHIV_DURABILITY_PROFILE")
20        .ok()
21        .and_then(|v| v.parse().ok())
22        .unwrap_or(krishiv_common::DurabilityProfile::DevLocal);
23    if krishiv_common::requires_manual_kafka_commit(profile) {
24        None
25    } else {
26        Some(STREAMING_AUTO_COMMIT_MS)
27    }
28}
29
30pub(crate) struct KafkaPartitionStream {
31    schema: SchemaRef,
32    source: Arc<tokio::sync::Mutex<KafkaSource>>,
33    /// Handle to the spawned Kafka consumer task; stored so it can be aborted
34    /// if the stream is dropped before the consumer loop exits.
35    consumer_task: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
36}
37
38impl KafkaPartitionStream {
39    pub fn new(schema: SchemaRef, source: KafkaSource) -> Self {
40        Self {
41            schema,
42            source: Arc::new(tokio::sync::Mutex::new(source)),
43            consumer_task: std::sync::Mutex::new(None),
44        }
45    }
46}
47
48impl std::fmt::Debug for KafkaPartitionStream {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("KafkaPartitionStream").finish()
51    }
52}
53
54impl PartitionStream for KafkaPartitionStream {
55    fn schema(&self) -> &SchemaRef {
56        &self.schema
57    }
58
59    fn execute(&self, _ctx: Arc<datafusion::execution::TaskContext>) -> SendableRecordBatchStream {
60        let source = self.source.clone();
61        let schema = self.schema.clone();
62        let manual_commit = kafka_auto_commit_interval_ms().is_none();
63
64        // Use an async channel so the polling loop can run indefinitely.
65        // `Ok(None)` from `read_batch` means "no message on this poll cycle"
66        // for an unbounded topic — we keep looping rather than ending the stream.
67        let (tx, rx) = tokio::sync::mpsc::channel::<Result<RecordBatch, DataFusionError>>(64);
68
69        let task = tokio::spawn(async move {
70            loop {
71                // Check cancellation before doing any I/O: if the DataFusion
72                // executor dropped the stream, stop immediately rather than
73                // waiting up to poll_timeout_ms to detect it on the next send.
74                if tx.is_closed() {
75                    break;
76                }
77                let res = {
78                    let mut guard = source.lock().await;
79                    guard.read_batch().await
80                };
81                match res {
82                    Ok(Some(batch)) if batch.num_rows() == 0 => {
83                        // Empty batch (tombstone / non-UTF-8 skip) — keep polling.
84                    }
85                    Ok(Some(batch)) => {
86                        let send_result = match project_batch(&batch, &schema) {
87                            Ok(projected) => tx.send(Ok(projected)).await,
88                            Err(e) => {
89                                tx.send(Err(DataFusionError::ArrowError(Box::new(e), None)))
90                                    .await
91                            }
92                        };
93                        if send_result.is_err() {
94                            break; // receiver dropped — query cancelled
95                        }
96                        if manual_commit {
97                            let guard = source.lock().await;
98                            guard.commit_current_offset();
99                        }
100                    }
101                    Ok(None) => {
102                        // Poll timeout — no message ready; yield and retry.
103                        tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
104                    }
105                    Err(e) => {
106                        let _ = tx.send(Err(DataFusionError::External(Box::new(e)))).await;
107                        break;
108                    }
109                }
110            }
111        });
112        *self.consumer_task.lock().unwrap_or_else(|p| p.into_inner()) = Some(task);
113
114        let recv_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
115        Box::pin(RecordBatchStreamAdapter::new(
116            self.schema.clone(),
117            recv_stream,
118        ))
119    }
120}
121
122/// Project and cast a raw connector batch to the declared table schema.
123///
124/// Missing columns → typed null arrays.
125/// Cast failures → null arrays with a tracing warning (no silent data loss).
126pub(crate) fn project_batch(
127    batch: &RecordBatch,
128    schema: &SchemaRef,
129) -> Result<RecordBatch, arrow::error::ArrowError> {
130    let mut cols = Vec::with_capacity(schema.fields().len());
131    for field in schema.fields() {
132        let col = if let Ok(idx) = batch.schema().index_of(field.name()) {
133            let src = batch.column(idx);
134            arrow::compute::cast(src, field.data_type()).map_err(|e| {
135                arrow::error::ArrowError::CastError(format!(
136                    "Kafka column '{}': cast from {} to {} failed: {e}",
137                    field.name(),
138                    src.data_type(),
139                    field.data_type(),
140                ))
141            })?
142        } else {
143            arrow::array::new_null_array(field.data_type(), batch.num_rows())
144        };
145        cols.push(col);
146    }
147    RecordBatch::try_new(schema.clone(), cols)
148}
149
150/// Build a DataFusion `StreamingTable` backed by a live Kafka/Redpanda topic.
151///
152/// Enables rdkafka auto-commit at 1 s intervals for at-least-once delivery.
153/// Callers that prefer SQL DDL can use `CREATE EXTERNAL TABLE … STORED AS KAFKA`.
154pub fn create_kafka_streaming_table(
155    schema: SchemaRef,
156    config: KafkaConfig,
157) -> DataFusionResult<Arc<dyn TableProvider>> {
158    let config = match kafka_auto_commit_interval_ms() {
159        Some(ms) => config.with_auto_commit(ms),
160        None => config,
161    };
162    let source = KafkaSource::new(config).map_err(|e| DataFusionError::External(Box::new(e)))?;
163    let partition = Arc::new(KafkaPartitionStream::new(schema.clone(), source));
164    let table = StreamingTable::try_new(schema, vec![partition])?;
165    Ok(Arc::new(table))
166}