duckdb/
arrow_scan.rs

1use std::ffi::CString;
2
3use arrow::ffi_stream::FFI_ArrowArrayStream;
4
5use crate::{error::error_from_duckdb_code, ffi, Connection, Error, Result};
6
7impl Connection {
8    /// Registers a temporary view in DuckDB based on an Arrow stream.
9    ///
10    /// Note the underlying `duckdb_arrow_scan` C API is marked for deprecation.
11    /// However, similar functionality will be preserved in a new yet-to-be-determined API.
12    ///
13    /// # Arguments
14    ///
15    /// * `view_name`: The name of the view to register
16    /// * `arrow_scan`: The Arrow stream to register
17    pub fn register_arrow_scan_view(&self, view_name: &str, arrow_scan: &FFI_ArrowArrayStream) -> Result<()> {
18        let conn = self.db.borrow_mut().con;
19        let c_str = CString::new(view_name).map_err(Error::NulError)?;
20        let transmuted_arrow_scan = arrow_scan as *const _ as ffi::duckdb_arrow_stream;
21        let r = unsafe { ffi::duckdb_arrow_scan(conn, c_str.as_ptr(), transmuted_arrow_scan) };
22        if r != ffi::DuckDBSuccess {
23            return error_from_duckdb_code(r, Some("duckdb_arrow_scan failed to register view".to_string()));
24        }
25        Ok(())
26    }
27}
28
29#[cfg(test)]
30mod tests {
31    use super::*;
32    use arrow::{
33        array::{Int32Array, StringArray},
34        datatypes::{DataType, Field, Schema, SchemaRef},
35        error::ArrowError,
36        record_batch::RecordBatch,
37    };
38    use std::sync::Arc;
39
40    /// A simple RecordBatchReader implementation for testing
41    struct TestRecordBatchReader {
42        schema: SchemaRef,
43        batches: Vec<RecordBatch>,
44        index: usize,
45    }
46
47    impl TestRecordBatchReader {
48        fn new(batches: Vec<RecordBatch>) -> Self {
49            // All batches should have the same schema, so we can use the first one
50            let schema = batches[0].schema();
51            TestRecordBatchReader {
52                schema,
53                batches,
54                index: 0,
55            }
56        }
57    }
58
59    impl Iterator for TestRecordBatchReader {
60        type Item = std::result::Result<RecordBatch, ArrowError>;
61
62        fn next(&mut self) -> Option<Self::Item> {
63            if self.index < self.batches.len() {
64                let batch = self.batches[self.index].clone();
65                self.index += 1;
66                Some(Ok(batch))
67            } else {
68                None
69            }
70        }
71    }
72
73    impl arrow::record_batch::RecordBatchReader for TestRecordBatchReader {
74        fn schema(&self) -> SchemaRef {
75            self.schema.clone()
76        }
77    }
78
79    #[test]
80    fn test_register_arrow_scan_view() -> Result<()> {
81        // Create a test database connection
82        let db = Connection::open_in_memory()?;
83
84        // Create Arrow arrays for test data
85        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
86        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave", "Eve"]);
87
88        // Create a schema and record batch
89        let schema = Arc::new(Schema::new(vec![
90            Field::new("id", DataType::Int32, false),
91            Field::new("name", DataType::Utf8, false),
92        ]));
93
94        let record_batch = RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)])
95            .expect("Failed to create record batch");
96
97        // Create a RecordBatchReader
98        let reader = TestRecordBatchReader::new(vec![record_batch]);
99
100        // Convert to FFI_ArrowArrayStream - this needs to live longer than any queries to the view
101        let stream = arrow::ffi_stream::FFI_ArrowArrayStream::new(
102            Box::new(reader) as Box<dyn arrow::record_batch::RecordBatchReader + Send>
103        );
104
105        // Register the view
106        db.register_arrow_scan_view("test_view", &stream)?;
107
108        // Query the view to verify it works
109        let rows = db
110            .prepare("SELECT id, name FROM test_view ORDER BY id")?
111            .query_map([], |row| Ok((row.get::<_, i32>(0)?, row.get::<_, String>(1)?)))?
112            .collect::<Result<Vec<_>>>()?;
113
114        // Verify results
115        assert_eq!(rows.len(), 5);
116        assert_eq!(rows[0], (1, "Alice".to_string()));
117        assert_eq!(rows[1], (2, "Bob".to_string()));
118        assert_eq!(rows[2], (3, "Charlie".to_string()));
119        assert_eq!(rows[3], (4, "Dave".to_string()));
120        assert_eq!(rows[4], (5, "Eve".to_string()));
121
122        Ok(())
123    }
124
125    #[test]
126    fn test_register_arrow_scan_view_with_nulls() -> Result<()> {
127        // Create a test database connection
128        let db = Connection::open_in_memory()?;
129
130        // Create Arrow arrays with null values
131        let id_array = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5)]);
132        let name_array = StringArray::from(vec![Some("Alice"), None, Some("Charlie"), Some("Dave"), Some("Eve")]);
133
134        // Create a schema and record batch
135        let schema = Arc::new(Schema::new(vec![
136            Field::new("id", DataType::Int32, true),
137            Field::new("name", DataType::Utf8, true),
138        ]));
139
140        let record_batch = RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)])
141            .expect("Failed to create record batch");
142
143        // Create a RecordBatchReader
144        let reader = TestRecordBatchReader::new(vec![record_batch]);
145
146        // Convert to FFI_ArrowArrayStream
147        let stream = arrow::ffi_stream::FFI_ArrowArrayStream::new(
148            Box::new(reader) as Box<dyn arrow::record_batch::RecordBatchReader + Send>
149        );
150
151        // Register the view
152        db.register_arrow_scan_view("test_view_nulls", &stream)?;
153
154        // Query the view to verify it works, including handling of nulls
155        let rows = db
156            .prepare("SELECT id, name FROM test_view_nulls ORDER BY id NULLS LAST")?
157            .query_map([], |row| {
158                let id: Option<i32> = row.get(0)?;
159                let name: Option<String> = row.get(1)?;
160                Ok((id, name))
161            })?
162            .collect::<Result<Vec<_>>>()?;
163
164        // Verify results
165        assert_eq!(rows.len(), 5);
166        assert_eq!(rows[0], (Some(1), Some("Alice".to_string())));
167        assert_eq!(rows[1], (Some(2), None));
168        assert_eq!(rows[2], (Some(4), Some("Dave".to_string())));
169        assert_eq!(rows[3], (Some(5), Some("Eve".to_string())));
170        assert_eq!(rows[4], (None, Some("Charlie".to_string())));
171
172        Ok(())
173    }
174
175    #[test]
176    fn test_register_arrow_scan_view_multiple_batches() -> Result<()> {
177        // Create a test database connection
178        let db = Connection::open_in_memory()?;
179
180        // Create schema
181        let schema = Arc::new(Schema::new(vec![
182            Field::new("id", DataType::Int32, false),
183            Field::new("name", DataType::Utf8, false),
184        ]));
185
186        // Create first batch
187        let batch1 = RecordBatch::try_new(
188            schema.clone(),
189            vec![
190                Arc::new(Int32Array::from(vec![1, 2, 3])),
191                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
192            ],
193        )
194        .expect("Failed to create record batch");
195
196        // Create second batch
197        let batch2 = RecordBatch::try_new(
198            schema.clone(),
199            vec![
200                Arc::new(Int32Array::from(vec![4, 5])),
201                Arc::new(StringArray::from(vec!["Dave", "Eve"])),
202            ],
203        )
204        .expect("Failed to create record batch");
205
206        // Create a RecordBatchReader with multiple batches
207        let reader = TestRecordBatchReader::new(vec![batch1, batch2]);
208
209        // Convert to FFI_ArrowArrayStream
210        let stream = arrow::ffi_stream::FFI_ArrowArrayStream::new(
211            Box::new(reader) as Box<dyn arrow::record_batch::RecordBatchReader + Send>
212        );
213
214        // Register the view
215        db.register_arrow_scan_view("test_view_multi", &stream)?;
216
217        // Query all data to verify correct ordering
218        let rows = db
219            .prepare("SELECT id, name FROM test_view_multi ORDER BY id")?
220            .query_map([], |row| Ok((row.get::<_, i32>(0)?, row.get::<_, String>(1)?)))?
221            .collect::<Result<Vec<_>>>()?;
222
223        // Verify results
224        assert_eq!(rows.len(), 5);
225        assert_eq!(rows[0], (1, "Alice".to_string()));
226        assert_eq!(rows[1], (2, "Bob".to_string()));
227        assert_eq!(rows[2], (3, "Charlie".to_string()));
228        assert_eq!(rows[3], (4, "Dave".to_string()));
229        assert_eq!(rows[4], (5, "Eve".to_string()));
230
231        Ok(())
232    }
233}