Skip to main content

datafusion_catalog/
stream.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! TableProvider for stream sources, such as FIFO files
19
20use std::fmt::Formatter;
21use std::fs::{File, OpenOptions};
22use std::io::BufReader;
23use std::path::PathBuf;
24use std::str::FromStr;
25use std::sync::Arc;
26
27use crate::{Session, TableProvider, TableProviderFactory};
28use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
29use arrow::datatypes::SchemaRef;
30use datafusion_common::{Constraints, DataFusionError, Result, config_err, plan_err};
31use datafusion_common_runtime::SpawnedTask;
32use datafusion_datasource::sink::{DataSink, DataSinkExec};
33use datafusion_execution::{SendableRecordBatchStream, TaskContext};
34use datafusion_expr::dml::InsertOp;
35use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
36use datafusion_physical_expr::create_lex_ordering;
37use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
38use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
39use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
40
41use async_trait::async_trait;
42use futures::StreamExt;
43
/// Factory that builds [`StreamTable`] providers.
///
/// This is the [`TableProviderFactory`] implementation for stream tables; it
/// carries no state of its own.
#[derive(Debug, Default)]
pub struct StreamTableFactory {}
47
48#[async_trait]
49impl TableProviderFactory for StreamTableFactory {
50    async fn create(
51        &self,
52        state: &dyn Session,
53        cmd: &CreateExternalTable,
54    ) -> Result<Arc<dyn TableProvider>> {
55        let schema: SchemaRef = Arc::clone(cmd.schema.inner());
56        let location = cmd.location.clone();
57        let encoding = cmd.file_type.parse()?;
58        let header = if let Ok(opt) = cmd
59            .options
60            .get("format.has_header")
61            .map(|has_header| bool::from_str(has_header.to_lowercase().as_str()))
62            .transpose()
63        {
64            opt.unwrap_or(false)
65        } else {
66            return config_err!(
67                "Valid values for format.has_header option are 'true' or 'false'"
68            );
69        };
70
71        let source = FileStreamProvider::new_file(schema, location.into())
72            .with_encoding(encoding)
73            .with_batch_size(state.config().batch_size())
74            .with_header(header);
75
76        let config = StreamConfig::new(Arc::new(source))
77            .with_order(cmd.order_exprs.clone())
78            .with_constraints(cmd.constraints.clone());
79
80        Ok(Arc::new(StreamTable(Arc::new(config))))
81    }
82}
83
/// The data encoding for [`StreamTable`]
///
/// Variants can be compared with `==` / `!=`, which is convenient when
/// branching on or asserting a configured encoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamEncoding {
    /// CSV records
    Csv,
    /// Newline-delimited JSON records
    Json,
}
92
93impl FromStr for StreamEncoding {
94    type Err = DataFusionError;
95
96    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
97        match s.to_ascii_lowercase().as_str() {
98            "csv" => Ok(Self::Csv),
99            "json" => Ok(Self::Json),
100            _ => plan_err!("Unrecognized StreamEncoding {}", s),
101        }
102    }
103}
104
105/// The StreamProvider trait is used as a generic interface for reading and writing from streaming
106/// data sources (such as FIFO, Websocket, Kafka, etc.).  Implementations of the provider are
107/// responsible for providing a `RecordBatchReader` and optionally a `RecordBatchWriter`.
108pub trait StreamProvider: std::fmt::Debug + Send + Sync {
109    /// Get a reference to the schema for this stream
110    fn schema(&self) -> &SchemaRef;
111    /// Provide `RecordBatchReader`
112    fn reader(&self) -> Result<Box<dyn RecordBatchReader>>;
113    /// Provide `RecordBatchWriter`
114    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
115        unimplemented!()
116    }
117    /// Display implementation when using as a DataSink
118    fn stream_write_display(
119        &self,
120        t: DisplayFormatType,
121        f: &mut Formatter,
122    ) -> std::fmt::Result;
123}
124
125/// Stream data from the file at `location`
126///
127/// * Data will be read sequentially from the provided `location`
128/// * New data will be appended to the end of the file
129///
130/// The encoding can be configured with [`Self::with_encoding`] and
131/// defaults to [`StreamEncoding::Csv`]
132#[derive(Debug)]
133pub struct FileStreamProvider {
134    location: PathBuf,
135    encoding: StreamEncoding,
136    /// Get a reference to the schema for this file stream
137    pub schema: SchemaRef,
138    header: bool,
139    batch_size: usize,
140}
141
142impl FileStreamProvider {
143    /// Stream data from the file at `location`
144    ///
145    /// * Data will be read sequentially from the provided `location`
146    /// * New data will be appended to the end of the file
147    ///
148    /// The encoding can be configured with [`Self::with_encoding`] and
149    /// defaults to [`StreamEncoding::Csv`]
150    pub fn new_file(schema: SchemaRef, location: PathBuf) -> Self {
151        Self {
152            schema,
153            location,
154            batch_size: 1024,
155            encoding: StreamEncoding::Csv,
156            header: false,
157        }
158    }
159
160    /// Set the batch size (the number of rows to load at one time)
161    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
162        self.batch_size = batch_size;
163        self
164    }
165
166    /// Specify whether the file has a header (only applicable for [`StreamEncoding::Csv`])
167    pub fn with_header(mut self, header: bool) -> Self {
168        self.header = header;
169        self
170    }
171
172    /// Specify an encoding for the stream
173    pub fn with_encoding(mut self, encoding: StreamEncoding) -> Self {
174        self.encoding = encoding;
175        self
176    }
177}
178
179impl StreamProvider for FileStreamProvider {
180    fn schema(&self) -> &SchemaRef {
181        &self.schema
182    }
183
184    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
185        let file = File::open(&self.location)?;
186        let schema = Arc::clone(&self.schema);
187        match &self.encoding {
188            StreamEncoding::Csv => {
189                let reader = arrow::csv::ReaderBuilder::new(schema)
190                    .with_header(self.header)
191                    .with_batch_size(self.batch_size)
192                    .build(file)?;
193
194                Ok(Box::new(reader))
195            }
196            StreamEncoding::Json => {
197                let reader = arrow::json::ReaderBuilder::new(schema)
198                    .with_batch_size(self.batch_size)
199                    .build(BufReader::new(file))?;
200
201                Ok(Box::new(reader))
202            }
203        }
204    }
205
206    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
207        match &self.encoding {
208            StreamEncoding::Csv => {
209                let header = self.header && !self.location.exists();
210                let file = OpenOptions::new()
211                    .create(true)
212                    .append(true)
213                    .open(&self.location)?;
214                let writer = arrow::csv::WriterBuilder::new()
215                    .with_header(header)
216                    .build(file);
217
218                Ok(Box::new(writer))
219            }
220            StreamEncoding::Json => {
221                let file = OpenOptions::new()
222                    .create(true)
223                    .append(true)
224                    .open(&self.location)?;
225                Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
226            }
227        }
228    }
229
230    fn stream_write_display(
231        &self,
232        _t: DisplayFormatType,
233        f: &mut Formatter,
234    ) -> std::fmt::Result {
235        f.debug_struct("StreamWrite")
236            .field("location", &self.location)
237            .field("batch_size", &self.batch_size)
238            .field("encoding", &self.encoding)
239            .field("header", &self.header)
240            .finish_non_exhaustive()
241    }
242}
243
244/// The configuration for a [`StreamTable`]
245#[derive(Debug)]
246pub struct StreamConfig {
247    source: Arc<dyn StreamProvider>,
248    order: Vec<Vec<SortExpr>>,
249    constraints: Constraints,
250}
251
252impl StreamConfig {
253    /// Create a new `StreamConfig` from a `StreamProvider`
254    pub fn new(source: Arc<dyn StreamProvider>) -> Self {
255        Self {
256            source,
257            order: vec![],
258            constraints: Constraints::default(),
259        }
260    }
261
262    /// Specify a sort order for the stream
263    pub fn with_order(mut self, order: Vec<Vec<SortExpr>>) -> Self {
264        self.order = order;
265        self
266    }
267
268    /// Assign constraints
269    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
270        self.constraints = constraints;
271        self
272    }
273
274    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
275        self.source.reader()
276    }
277
278    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
279        self.source.writer()
280    }
281}
282
283/// A [`TableProvider`] for an unbounded stream source
284///
285/// Currently only reading from / appending to a single file in-place is supported, but
286/// other stream sources and sinks may be added in future.
287///
288/// Applications looking to read/write datasets comprising multiple files, e.g. [Hadoop]-style
289/// data stored in object storage, should instead consider [`ListingTable`].
290///
291/// [Hadoop]: https://hadoop.apache.org/
292/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
293#[derive(Debug)]
294pub struct StreamTable(Arc<StreamConfig>);
295
296impl StreamTable {
297    /// Create a new [`StreamTable`] for the given [`StreamConfig`]
298    pub fn new(config: Arc<StreamConfig>) -> Self {
299        Self(config)
300    }
301}
302
303#[async_trait]
304impl TableProvider for StreamTable {
305    fn schema(&self) -> SchemaRef {
306        Arc::clone(self.0.source.schema())
307    }
308
309    fn constraints(&self) -> Option<&Constraints> {
310        Some(&self.0.constraints)
311    }
312
313    fn table_type(&self) -> TableType {
314        TableType::Base
315    }
316
317    async fn scan(
318        &self,
319        state: &dyn Session,
320        projection: Option<&Vec<usize>>,
321        _filters: &[Expr],
322        limit: Option<usize>,
323    ) -> Result<Arc<dyn ExecutionPlan>> {
324        let projected_schema = match projection {
325            Some(p) => {
326                let projected = Arc::new(self.0.source.schema().project(p)?);
327                create_lex_ordering(&projected, &self.0.order, state.execution_props())?
328            }
329            None => create_lex_ordering(
330                self.0.source.schema(),
331                &self.0.order,
332                state.execution_props(),
333            )?,
334        };
335
336        Ok(Arc::new(StreamingTableExec::try_new(
337            Arc::clone(self.0.source.schema()),
338            vec![Arc::new(StreamRead(Arc::clone(&self.0))) as _],
339            projection,
340            projected_schema,
341            true,
342            limit,
343        )?))
344    }
345
346    async fn insert_into(
347        &self,
348        _state: &dyn Session,
349        input: Arc<dyn ExecutionPlan>,
350        _insert_op: InsertOp,
351    ) -> Result<Arc<dyn ExecutionPlan>> {
352        let schema = self.0.source.schema();
353        let orders =
354            create_lex_ordering(schema, &self.0.order, _state.execution_props())?;
355        // It is sufficient to pass only one of the equivalent orderings:
356        let ordering = orders.into_iter().next().map(Into::into);
357
358        Ok(Arc::new(DataSinkExec::new(
359            input,
360            Arc::new(StreamWrite(Arc::clone(&self.0))),
361            ordering,
362        )))
363    }
364}
365
366#[derive(Debug)]
367struct StreamRead(Arc<StreamConfig>);
368
369impl PartitionStream for StreamRead {
370    fn schema(&self) -> &SchemaRef {
371        self.0.source.schema()
372    }
373
374    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
375        let config = Arc::clone(&self.0);
376        let schema = Arc::clone(self.0.source.schema());
377        let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
378        let tx = builder.tx();
379        builder.spawn_blocking(move || {
380            let reader = config.reader()?;
381            for b in reader {
382                if tx.blocking_send(b.map_err(Into::into)).is_err() {
383                    break;
384                }
385            }
386            Ok(())
387        });
388        builder.build()
389    }
390}
391
392#[derive(Debug)]
393struct StreamWrite(Arc<StreamConfig>);
394
395impl DisplayAs for StreamWrite {
396    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
397        self.0.source.stream_write_display(t, f)
398    }
399}
400
401#[async_trait]
402impl DataSink for StreamWrite {
403    fn schema(&self) -> &SchemaRef {
404        self.0.source.schema()
405    }
406
407    async fn write_all(
408        &self,
409        mut data: SendableRecordBatchStream,
410        _context: &Arc<TaskContext>,
411    ) -> Result<u64> {
412        let config = Arc::clone(&self.0);
413        let (sender, mut receiver) = tokio::sync::mpsc::channel::<RecordBatch>(2);
414        // Note: FIFO Files support poll so this could use AsyncFd
415        let write_task = SpawnedTask::spawn_blocking(move || {
416            let mut count = 0_u64;
417            let mut writer = config.writer()?;
418            while let Some(batch) = receiver.blocking_recv() {
419                count += batch.num_rows() as u64;
420                writer.write(&batch)?;
421            }
422            Ok(count)
423        });
424
425        while let Some(b) = data.next().await.transpose()? {
426            if sender.send(b).await.is_err() {
427                break;
428            }
429        }
430        drop(sender);
431        write_task
432            .join_unwind()
433            .await
434            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?
435    }
436}