1use super::{ffi, AppenderParams, Connection, Result, ValueRef};
2use std::{ffi::c_void, fmt, os::raw::c_char};
3
4use crate::{
5 error::result_from_duckdb_appender,
6 types::{ToSql, ToSqlOutput},
7 Error,
8};
9
10pub struct Appender<'conn> {
12 conn: &'conn Connection,
13 app: ffi::duckdb_appender,
14}
15
16#[cfg(feature = "appender-arrow")]
17mod arrow;
18
19impl Appender<'_> {
20 #[inline]
37 pub fn append_rows<P, I>(&mut self, rows: I) -> Result<()>
38 where
39 I: IntoIterator<Item = P>,
40 P: AppenderParams,
41 {
42 for row in rows {
43 self.append_row(row)?;
44 }
45 Ok(())
46 }
47
48 #[inline]
65 pub fn append_row<P: AppenderParams>(&mut self, params: P) -> Result<()> {
66 let _ = unsafe { ffi::duckdb_appender_begin_row(self.app) };
67 params.__bind_in(self)?;
68 let rc = unsafe { ffi::duckdb_appender_end_row(self.app) };
70 result_from_duckdb_appender(rc, &mut self.app)
71 }
72
73 #[inline]
74 pub(crate) fn bind_parameters<P>(&mut self, params: P) -> Result<()>
75 where
76 P: IntoIterator,
77 P::Item: ToSql,
78 {
79 for p in params.into_iter() {
80 self.bind_parameter(&p)?;
81 }
82 Ok(())
83 }
84
85 fn bind_parameter<P: ?Sized + ToSql>(&self, param: &P) -> Result<()> {
86 let value = param.to_sql()?;
87
88 let ptr = self.app;
89 let value = match value {
90 ToSqlOutput::Borrowed(v) => v,
91 ToSqlOutput::Owned(ref v) => ValueRef::from(v),
92 };
93 let rc = match value {
97 ValueRef::Null => unsafe { ffi::duckdb_append_null(ptr) },
98 ValueRef::Boolean(i) => unsafe { ffi::duckdb_append_bool(ptr, i) },
99 ValueRef::TinyInt(i) => unsafe { ffi::duckdb_append_int8(ptr, i) },
100 ValueRef::SmallInt(i) => unsafe { ffi::duckdb_append_int16(ptr, i) },
101 ValueRef::Int(i) => unsafe { ffi::duckdb_append_int32(ptr, i) },
102 ValueRef::BigInt(i) => unsafe { ffi::duckdb_append_int64(ptr, i) },
103 ValueRef::UTinyInt(i) => unsafe { ffi::duckdb_append_uint8(ptr, i) },
104 ValueRef::USmallInt(i) => unsafe { ffi::duckdb_append_uint16(ptr, i) },
105 ValueRef::UInt(i) => unsafe { ffi::duckdb_append_uint32(ptr, i) },
106 ValueRef::UBigInt(i) => unsafe { ffi::duckdb_append_uint64(ptr, i) },
107 ValueRef::HugeInt(i) => unsafe {
108 let hi = ffi::duckdb_hugeint {
109 lower: i as u64,
110 upper: (i >> 64) as i64,
111 };
112 ffi::duckdb_append_hugeint(ptr, hi)
113 },
114
115 ValueRef::Float(r) => unsafe { ffi::duckdb_append_float(ptr, r) },
116 ValueRef::Double(r) => unsafe { ffi::duckdb_append_double(ptr, r) },
117 ValueRef::Text(s) => unsafe {
118 ffi::duckdb_append_varchar_length(ptr, s.as_ptr() as *const c_char, s.len() as u64)
119 },
120 ValueRef::Timestamp(u, i) => unsafe {
121 ffi::duckdb_append_timestamp(ptr, ffi::duckdb_timestamp { micros: u.to_micros(i) })
122 },
123 ValueRef::Blob(b) => unsafe { ffi::duckdb_append_blob(ptr, b.as_ptr() as *const c_void, b.len() as u64) },
124 ValueRef::Date32(d) => unsafe { ffi::duckdb_append_date(ptr, ffi::duckdb_date { days: d }) },
125 ValueRef::Time64(u, v) => unsafe {
126 ffi::duckdb_append_time(ptr, ffi::duckdb_time { micros: u.to_micros(v) })
127 },
128 ValueRef::Interval { months, days, nanos } => unsafe {
129 ffi::duckdb_append_interval(
130 ptr,
131 ffi::duckdb_interval {
132 months,
133 days,
134 micros: nanos / 1000,
135 },
136 )
137 },
138 _ => unreachable!("not supported"),
139 };
140 if rc != 0 {
141 return Err(Error::AppendError);
142 }
143 Ok(())
144 }
145
146 #[inline]
147 pub(super) fn new(conn: &Connection, app: ffi::duckdb_appender) -> Appender<'_> {
148 Appender { conn, app }
149 }
150
151 #[inline]
153 pub fn flush(&mut self) -> Result<()> {
154 unsafe {
155 let res = ffi::duckdb_appender_flush(self.app);
156 result_from_duckdb_appender(res, &mut self.app)
157 }
158 }
159}
160
161impl Drop for Appender<'_> {
162 fn drop(&mut self) {
163 if !self.app.is_null() {
164 let _ = self.flush(); unsafe {
166 ffi::duckdb_appender_close(self.app);
167 ffi::duckdb_appender_destroy(&mut self.app);
168 }
169 }
170 }
171}
172
173impl fmt::Debug for Appender<'_> {
174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175 f.debug_struct("Appender").field("conn", self.conn).finish()
176 }
177}
178
#[cfg(test)]
mod test {
    use crate::{Connection, Result};

