async_duckdb/
client.rs

1use crate::Error;
2use std::{
3    path::{Path, PathBuf},
4    thread,
5};
6
7use crossbeam_channel::{bounded, unbounded, Sender};
8use duckdb::{Config, Connection};
9use futures_channel::oneshot;
10
11/// A `ClientBuilder` can be used to create a [`Client`] with custom
12/// configuration.
13///
14/// For more information on creating a duckdb connection, see the
15/// [duckdb docs](duckdb::Connection::open()).
16///
17/// # Examples
18///
19/// ```rust
20/// # use async_duckdb::ClientBuilder;
21/// # async fn run() -> Result<(), async_duckdb::Error> {
22/// let client = ClientBuilder::new().path("path/to/db.duckdb").open().await?;
23///
24/// // ...
25///
26/// client.close().await?;
27/// # Ok(())
28/// # }
29/// ```
30#[derive(Default)]
31pub struct ClientBuilder {
32    pub(crate) path: Option<PathBuf>,
33    pub(crate) flagsfn: Option<fn() -> duckdb::Result<Config>>,
34}
35
36impl ClientBuilder {
37    /// Returns a new [`ClientBuilder`] with the default settings.
38    #[must_use]
39    pub fn new() -> Self {
40        Self::default()
41    }
42
43    /// Specify the path of the duckdb database to open.
44    ///
45    /// By default, an in-memory database is used.
46    #[must_use]
47    pub fn path<P: AsRef<Path>>(mut self, path: P) -> Self {
48        self.path = Some(path.as_ref().into());
49        self
50    }
51
52    /// Specify the [`OpenFlags`] to use when opening a new connection.
53    ///
54    /// By default, [`OpenFlags::default()`] is used.
55    #[must_use]
56    pub fn flagsfn(mut self, flags: fn() -> duckdb::Result<Config>) -> Self {
57        self.flagsfn = Some(flags);
58        self
59    }
60
61    /// Returns a new [`Client`] that uses the `ClientBuilder` configuration.
62    ///
63    /// # Examples
64    ///
65    /// ```rust
66    /// # use async_duckdb::ClientBuilder;
67    /// # async fn run() -> Result<(), async_duckdb::Error> {
68    /// let client = ClientBuilder::new().open().await?;
69    /// # Ok(())
70    /// # }
71    /// ```
72    pub async fn open(self) -> Result<Client, Error> {
73        Client::open_async(self).await
74    }
75
76    /// Returns a new [`Client`] that uses the `ClientBuilder` configuration,
77    /// blocking the current thread.
78    ///
79    /// # Examples
80    ///
81    /// ```rust
82    /// # use async_duckdb::ClientBuilder;
83    /// # fn run() -> Result<(), async_duckdb::Error> {
84    /// let client = ClientBuilder::new().open_blocking()?;
85    /// # Ok(())
86    /// # }
87    /// ```
88    pub fn open_blocking(self) -> Result<Client, Error> {
89        Client::open_blocking(self)
90    }
91}
92
93enum Command {
94    Func(Box<dyn FnOnce(&mut Connection) + Send>),
95    Shutdown(Box<dyn FnOnce(Result<(), Error>) + Send>),
96}
97
98/// Client represents a single duckdb connection that can be used from async
99/// contexts.
100#[derive(Clone)]
101pub struct Client {
102    conn_tx: Sender<Command>,
103}
104
105impl Client {
106    async fn open_async(builder: ClientBuilder) -> Result<Self, Error> {
107        let (open_tx, open_rx) = oneshot::channel();
108        Self::open(builder, |res| {
109            _ = open_tx.send(res);
110        });
111        open_rx.await?
112    }
113
114    fn open_blocking(builder: ClientBuilder) -> Result<Self, Error> {
115        let (conn_tx, conn_rx) = bounded(1);
116        Self::open(builder, move |res| {
117            _ = conn_tx.send(res);
118        });
119        conn_rx.recv()?
120    }
121
122    fn open<F>(builder: ClientBuilder, func: F)
123    where
124        F: FnOnce(Result<Self, Error>) + Send + 'static,
125    {
126        thread::spawn(move || {
127            let (conn_tx, conn_rx) = unbounded();
128
129            let mut conn = match Client::create_conn(builder) {
130                Ok(conn) => conn,
131                Err(err) => {
132                    func(Err(err));
133                    return;
134                }
135            };
136
137            let client = Self { conn_tx };
138            func(Ok(client));
139
140            while let Ok(cmd) = conn_rx.recv() {
141                match cmd {
142                    Command::Func(func) => func(&mut conn),
143                    Command::Shutdown(func) => match conn.close() {
144                        Ok(()) => {
145                            func(Ok(()));
146                            return;
147                        }
148                        Err((c, e)) => {
149                            conn = c;
150                            func(Err(e.into()));
151                        }
152                    },
153                }
154            }
155        });
156    }
157
158    fn create_conn(mut builder: ClientBuilder) -> Result<Connection, Error> {
159        let path = builder.path.take().unwrap_or_else(|| ":memory:".into());
160        let config = if let Some(flagsfn) = builder.flagsfn {
161            flagsfn()?
162        } else {
163            Config::default()
164        };
165        let conn = Connection::open_with_flags(path, config)?;
166        Ok(conn)
167    }
168
169    /// Invokes the provided function with a [`duckdb::Connection`].
170    pub async fn conn<F, T>(&self, func: F) -> Result<T, Error>
171    where
172        F: FnOnce(&Connection) -> Result<T, duckdb::Error> + Send + 'static,
173        T: Send + 'static,
174    {
175        let (tx, rx) = oneshot::channel();
176        self.conn_tx.send(Command::Func(Box::new(move |conn| {
177            _ = tx.send(func(conn));
178        })))?;
179        Ok(rx.await??)
180    }
181
182    /// Invokes the provided function with a mutable [`duckdb::Connection`].
183    pub async fn conn_mut<F, T>(&self, func: F) -> Result<T, Error>
184    where
185        F: FnOnce(&mut Connection) -> Result<T, duckdb::Error> + Send + 'static,
186        T: Send + 'static,
187    {
188        let (tx, rx) = oneshot::channel();
189        self.conn_tx.send(Command::Func(Box::new(move |conn| {
190            _ = tx.send(func(conn));
191        })))?;
192        Ok(rx.await??)
193    }
194
195    /// Closes the underlying duckdb connection.
196    ///
197    /// After this method returns, all calls to `self::conn()` or
198    /// `self::conn_mut()` will return an [`Error::Closed`] error.
199    pub async fn close(&self) -> Result<(), Error> {
200        let (tx, rx) = oneshot::channel();
201        let func = Box::new(|res| _ = tx.send(res));
202        if self.conn_tx.send(Command::Shutdown(func)).is_err() {
203            // If the worker thread has already shut down, return Ok here.
204            return Ok(());
205        }
206        // If receiving fails, the connection is already closed.
207        rx.await.unwrap_or(Ok(()))
208    }
209
210    /// Invokes the provided function with a [`duckdb::Connection`], blocking
211    /// the current thread until completion.
212    pub fn conn_blocking<F, T>(&self, func: F) -> Result<T, Error>
213    where
214        F: FnOnce(&Connection) -> Result<T, duckdb::Error> + Send + 'static,
215        T: Send + 'static,
216    {
217        let (tx, rx) = bounded(1);
218        self.conn_tx.send(Command::Func(Box::new(move |conn| {
219            _ = tx.send(func(conn));
220        })))?;
221        Ok(rx.recv()??)
222    }
223
224    /// Invokes the provided function with a mutable [`duckdb::Connection`],
225    /// blocking the current thread until completion.
226    pub fn conn_mut_blocking<F, T>(&self, func: F) -> Result<T, Error>
227    where
228        F: FnOnce(&mut Connection) -> Result<T, duckdb::Error> + Send + 'static,
229        T: Send + 'static,
230    {
231        let (tx, rx) = bounded(1);
232        self.conn_tx.send(Command::Func(Box::new(move |conn| {
233            _ = tx.send(func(conn));
234        })))?;
235        Ok(rx.recv()??)
236    }
237
238    /// Closes the underlying duckdb connection, blocking the current thread
239    /// until complete.
240    ///
241    /// After this method returns, all calls to `self::conn_blocking()` or
242    /// `self::conn_mut_blocking()` will return an [`Error::Closed`] error.
243    pub fn close_blocking(&self) -> Result<(), Error> {
244        let (tx, rx) = bounded(1);
245        let func = Box::new(move |res| _ = tx.send(res));
246        if self.conn_tx.send(Command::Shutdown(func)).is_err() {
247            return Ok(());
248        }
249        // If receiving fails, the connection is already closed.
250        rx.recv().unwrap_or(Ok(()))
251    }
252}