duckdb_server/
db.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use duckdb::DuckdbConnectionManager;
4
5#[async_trait]
6pub trait Database: Send + Sync {
7    async fn execute(&self, sql: &str) -> Result<()>;
8    async fn get_json(&self, sql: &str) -> Result<Vec<u8>>;
9    async fn get_arrow(&self, sql: &str) -> Result<Vec<u8>>;
10}
11
12pub struct ConnectionPool {
13    pool: r2d2::Pool<DuckdbConnectionManager>,
14}
15
16impl ConnectionPool {
17    pub fn new(db_path: &str, pool_size: u32) -> Result<Self> {
18        let manager = DuckdbConnectionManager::file(db_path)?;
19        let pool = r2d2::Pool::builder().max_size(pool_size).build(manager)?;
20        Ok(Self { pool })
21    }
22
23    pub fn get(&self) -> Result<r2d2::PooledConnection<DuckdbConnectionManager>> {
24        Ok(self.pool.get()?)
25    }
26}
27
28#[async_trait]
29impl Database for ConnectionPool {
30    async fn execute(&self, sql: &str) -> Result<()> {
31        let conn = self.get()?;
32        conn.execute_batch(sql)?;
33        Ok(())
34    }
35
36    async fn get_json(&self, sql: &str) -> Result<Vec<u8>> {
37        let conn = self.get()?;
38        let mut stmt = conn.prepare(sql)?;
39        let arrow = stmt.query_arrow([])?;
40
41        let buf = Vec::new();
42        let mut writer = arrow::json::ArrayWriter::new(buf);
43        for batch in arrow {
44            writer.write(&batch)?;
45        }
46        writer.finish()?;
47        Ok(writer.into_inner())
48    }
49
50    async fn get_arrow(&self, sql: &str) -> Result<Vec<u8>> {
51        let conn = self.get()?;
52        let mut stmt = conn.prepare(sql)?;
53        let arrow = stmt.query_arrow([])?;
54        let schema = arrow.get_schema();
55
56        let mut buffer: Vec<u8> = Vec::new();
57        {
58            let schema_ref = schema.as_ref();
59            let mut writer = arrow::ipc::writer::FileWriter::try_new(&mut buffer, schema_ref)?;
60
61            for batch in arrow {
62                writer.write(&batch)?;
63            }
64
65            writer.finish()?;
66        }
67
68        Ok(buffer)
69    }
70}