1use anyhow::Result;
2use async_trait::async_trait;
3use duckdb::DuckdbConnectionManager;
4
5#[async_trait]
6pub trait Database: Send + Sync {
7 async fn execute(&self, sql: &str) -> Result<()>;
8 async fn get_json(&self, sql: &str) -> Result<Vec<u8>>;
9 async fn get_arrow(&self, sql: &str) -> Result<Vec<u8>>;
10}
11
12pub struct ConnectionPool {
13 pool: r2d2::Pool<DuckdbConnectionManager>,
14}
15
16impl ConnectionPool {
17 pub fn new(db_path: &str, pool_size: u32) -> Result<Self> {
18 let manager = DuckdbConnectionManager::file(db_path)?;
19 let pool = r2d2::Pool::builder().max_size(pool_size).build(manager)?;
20 Ok(Self { pool })
21 }
22
23 pub fn get(&self) -> Result<r2d2::PooledConnection<DuckdbConnectionManager>> {
24 Ok(self.pool.get()?)
25 }
26}
27
28#[async_trait]
29impl Database for ConnectionPool {
30 async fn execute(&self, sql: &str) -> Result<()> {
31 let conn = self.get()?;
32 conn.execute_batch(sql)?;
33 Ok(())
34 }
35
36 async fn get_json(&self, sql: &str) -> Result<Vec<u8>> {
37 let conn = self.get()?;
38 let mut stmt = conn.prepare(sql)?;
39 let arrow = stmt.query_arrow([])?;
40
41 let buf = Vec::new();
42 let mut writer = arrow::json::ArrayWriter::new(buf);
43 for batch in arrow {
44 writer.write(&batch)?;
45 }
46 writer.finish()?;
47 Ok(writer.into_inner())
48 }
49
50 async fn get_arrow(&self, sql: &str) -> Result<Vec<u8>> {
51 let conn = self.get()?;
52 let mut stmt = conn.prepare(sql)?;
53 let arrow = stmt.query_arrow([])?;
54 let schema = arrow.get_schema();
55
56 let mut buffer: Vec<u8> = Vec::new();
57 {
58 let schema_ref = schema.as_ref();
59 let mut writer = arrow::ipc::writer::FileWriter::try_new(&mut buffer, schema_ref)?;
60
61 for batch in arrow {
62 writer.write(&batch)?;
63 }
64
65 writer.finish()?;
66 }
67
68 Ok(buffer)
69 }
70}