async_duckdb/
client.rs
1use crate::Error;
2use std::{
3 path::{Path, PathBuf},
4 thread,
5};
6
7use crossbeam_channel::{bounded, unbounded, Sender};
8use duckdb::{Config, Connection};
9use futures_channel::oneshot;
10
11#[derive(Default)]
31pub struct ClientBuilder {
32 pub(crate) path: Option<PathBuf>,
33 pub(crate) flagsfn: Option<fn() -> duckdb::Result<Config>>,
34}
35
36impl ClientBuilder {
37 #[must_use]
39 pub fn new() -> Self {
40 Self::default()
41 }
42
43 #[must_use]
47 pub fn path<P: AsRef<Path>>(mut self, path: P) -> Self {
48 self.path = Some(path.as_ref().into());
49 self
50 }
51
52 #[must_use]
56 pub fn flagsfn(mut self, flags: fn() -> duckdb::Result<Config>) -> Self {
57 self.flagsfn = Some(flags);
58 self
59 }
60
61 pub async fn open(self) -> Result<Client, Error> {
73 Client::open_async(self).await
74 }
75
76 pub fn open_blocking(self) -> Result<Client, Error> {
89 Client::open_blocking(self)
90 }
91}
92
93enum Command {
94 Func(Box<dyn FnOnce(&mut Connection) + Send>),
95 Shutdown(Box<dyn FnOnce(Result<(), Error>) + Send>),
96}
97
98#[derive(Clone)]
101pub struct Client {
102 conn_tx: Sender<Command>,
103}
104
105impl Client {
106 async fn open_async(builder: ClientBuilder) -> Result<Self, Error> {
107 let (open_tx, open_rx) = oneshot::channel();
108 Self::open(builder, |res| {
109 _ = open_tx.send(res);
110 });
111 open_rx.await?
112 }
113
114 fn open_blocking(builder: ClientBuilder) -> Result<Self, Error> {
115 let (conn_tx, conn_rx) = bounded(1);
116 Self::open(builder, move |res| {
117 _ = conn_tx.send(res);
118 });
119 conn_rx.recv()?
120 }
121
122 fn open<F>(builder: ClientBuilder, func: F)
123 where
124 F: FnOnce(Result<Self, Error>) + Send + 'static,
125 {
126 thread::spawn(move || {
127 let (conn_tx, conn_rx) = unbounded();
128
129 let mut conn = match Client::create_conn(builder) {
130 Ok(conn) => conn,
131 Err(err) => {
132 func(Err(err));
133 return;
134 }
135 };
136
137 let client = Self { conn_tx };
138 func(Ok(client));
139
140 while let Ok(cmd) = conn_rx.recv() {
141 match cmd {
142 Command::Func(func) => func(&mut conn),
143 Command::Shutdown(func) => match conn.close() {
144 Ok(()) => {
145 func(Ok(()));
146 return;
147 }
148 Err((c, e)) => {
149 conn = c;
150 func(Err(e.into()));
151 }
152 },
153 }
154 }
155 });
156 }
157
158 fn create_conn(mut builder: ClientBuilder) -> Result<Connection, Error> {
159 let path = builder.path.take().unwrap_or_else(|| ":memory:".into());
160 let config = if let Some(flagsfn) = builder.flagsfn {
161 flagsfn()?
162 } else {
163 Config::default()
164 };
165 let conn = Connection::open_with_flags(path, config)?;
166 Ok(conn)
167 }
168
169 pub async fn conn<F, T>(&self, func: F) -> Result<T, Error>
171 where
172 F: FnOnce(&Connection) -> Result<T, duckdb::Error> + Send + 'static,
173 T: Send + 'static,
174 {
175 let (tx, rx) = oneshot::channel();
176 self.conn_tx.send(Command::Func(Box::new(move |conn| {
177 _ = tx.send(func(conn));
178 })))?;
179 Ok(rx.await??)
180 }
181
182 pub async fn conn_mut<F, T>(&self, func: F) -> Result<T, Error>
184 where
185 F: FnOnce(&mut Connection) -> Result<T, duckdb::Error> + Send + 'static,
186 T: Send + 'static,
187 {
188 let (tx, rx) = oneshot::channel();
189 self.conn_tx.send(Command::Func(Box::new(move |conn| {
190 _ = tx.send(func(conn));
191 })))?;
192 Ok(rx.await??)
193 }
194
195 pub async fn close(&self) -> Result<(), Error> {
200 let (tx, rx) = oneshot::channel();
201 let func = Box::new(|res| _ = tx.send(res));
202 if self.conn_tx.send(Command::Shutdown(func)).is_err() {
203 return Ok(());
205 }
206 rx.await.unwrap_or(Ok(()))
208 }
209
210 pub fn conn_blocking<F, T>(&self, func: F) -> Result<T, Error>
213 where
214 F: FnOnce(&Connection) -> Result<T, duckdb::Error> + Send + 'static,
215 T: Send + 'static,
216 {
217 let (tx, rx) = bounded(1);
218 self.conn_tx.send(Command::Func(Box::new(move |conn| {
219 _ = tx.send(func(conn));
220 })))?;
221 Ok(rx.recv()??)
222 }
223
224 pub fn conn_mut_blocking<F, T>(&self, func: F) -> Result<T, Error>
227 where
228 F: FnOnce(&mut Connection) -> Result<T, duckdb::Error> + Send + 'static,
229 T: Send + 'static,
230 {
231 let (tx, rx) = bounded(1);
232 self.conn_tx.send(Command::Func(Box::new(move |conn| {
233 _ = tx.send(func(conn));
234 })))?;
235 Ok(rx.recv()??)
236 }
237
238 pub fn close_blocking(&self) -> Result<(), Error> {
244 let (tx, rx) = bounded(1);
245 let func = Box::new(move |res| _ = tx.send(res));
246 if self.conn_tx.send(Command::Shutdown(func)).is_err() {
247 return Ok(());
248 }
249 rx.recv().unwrap_or(Ok(()))
251 }
252}