1use std::fmt::Formatter;
21use std::fs::{File, OpenOptions};
22use std::io::BufReader;
23use std::path::PathBuf;
24use std::str::FromStr;
25use std::sync::Arc;
26
27use crate::{Session, TableProvider, TableProviderFactory};
28use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
29use arrow::datatypes::SchemaRef;
30use datafusion_common::{Constraints, DataFusionError, Result, config_err, plan_err};
31use datafusion_common_runtime::SpawnedTask;
32use datafusion_datasource::sink::{DataSink, DataSinkExec};
33use datafusion_execution::{SendableRecordBatchStream, TaskContext};
34use datafusion_expr::dml::InsertOp;
35use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
36use datafusion_physical_expr::create_lex_ordering;
37use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
38use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
39use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
40
41use async_trait::async_trait;
42use futures::StreamExt;
43
44#[derive(Debug, Default)]
46pub struct StreamTableFactory {}
47
48#[async_trait]
49impl TableProviderFactory for StreamTableFactory {
50 async fn create(
51 &self,
52 state: &dyn Session,
53 cmd: &CreateExternalTable,
54 ) -> Result<Arc<dyn TableProvider>> {
55 let schema: SchemaRef = Arc::clone(cmd.schema.inner());
56 let location = cmd.location.clone();
57 let encoding = cmd.file_type.parse()?;
58 let header = if let Ok(opt) = cmd
59 .options
60 .get("format.has_header")
61 .map(|has_header| bool::from_str(has_header.to_lowercase().as_str()))
62 .transpose()
63 {
64 opt.unwrap_or(false)
65 } else {
66 return config_err!(
67 "Valid values for format.has_header option are 'true' or 'false'"
68 );
69 };
70
71 let source = FileStreamProvider::new_file(schema, location.into())
72 .with_encoding(encoding)
73 .with_batch_size(state.config().batch_size())
74 .with_header(header);
75
76 let config = StreamConfig::new(Arc::new(source))
77 .with_order(cmd.order_exprs.clone())
78 .with_constraints(cmd.constraints.clone());
79
80 Ok(Arc::new(StreamTable(Arc::new(config))))
81 }
82}
83
84#[derive(Debug, Clone)]
86pub enum StreamEncoding {
87 Csv,
89 Json,
91}
92
93impl FromStr for StreamEncoding {
94 type Err = DataFusionError;
95
96 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
97 match s.to_ascii_lowercase().as_str() {
98 "csv" => Ok(Self::Csv),
99 "json" => Ok(Self::Json),
100 _ => plan_err!("Unrecognized StreamEncoding {}", s),
101 }
102 }
103}
104
105pub trait StreamProvider: std::fmt::Debug + Send + Sync {
109 fn schema(&self) -> &SchemaRef;
111 fn reader(&self) -> Result<Box<dyn RecordBatchReader>>;
113 fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
115 unimplemented!()
116 }
117 fn stream_write_display(
119 &self,
120 t: DisplayFormatType,
121 f: &mut Formatter,
122 ) -> std::fmt::Result;
123}
124
125#[derive(Debug)]
133pub struct FileStreamProvider {
134 location: PathBuf,
135 encoding: StreamEncoding,
136 pub schema: SchemaRef,
138 header: bool,
139 batch_size: usize,
140}
141
142impl FileStreamProvider {
143 pub fn new_file(schema: SchemaRef, location: PathBuf) -> Self {
151 Self {
152 schema,
153 location,
154 batch_size: 1024,
155 encoding: StreamEncoding::Csv,
156 header: false,
157 }
158 }
159
160 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
162 self.batch_size = batch_size;
163 self
164 }
165
166 pub fn with_header(mut self, header: bool) -> Self {
168 self.header = header;
169 self
170 }
171
172 pub fn with_encoding(mut self, encoding: StreamEncoding) -> Self {
174 self.encoding = encoding;
175 self
176 }
177}
178
179impl StreamProvider for FileStreamProvider {
180 fn schema(&self) -> &SchemaRef {
181 &self.schema
182 }
183
184 fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
185 let file = File::open(&self.location)?;
186 let schema = Arc::clone(&self.schema);
187 match &self.encoding {
188 StreamEncoding::Csv => {
189 let reader = arrow::csv::ReaderBuilder::new(schema)
190 .with_header(self.header)
191 .with_batch_size(self.batch_size)
192 .build(file)?;
193
194 Ok(Box::new(reader))
195 }
196 StreamEncoding::Json => {
197 let reader = arrow::json::ReaderBuilder::new(schema)
198 .with_batch_size(self.batch_size)
199 .build(BufReader::new(file))?;
200
201 Ok(Box::new(reader))
202 }
203 }
204 }
205
206 fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
207 match &self.encoding {
208 StreamEncoding::Csv => {
209 let header = self.header && !self.location.exists();
210 let file = OpenOptions::new()
211 .create(true)
212 .append(true)
213 .open(&self.location)?;
214 let writer = arrow::csv::WriterBuilder::new()
215 .with_header(header)
216 .build(file);
217
218 Ok(Box::new(writer))
219 }
220 StreamEncoding::Json => {
221 let file = OpenOptions::new()
222 .create(true)
223 .append(true)
224 .open(&self.location)?;
225 Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
226 }
227 }
228 }
229
230 fn stream_write_display(
231 &self,
232 _t: DisplayFormatType,
233 f: &mut Formatter,
234 ) -> std::fmt::Result {
235 f.debug_struct("StreamWrite")
236 .field("location", &self.location)
237 .field("batch_size", &self.batch_size)
238 .field("encoding", &self.encoding)
239 .field("header", &self.header)
240 .finish_non_exhaustive()
241 }
242}
243
244#[derive(Debug)]
246pub struct StreamConfig {
247 source: Arc<dyn StreamProvider>,
248 order: Vec<Vec<SortExpr>>,
249 constraints: Constraints,
250}
251
252impl StreamConfig {
253 pub fn new(source: Arc<dyn StreamProvider>) -> Self {
255 Self {
256 source,
257 order: vec![],
258 constraints: Constraints::default(),
259 }
260 }
261
262 pub fn with_order(mut self, order: Vec<Vec<SortExpr>>) -> Self {
264 self.order = order;
265 self
266 }
267
268 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
270 self.constraints = constraints;
271 self
272 }
273
274 fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
275 self.source.reader()
276 }
277
278 fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
279 self.source.writer()
280 }
281}
282
283#[derive(Debug)]
294pub struct StreamTable(Arc<StreamConfig>);
295
296impl StreamTable {
297 pub fn new(config: Arc<StreamConfig>) -> Self {
299 Self(config)
300 }
301}
302
303#[async_trait]
304impl TableProvider for StreamTable {
305 fn schema(&self) -> SchemaRef {
306 Arc::clone(self.0.source.schema())
307 }
308
309 fn constraints(&self) -> Option<&Constraints> {
310 Some(&self.0.constraints)
311 }
312
313 fn table_type(&self) -> TableType {
314 TableType::Base
315 }
316
317 async fn scan(
318 &self,
319 state: &dyn Session,
320 projection: Option<&Vec<usize>>,
321 _filters: &[Expr],
322 limit: Option<usize>,
323 ) -> Result<Arc<dyn ExecutionPlan>> {
324 let projected_schema = match projection {
325 Some(p) => {
326 let projected = Arc::new(self.0.source.schema().project(p)?);
327 create_lex_ordering(&projected, &self.0.order, state.execution_props())?
328 }
329 None => create_lex_ordering(
330 self.0.source.schema(),
331 &self.0.order,
332 state.execution_props(),
333 )?,
334 };
335
336 Ok(Arc::new(StreamingTableExec::try_new(
337 Arc::clone(self.0.source.schema()),
338 vec![Arc::new(StreamRead(Arc::clone(&self.0))) as _],
339 projection,
340 projected_schema,
341 true,
342 limit,
343 )?))
344 }
345
346 async fn insert_into(
347 &self,
348 _state: &dyn Session,
349 input: Arc<dyn ExecutionPlan>,
350 _insert_op: InsertOp,
351 ) -> Result<Arc<dyn ExecutionPlan>> {
352 let schema = self.0.source.schema();
353 let orders =
354 create_lex_ordering(schema, &self.0.order, _state.execution_props())?;
355 let ordering = orders.into_iter().next().map(Into::into);
357
358 Ok(Arc::new(DataSinkExec::new(
359 input,
360 Arc::new(StreamWrite(Arc::clone(&self.0))),
361 ordering,
362 )))
363 }
364}
365
366#[derive(Debug)]
367struct StreamRead(Arc<StreamConfig>);
368
369impl PartitionStream for StreamRead {
370 fn schema(&self) -> &SchemaRef {
371 self.0.source.schema()
372 }
373
374 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
375 let config = Arc::clone(&self.0);
376 let schema = Arc::clone(self.0.source.schema());
377 let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
378 let tx = builder.tx();
379 builder.spawn_blocking(move || {
380 let reader = config.reader()?;
381 for b in reader {
382 if tx.blocking_send(b.map_err(Into::into)).is_err() {
383 break;
384 }
385 }
386 Ok(())
387 });
388 builder.build()
389 }
390}
391
392#[derive(Debug)]
393struct StreamWrite(Arc<StreamConfig>);
394
395impl DisplayAs for StreamWrite {
396 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
397 self.0.source.stream_write_display(t, f)
398 }
399}
400
401#[async_trait]
402impl DataSink for StreamWrite {
403 fn schema(&self) -> &SchemaRef {
404 self.0.source.schema()
405 }
406
407 async fn write_all(
408 &self,
409 mut data: SendableRecordBatchStream,
410 _context: &Arc<TaskContext>,
411 ) -> Result<u64> {
412 let config = Arc::clone(&self.0);
413 let (sender, mut receiver) = tokio::sync::mpsc::channel::<RecordBatch>(2);
414 let write_task = SpawnedTask::spawn_blocking(move || {
416 let mut count = 0_u64;
417 let mut writer = config.writer()?;
418 while let Some(batch) = receiver.blocking_recv() {
419 count += batch.num_rows() as u64;
420 writer.write(&batch)?;
421 }
422 Ok(count)
423 });
424
425 while let Some(b) = data.next().await.transpose()? {
426 if sender.send(b).await.is_err() {
427 break;
428 }
429 }
430 drop(sender);
431 write_task
432 .join_unwind()
433 .await
434 .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?
435 }
436}