1use std::ffi::CString;
2
3use arrow::ffi_stream::FFI_ArrowArrayStream;
4
5use crate::{error::error_from_duckdb_code, ffi, Connection, Error, Result};
6
7impl Connection {
8 pub fn register_arrow_scan_view(&self, view_name: &str, arrow_scan: &FFI_ArrowArrayStream) -> Result<()> {
18 let conn = self.db.borrow_mut().con;
19 let c_str = CString::new(view_name).map_err(Error::NulError)?;
20 let transmuted_arrow_scan = arrow_scan as *const _ as ffi::duckdb_arrow_stream;
21 let r = unsafe { ffi::duckdb_arrow_scan(conn, c_str.as_ptr(), transmuted_arrow_scan) };
22 if r != ffi::DuckDBSuccess {
23 return error_from_duckdb_code(r, Some("duckdb_arrow_scan failed to register view".to_string()));
24 }
25 Ok(())
26 }
27}
28
29#[cfg(test)]
30mod tests {
31 use super::*;
32 use arrow::{
33 array::{Int32Array, StringArray},
34 datatypes::{DataType, Field, Schema, SchemaRef},
35 error::ArrowError,
36 record_batch::RecordBatch,
37 };
38 use std::sync::Arc;
39
/// Minimal in-memory record batch source used by the tests in this module.
struct TestRecordBatchReader {
    /// Schema shared by the batches; taken from the first batch on construction.
    schema: SchemaRef,
    /// Batches handed out by the iterator, in order.
    batches: Vec<RecordBatch>,
    /// Position in `batches` of the next batch to yield.
    index: usize,
}
46
47 impl TestRecordBatchReader {
48 fn new(batches: Vec<RecordBatch>) -> Self {
49 let schema = batches[0].schema();
51 TestRecordBatchReader {
52 schema,
53 batches,
54 index: 0,
55 }
56 }
57 }
58
59 impl Iterator for TestRecordBatchReader {
60 type Item = std::result::Result<RecordBatch, ArrowError>;
61
62 fn next(&mut self) -> Option<Self::Item> {
63 if self.index < self.batches.len() {
64 let batch = self.batches[self.index].clone();
65 self.index += 1;
66 Some(Ok(batch))
67 } else {
68 None
69 }
70 }
71 }
72
73 impl arrow::record_batch::RecordBatchReader for TestRecordBatchReader {
74 fn schema(&self) -> SchemaRef {
75 self.schema.clone()
76 }
77 }
78
/// Registers a single non-nullable batch as a view and reads every row back.
#[test]
fn test_register_arrow_scan_view() -> Result<()> {
    let db = Connection::open_in_memory()?;

    // Two non-nullable columns: an integer id and a string name.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let names = StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave", "Eve"]);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(names)])
        .expect("Failed to create record batch");

    // Expose the batch through the Arrow C stream interface.
    let boxed_reader: Box<dyn arrow::record_batch::RecordBatchReader + Send> =
        Box::new(TestRecordBatchReader::new(vec![batch]));
    let stream = arrow::ffi_stream::FFI_ArrowArrayStream::new(boxed_reader);

    db.register_arrow_scan_view("test_view", &stream)?;

    // Read the view back through SQL, ordered so the comparison is deterministic.
    let rows = db
        .prepare("SELECT id, name FROM test_view ORDER BY id")?
        .query_map([], |row| Ok((row.get::<_, i32>(0)?, row.get::<_, String>(1)?)))?
        .collect::<Result<Vec<_>>>()?;

    let expected: Vec<(i32, String)> = vec![
        (1, "Alice".to_string()),
        (2, "Bob".to_string()),
        (3, "Charlie".to_string()),
        (4, "Dave".to_string()),
        (5, "Eve".to_string()),
    ];
    assert_eq!(rows, expected);

    Ok(())
}
124
/// Registers a batch whose columns both contain NULLs and checks that the
/// NULLs survive the round trip through the view.
#[test]
fn test_register_arrow_scan_view_with_nulls() -> Result<()> {
    let db = Connection::open_in_memory()?;

    // Both columns are declared nullable to match the data below.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]));
    let ids = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5)]);
    let names = StringArray::from(vec![Some("Alice"), None, Some("Charlie"), Some("Dave"), Some("Eve")]);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(names)])
        .expect("Failed to create record batch");

    // Expose the batch through the Arrow C stream interface.
    let boxed_reader: Box<dyn arrow::record_batch::RecordBatchReader + Send> =
        Box::new(TestRecordBatchReader::new(vec![batch]));
    let stream = arrow::ffi_stream::FFI_ArrowArrayStream::new(boxed_reader);

    db.register_arrow_scan_view("test_view_nulls", &stream)?;

    // NULL ids sort last, so the row with the missing id is expected at the end.
    let rows = db
        .prepare("SELECT id, name FROM test_view_nulls ORDER BY id NULLS LAST")?
        .query_map([], |row| {
            Ok((row.get::<_, Option<i32>>(0)?, row.get::<_, Option<String>>(1)?))
        })?
        .collect::<Result<Vec<_>>>()?;

    let expected: Vec<(Option<i32>, Option<String>)> = vec![
        (Some(1), Some("Alice".to_string())),
        (Some(2), None),
        (Some(4), Some("Dave".to_string())),
        (Some(5), Some("Eve".to_string())),
        (None, Some("Charlie".to_string())),
    ];
    assert_eq!(rows, expected);

    Ok(())
}
174
/// Registers a stream made of two batches and checks that the view exposes
/// the rows of both.
#[test]
fn test_register_arrow_scan_view_multiple_batches() -> Result<()> {
    let db = Connection::open_in_memory()?;

    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Both batches share the schema above; only the column data differs.
    let make_batch = |ids: Vec<i32>, names: Vec<&str>| {
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(ids)),
                Arc::new(StringArray::from(names)),
            ],
        )
        .expect("Failed to create record batch")
    };
    let first_batch = make_batch(vec![1, 2, 3], vec!["Alice", "Bob", "Charlie"]);
    let second_batch = make_batch(vec![4, 5], vec!["Dave", "Eve"]);

    // Expose both batches through the Arrow C stream interface.
    let boxed_reader: Box<dyn arrow::record_batch::RecordBatchReader + Send> =
        Box::new(TestRecordBatchReader::new(vec![first_batch, second_batch]));
    let stream = arrow::ffi_stream::FFI_ArrowArrayStream::new(boxed_reader);

    db.register_arrow_scan_view("test_view_multi", &stream)?;

    // Read the view back through SQL, ordered so the comparison is deterministic.
    let rows = db
        .prepare("SELECT id, name FROM test_view_multi ORDER BY id")?
        .query_map([], |row| Ok((row.get::<_, i32>(0)?, row.get::<_, String>(1)?)))?
        .collect::<Result<Vec<_>>>()?;

    let expected: Vec<(i32, String)> = vec![
        (1, "Alice".to_string()),
        (2, "Bob".to_string()),
        (3, "Charlie".to_string()),
        (4, "Dave".to_string()),
        (5, "Eve".to_string()),
    ];
    assert_eq!(rows, expected);

    Ok(())
}
233}