    /// Opens an in-memory database and runs `schema` against it.
    fn open_with_schema(schema: &str) -> Result<Connection> {
        let conn = Connection::open_in_memory()?;
        conn.execute_batch(schema)?;
        Ok(conn)
    }

    /// A single appended integer row can be read back.
    #[test]
    fn test_append_one_row() -> Result<()> {
        let conn = open_with_schema("CREATE TABLE foo(x INTEGER)")?;

        let mut appender = conn.appender("foo")?;
        appender.append_row([42])?;
        // Dropping the appender flushes the pending row.
        drop(appender);

        let (x,) = conn.query_row("SELECT x FROM foo", [], |row| <(i32,)>::try_from(row))?;
        assert_eq!(x, 42);
        Ok(())
    }

    /// Several rows appended in one call are all stored.
    #[test]
    fn test_append_rows() -> Result<()> {
        let conn = open_with_schema("CREATE TABLE foo(x INTEGER, y INTEGER)")?;

        let mut appender = conn.appender("foo")?;
        appender.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
        drop(appender);

        let (sum_x, sum_y) = conn.query_row("SELECT sum(x), sum(y) FROM foo", [], |row| {
            <(i32, i32)>::try_from(row)
        })?;
        assert_eq!(sum_x, 25);
        assert_eq!(sum_y, 30);
        Ok(())
    }

    /// Round-trips a UUID through the appender (currently ignored).
    #[cfg(feature = "uuid")]
    #[test]
    #[ignore = "not supported for now"]
    fn test_append_uuid() -> Result<()> {
        use uuid::Uuid;

        let conn = open_with_schema("CREATE TABLE foo(x UUID)")?;
        let expected = Uuid::new_v4();

        let mut appender = conn.appender("foo")?;
        appender.append_row([expected])?;
        drop(appender);

        let (stored,) = conn.query_row("SELECT x FROM foo", [], |row| <(Uuid,)>::try_from(row))?;
        assert_eq!(stored, expected);
        Ok(())
    }

    /// A textual timestamp appended to a TIMESTAMP column reads back as
    /// microseconds.
    #[test]
    fn test_append_string_as_ts_row() -> Result<()> {
        let conn = open_with_schema("CREATE TABLE foo(x TIMESTAMP)")?;

        let mut appender = conn.appender("foo")?;
        appender.append_row(["2022-04-09 15:56:37.544"])?;
        drop(appender);

        let (micros,) = conn.query_row("SELECT x FROM foo", [], |row| <(i64,)>::try_from(row))?;
        assert_eq!(micros, 1_649_519_797_544_000);
        Ok(())
    }

    /// A `Duration` appended to a TIMESTAMP column can be found again by value.
    #[test]
    fn test_append_timestamp() -> Result<()> {
        use std::time::Duration;

        let conn = open_with_schema("CREATE TABLE foo(x TIMESTAMP)")?;
        let one_second = Duration::from_secs(1);

        let mut appender = conn.appender("foo")?;
        appender.append_row([one_second])?;
        drop(appender);

        let (micros,) = conn.query_row("SELECT x FROM foo where x=?", [one_second], |row| {
            <(i32,)>::try_from(row)
        })?;
        assert_eq!(micros, one_second.as_micros() as i32);
        Ok(())
    }

    /// chrono date and date-time values survive a round trip.
    #[test]
    #[cfg(feature = "chrono")]
    fn test_append_datetime() -> Result<()> {
        use crate::params;
        use chrono::{NaiveDate, NaiveDateTime};

        let conn = open_with_schema("CREATE TABLE foo(x DATE, y TIMESTAMP)")?;

        let date = NaiveDate::from_ymd_opt(2024, 6, 5).unwrap();
        let timestamp = date.and_hms_opt(18, 26, 53).unwrap();

        let mut appender = conn.appender("foo")?;
        appender.append_row(params![date, timestamp])?;
        drop(appender);

        let (stored_date, stored_timestamp) = conn.query_row("SELECT x, y FROM foo", [], |row| {
            let d = row.get::<_, NaiveDate>(0)?;
            let ts = row.get::<_, NaiveDateTime>(1)?;
            Ok((d, ts))
        })?;
        assert_eq!(date, stored_date);
        assert_eq!(timestamp, stored_timestamp);
        Ok(())
    }

    /// Ending a row with too few values surfaces DuckDB's error message.
    #[test]
    fn test_appender_error() -> Result<(), crate::Error> {
        use crate::params;

        let conn = Connection::open_in_memory()?;
        let ddl = r"CREATE TABLE foo (
 foobar TEXT,
 foobar_int INT,
 foobar_split TEXT[] AS (split(trim(foobar), ','))
 );";
        conn.execute(ddl, [])?;

        let mut appender = conn.appender("foo")?;
        // Only one value is supplied for a table that needs more.
        let outcome = appender.append_row(params!["foo"]);
        match outcome {
            Ok(_) => panic!("expected an error but got Ok"),
            Err(crate::Error::DuckDBFailure(.., Some(msg))) => {
                assert_eq!(msg, "Call to EndRow before all columns have been appended to!")
            }
            Err(err) => panic!("unexpected error: {:?}", err),
        }
        Ok(())
    }
}