use duckdb::{
Connection, Result,
arrow::util::pretty::print_batches,
core::{DataChunkHandle, Inserter, LogicalTypeHandle, LogicalTypeId},
vtab::{BindInfo, InitInfo, TableFunctionInfo, VTab},
};
use std::{
error::Error,
sync::atomic::{AtomicUsize, Ordering},
};
struct Numbers;
struct NumbersBind {
count: usize,
}
struct NumbersInit {
cursor: AtomicUsize,
}
impl VTab for Numbers {
type BindData = NumbersBind;
type InitData = NumbersInit;
fn bind(bind: &BindInfo) -> Result<Self::BindData, Box<dyn Error>> {
bind.add_result_column("n", LogicalTypeId::Bigint.into());
bind.add_result_column("sqrt_n", LogicalTypeId::Double.into());
bind.add_result_column("label", LogicalTypeId::Varchar.into());
let bigint: LogicalTypeHandle = LogicalTypeId::Bigint.into();
bind.add_result_column("divisors", LogicalTypeHandle::list(&bigint));
let parity = LogicalTypeHandle::struct_type(&[
("is_even", LogicalTypeId::Boolean.into()),
("name", LogicalTypeId::Varchar.into()),
]);
bind.add_result_column("parity", parity);
let count = bind.get_parameter(0).to_int64().max(0) as usize;
Ok(NumbersBind { count })
}
fn init(_: &InitInfo) -> Result<Self::InitData, Box<dyn Error>> {
Ok(NumbersInit {
cursor: AtomicUsize::new(0),
})
}
fn func(func: &TableFunctionInfo<Self>, output: &mut DataChunkHandle) -> Result<(), Box<dyn Error>> {
let count = func.get_bind_data().count;
let cursor = &func.get_init_data().cursor;
let capacity = output.flat_vector(0).capacity();
let start = cursor.fetch_add(capacity, Ordering::Relaxed);
if start >= count {
output.set_len(0); return Ok(());
}
let rows = capacity.min(count - start);
{
let mut v = output.flat_vector(0);
let slice = unsafe { v.as_mut_slice_with_len::<i64>(rows) };
for (i, slot) in slice.iter_mut().enumerate() {
*slot = (start + i) as i64;
}
}
{
let mut v = output.flat_vector(1);
let slice = unsafe { v.as_mut_slice_with_len::<f64>(rows) };
for (i, slot) in slice.iter_mut().enumerate() {
*slot = ((start + i) as f64).sqrt();
}
}
{
let mut v = output.flat_vector(2);
for i in 0..rows {
let n = start + i;
if n % 3 == 0 {
v.set_null(i);
} else {
v.insert(i, format!("number {n}").as_str());
}
}
}
{
let mut list = output.list_vector(3);
let mut children: Vec<i64> = Vec::new();
for i in 0..rows {
let divisors = divisors((start + i) as i64);
list.set_entry(i, children.len(), divisors.len());
children.extend_from_slice(&divisors);
}
unsafe { list.set_child(&children) };
}
{
let parity = output.struct_vector(4);
{
let mut is_even = parity.child(0, rows);
let slice = unsafe { is_even.as_mut_slice_with_len::<bool>(rows) };
for (i, slot) in slice.iter_mut().enumerate() {
*slot = (start + i) % 2 == 0;
}
}
{
let name = parity.child(1, rows);
for i in 0..rows {
name.insert(i, if (start + i) % 2 == 0 { "even" } else { "odd" });
}
}
}
output.set_len(rows);
Ok(())
}
fn parameters() -> Option<Vec<LogicalTypeHandle>> {
Some(vec![LogicalTypeId::Bigint.into()])
}
}
fn divisors(n: i64) -> Vec<i64> {
(1..=n).filter(|d| n % d == 0).collect()
}
fn main() -> Result<()> {
let conn = Connection::open_in_memory()?;
conn.register_table_function::<Numbers>("numbers")?;
let batches: Vec<_> = conn.prepare("SELECT * FROM numbers(6)")?.query_arrow([])?.collect();
print_batches(&batches).unwrap();
let total: i64 = conn.query_row("SELECT count(*) FROM numbers(5000)", [], |row| row.get(0))?;
assert_eq!(total, 5000);
Ok(())
}