krishiv_sql/
kafka_table.rs1use arrow::datatypes::SchemaRef;
2use arrow::record_batch::RecordBatch;
3use datafusion::catalog::TableProvider;
4use datafusion::catalog::streaming::StreamingTable;
5use std::sync::Arc;
6
7use datafusion::error::{DataFusionError, Result as DataFusionResult};
8use datafusion::physical_plan::SendableRecordBatchStream;
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::streaming::PartitionStream;
11use krishiv_connectors::Source;
12use krishiv_connectors::kafka::{KafkaConfig, KafkaSource};
13
14const STREAMING_AUTO_COMMIT_MS: u64 = 1_000;
17
18pub(crate) fn kafka_auto_commit_interval_ms() -> Option<u64> {
19 let profile = std::env::var("KRISHIV_DURABILITY_PROFILE")
20 .ok()
21 .and_then(|v| v.parse().ok())
22 .unwrap_or(krishiv_common::DurabilityProfile::DevLocal);
23 if krishiv_common::requires_manual_kafka_commit(profile) {
24 None
25 } else {
26 Some(STREAMING_AUTO_COMMIT_MS)
27 }
28}
29
30pub(crate) struct KafkaPartitionStream {
31 schema: SchemaRef,
32 source: Arc<tokio::sync::Mutex<KafkaSource>>,
33 consumer_task: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
36}
37
38impl KafkaPartitionStream {
39 pub fn new(schema: SchemaRef, source: KafkaSource) -> Self {
40 Self {
41 schema,
42 source: Arc::new(tokio::sync::Mutex::new(source)),
43 consumer_task: std::sync::Mutex::new(None),
44 }
45 }
46}
47
48impl std::fmt::Debug for KafkaPartitionStream {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("KafkaPartitionStream").finish()
51 }
52}
53
54impl PartitionStream for KafkaPartitionStream {
55 fn schema(&self) -> &SchemaRef {
56 &self.schema
57 }
58
59 fn execute(&self, _ctx: Arc<datafusion::execution::TaskContext>) -> SendableRecordBatchStream {
60 let source = self.source.clone();
61 let schema = self.schema.clone();
62 let manual_commit = kafka_auto_commit_interval_ms().is_none();
63
64 let (tx, rx) = tokio::sync::mpsc::channel::<Result<RecordBatch, DataFusionError>>(64);
68
69 let task = tokio::spawn(async move {
70 loop {
71 if tx.is_closed() {
75 break;
76 }
77 let res = {
78 let mut guard = source.lock().await;
79 guard.read_batch().await
80 };
81 match res {
82 Ok(Some(batch)) if batch.num_rows() == 0 => {
83 }
85 Ok(Some(batch)) => {
86 let send_result = match project_batch(&batch, &schema) {
87 Ok(projected) => tx.send(Ok(projected)).await,
88 Err(e) => {
89 tx.send(Err(DataFusionError::ArrowError(Box::new(e), None)))
90 .await
91 }
92 };
93 if send_result.is_err() {
94 break; }
96 if manual_commit {
97 let guard = source.lock().await;
98 guard.commit_current_offset();
99 }
100 }
101 Ok(None) => {
102 tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
104 }
105 Err(e) => {
106 let _ = tx.send(Err(DataFusionError::External(Box::new(e)))).await;
107 break;
108 }
109 }
110 }
111 });
112 *self.consumer_task.lock().unwrap_or_else(|p| p.into_inner()) = Some(task);
113
114 let recv_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
115 Box::pin(RecordBatchStreamAdapter::new(
116 self.schema.clone(),
117 recv_stream,
118 ))
119 }
120}
121
122pub(crate) fn project_batch(
127 batch: &RecordBatch,
128 schema: &SchemaRef,
129) -> Result<RecordBatch, arrow::error::ArrowError> {
130 let mut cols = Vec::with_capacity(schema.fields().len());
131 for field in schema.fields() {
132 let col = if let Ok(idx) = batch.schema().index_of(field.name()) {
133 let src = batch.column(idx);
134 arrow::compute::cast(src, field.data_type()).map_err(|e| {
135 arrow::error::ArrowError::CastError(format!(
136 "Kafka column '{}': cast from {} to {} failed: {e}",
137 field.name(),
138 src.data_type(),
139 field.data_type(),
140 ))
141 })?
142 } else {
143 arrow::array::new_null_array(field.data_type(), batch.num_rows())
144 };
145 cols.push(col);
146 }
147 RecordBatch::try_new(schema.clone(), cols)
148}
149
150pub fn create_kafka_streaming_table(
155 schema: SchemaRef,
156 config: KafkaConfig,
157) -> DataFusionResult<Arc<dyn TableProvider>> {
158 let config = match kafka_auto_commit_interval_ms() {
159 Some(ms) => config.with_auto_commit(ms),
160 None => config,
161 };
162 let source = KafkaSource::new(config).map_err(|e| DataFusionError::External(Box::new(e)))?;
163 let partition = Arc::new(KafkaPartitionStream::new(schema.clone(), source));
164 let table = StreamingTable::try_new(schema, vec![partition])?;
165 Ok(Arc::new(table))
166}