duckdb/appender/
mod.rs

1use super::{ffi, AppenderParams, Connection, Result, ValueRef};
2use std::{ffi::c_void, fmt, os::raw::c_char};
3
4use crate::{
5    error::result_from_duckdb_appender,
6    types::{ToSql, ToSqlOutput},
7    Error,
8};
9
10/// Appender for fast import data
11pub struct Appender<'conn> {
12    conn: &'conn Connection,
13    app: ffi::duckdb_appender,
14}
15
16#[cfg(feature = "appender-arrow")]
17mod arrow;
18
19impl Appender<'_> {
20    /// Append multiple rows from Iterator
21    ///
22    /// ## Example
23    ///
24    /// ```rust,no_run
25    /// # use duckdb::{Connection, Result, params};
26    /// fn insert_rows(conn: &Connection) -> Result<()> {
27    ///     let mut app = conn.appender("foo")?;
28    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
29    ///     Ok(())
30    /// }
31    /// ```
32    ///
33    /// # Failure
34    ///
35    /// Will return `Err` if append column count not the same with the table schema
36    #[inline]
37    pub fn append_rows<P, I>(&mut self, rows: I) -> Result<()>
38    where
39        I: IntoIterator<Item = P>,
40        P: AppenderParams,
41    {
42        for row in rows {
43            self.append_row(row)?;
44        }
45        Ok(())
46    }
47
48    /// Append one row
49    ///
50    /// ## Example
51    ///
52    /// ```rust,no_run
53    /// # use duckdb::{Connection, Result, params};
54    /// fn insert_row(conn: &Connection) -> Result<()> {
55    ///     let mut app = conn.appender("foo")?;
56    ///     app.append_row([1, 2])?;
57    ///     Ok(())
58    /// }
59    /// ```
60    ///
61    /// # Failure
62    ///
63    /// Will return `Err` if append column count not the same with the table schema
64    #[inline]
65    pub fn append_row<P: AppenderParams>(&mut self, params: P) -> Result<()> {
66        let _ = unsafe { ffi::duckdb_appender_begin_row(self.app) };
67        params.__bind_in(self)?;
68        // NOTE: we only check end_row return value
69        let rc = unsafe { ffi::duckdb_appender_end_row(self.app) };
70        result_from_duckdb_appender(rc, &mut self.app)
71    }
72
73    #[inline]
74    pub(crate) fn bind_parameters<P>(&mut self, params: P) -> Result<()>
75    where
76        P: IntoIterator,
77        P::Item: ToSql,
78    {
79        for p in params.into_iter() {
80            self.bind_parameter(&p)?;
81        }
82        Ok(())
83    }
84
85    fn bind_parameter<P: ?Sized + ToSql>(&self, param: &P) -> Result<()> {
86        let value = param.to_sql()?;
87
88        let ptr = self.app;
89        let value = match value {
90            ToSqlOutput::Borrowed(v) => v,
91            ToSqlOutput::Owned(ref v) => ValueRef::from(v),
92        };
93        // NOTE: we ignore the return value here
94        //       because if anything failed, end_row will fail
95        // TODO: append more
96        let rc = match value {
97            ValueRef::Null => unsafe { ffi::duckdb_append_null(ptr) },
98            ValueRef::Boolean(i) => unsafe { ffi::duckdb_append_bool(ptr, i) },
99            ValueRef::TinyInt(i) => unsafe { ffi::duckdb_append_int8(ptr, i) },
100            ValueRef::SmallInt(i) => unsafe { ffi::duckdb_append_int16(ptr, i) },
101            ValueRef::Int(i) => unsafe { ffi::duckdb_append_int32(ptr, i) },
102            ValueRef::BigInt(i) => unsafe { ffi::duckdb_append_int64(ptr, i) },
103            ValueRef::UTinyInt(i) => unsafe { ffi::duckdb_append_uint8(ptr, i) },
104            ValueRef::USmallInt(i) => unsafe { ffi::duckdb_append_uint16(ptr, i) },
105            ValueRef::UInt(i) => unsafe { ffi::duckdb_append_uint32(ptr, i) },
106            ValueRef::UBigInt(i) => unsafe { ffi::duckdb_append_uint64(ptr, i) },
107            ValueRef::HugeInt(i) => unsafe {
108                let hi = ffi::duckdb_hugeint {
109                    lower: i as u64,
110                    upper: (i >> 64) as i64,
111                };
112                ffi::duckdb_append_hugeint(ptr, hi)
113            },
114
115            ValueRef::Float(r) => unsafe { ffi::duckdb_append_float(ptr, r) },
116            ValueRef::Double(r) => unsafe { ffi::duckdb_append_double(ptr, r) },
117            ValueRef::Text(s) => unsafe {
118                ffi::duckdb_append_varchar_length(ptr, s.as_ptr() as *const c_char, s.len() as u64)
119            },
120            ValueRef::Timestamp(u, i) => unsafe {
121                ffi::duckdb_append_timestamp(ptr, ffi::duckdb_timestamp { micros: u.to_micros(i) })
122            },
123            ValueRef::Blob(b) => unsafe { ffi::duckdb_append_blob(ptr, b.as_ptr() as *const c_void, b.len() as u64) },
124            ValueRef::Date32(d) => unsafe { ffi::duckdb_append_date(ptr, ffi::duckdb_date { days: d }) },
125            ValueRef::Time64(u, v) => unsafe {
126                ffi::duckdb_append_time(ptr, ffi::duckdb_time { micros: u.to_micros(v) })
127            },
128            ValueRef::Interval { months, days, nanos } => unsafe {
129                ffi::duckdb_append_interval(
130                    ptr,
131                    ffi::duckdb_interval {
132                        months,
133                        days,
134                        micros: nanos / 1000,
135                    },
136                )
137            },
138            _ => unreachable!("not supported"),
139        };
140        if rc != 0 {
141            return Err(Error::AppendError);
142        }
143        Ok(())
144    }
145
146    #[inline]
147    pub(super) fn new(conn: &Connection, app: ffi::duckdb_appender) -> Appender<'_> {
148        Appender { conn, app }
149    }
150
151    /// Flush data into DB
152    #[inline]
153    pub fn flush(&mut self) -> Result<()> {
154        unsafe {
155            let res = ffi::duckdb_appender_flush(self.app);
156            result_from_duckdb_appender(res, &mut self.app)
157        }
158    }
159}
160
161impl Drop for Appender<'_> {
162    fn drop(&mut self) {
163        if !self.app.is_null() {
164            let _ = self.flush(); // can't safely handle failures here
165            unsafe {
166                ffi::duckdb_appender_close(self.app);
167                ffi::duckdb_appender_destroy(&mut self.app);
168            }
169        }
170    }
171}
172
173impl fmt::Debug for Appender<'_> {
174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175        f.debug_struct("Appender").field("conn", self.conn).finish()
176    }
177}
178
179#[cfg(test)]
180mod test {
181    use crate::{Connection, Result};
182
183    #[test]
184    fn test_append_one_row() -> Result<()> {
185        let db = Connection::open_in_memory()?;
186        db.execute_batch("CREATE TABLE foo(x INTEGER)")?;
187
188        {
189            let mut app = db.appender("foo")?;
190            app.append_row([42])?;
191        }
192
193        let val = db.query_row("SELECT x FROM foo", [], |row| <(i32,)>::try_from(row))?;
194        assert_eq!(val, (42,));
195        Ok(())
196    }
197
198    #[test]
199    fn test_append_rows() -> Result<()> {
200        let db = Connection::open_in_memory()?;
201        db.execute_batch("CREATE TABLE foo(x INTEGER, y INTEGER)")?;
202
203        {
204            let mut app = db.appender("foo")?;
205            app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
206        }
207
208        let val = db.query_row("SELECT sum(x), sum(y) FROM foo", [], |row| <(i32, i32)>::try_from(row))?;
209        assert_eq!(val, (25, 30));
210        Ok(())
211    }
212
213    // Waiting https://github.com/duckdb/duckdb/pull/3405
214    #[cfg(feature = "uuid")]
215    #[test]
216    #[ignore = "not supported for now"]
217    fn test_append_uuid() -> Result<()> {
218        use uuid::Uuid;
219
220        let db = Connection::open_in_memory()?;
221        db.execute_batch("CREATE TABLE foo(x UUID)")?;
222
223        let id = Uuid::new_v4();
224        {
225            let mut app = db.appender("foo")?;
226            app.append_row([id])?;
227        }
228
229        let val = db.query_row("SELECT x FROM foo", [], |row| <(Uuid,)>::try_from(row))?;
230        assert_eq!(val, (id,));
231        Ok(())
232    }
233
234    #[test]
235    fn test_append_string_as_ts_row() -> Result<()> {
236        let db = Connection::open_in_memory()?;
237        db.execute_batch("CREATE TABLE foo(x TIMESTAMP)")?;
238
239        {
240            let mut app = db.appender("foo")?;
241            app.append_row(["2022-04-09 15:56:37.544"])?;
242        }
243
244        let val = db.query_row("SELECT x FROM foo", [], |row| <(i64,)>::try_from(row))?;
245        assert_eq!(val, (1649519797544000,));
246        Ok(())
247    }
248
249    #[test]
250    fn test_append_timestamp() -> Result<()> {
251        use std::time::Duration;
252        let db = Connection::open_in_memory()?;
253        db.execute_batch("CREATE TABLE foo(x TIMESTAMP)")?;
254
255        let d = Duration::from_secs(1);
256        {
257            let mut app = db.appender("foo")?;
258            app.append_row([d])?;
259        }
260
261        let val = db.query_row("SELECT x FROM foo where x=?", [d], |row| <(i32,)>::try_from(row))?;
262        assert_eq!(val, (d.as_micros() as i32,));
263        Ok(())
264    }
265
266    #[test]
267    #[cfg(feature = "chrono")]
268    fn test_append_datetime() -> Result<()> {
269        use crate::params;
270        use chrono::{NaiveDate, NaiveDateTime};
271
272        let db = Connection::open_in_memory()?;
273        db.execute_batch("CREATE TABLE foo(x DATE, y TIMESTAMP)")?;
274
275        let date = NaiveDate::from_ymd_opt(2024, 6, 5).unwrap();
276        let timestamp = date.and_hms_opt(18, 26, 53).unwrap();
277        {
278            let mut app = db.appender("foo")?;
279            app.append_row(params![date, timestamp])?;
280        }
281        let (date2, timestamp2) = db.query_row("SELECT x, y FROM foo", [], |row| {
282            Ok((row.get::<_, NaiveDate>(0)?, row.get::<_, NaiveDateTime>(1)?))
283        })?;
284        assert_eq!(date, date2);
285        assert_eq!(timestamp, timestamp2);
286        Ok(())
287    }
288
289    #[test]
290    fn test_appender_error() -> Result<(), crate::Error> {
291        use crate::params;
292        let conn = Connection::open_in_memory()?;
293        conn.execute(
294            r"CREATE TABLE foo (
295            foobar TEXT,
296            foobar_int INT,
297            foobar_split TEXT[] AS (split(trim(foobar), ','))
298            );",
299            [],
300        )?;
301        let mut appender = conn.appender("foo")?;
302        match appender.append_row(params!["foo"]) {
303            Err(crate::Error::DuckDBFailure(.., Some(msg))) => {
304                assert_eq!(msg, "Call to EndRow before all columns have been appended to!")
305            }
306            Err(err) => panic!("unexpected error: {:?}", err),
307            Ok(_) => panic!("expected an error but got Ok"),
308        }
309        Ok(())
310    }
311}