llkv_csv/
writer.rs

1//! CSV export writer built on top of table scan projections.
2//!
3//! [`CsvWriter`] adapts table scans into Arrow CSV writers, handling alias resolution and
4//! optional filtering while streaming batches directly to disk or an arbitrary `Write` sink.
5
6use std::collections::HashMap;
7use std::fs::File;
8use std::io::{BufWriter, Write};
9use std::ops::Bound;
10use std::path::Path;
11
12use arrow::csv::WriterBuilder;
13use llkv_column_map::store::Projection;
14use llkv_column_map::types::LogicalFieldId;
15use llkv_result::{Error, Result as LlkvResult};
16use llkv_storage::pager::Pager;
17use simd_r_drive_entry_handle::EntryHandle;
18
19use llkv_table::expr::{Expr, Filter, Operator};
20use llkv_table::table::{ScanProjection, ScanStreamOptions};
21use llkv_table::{Table, types::FieldId};
22
/// Options controlling how a table is rendered as CSV.
///
/// The header and delimiter settings are handed to the Arrow CSV writer builder, while
/// `include_nulls` is forwarded to the table scan that produces the exported batches.
#[derive(Debug, Clone)]
pub struct CsvWriteOptions {
    /// When `true`, the CSV writer is configured to emit a header row of column names.
    pub include_header: bool,
    /// Byte placed between adjacent fields (for example `b','` or `b'\t'`).
    pub delimiter: u8,
    /// When `true`, rows that are entirely null are preserved; passed to the scan as
    /// its `include_nulls` option.
    pub include_nulls: bool,
}
33
34impl Default for CsvWriteOptions {
35    fn default() -> Self {
36        Self {
37            include_header: true,
38            delimiter: b',',
39            include_nulls: true,
40        }
41    }
42}
43
44/// Column specification for CSV export.
45#[derive(Debug, Clone)]
46pub struct CsvExportColumn {
47    pub field_id: FieldId,
48    pub alias: Option<String>,
49}
50
51impl CsvExportColumn {
52    pub fn new(field_id: FieldId) -> Self {
53        Self {
54            field_id,
55            alias: None,
56        }
57    }
58
59    pub fn with_alias<S: Into<String>>(field_id: FieldId, alias: S) -> Self {
60        Self {
61            field_id,
62            alias: Some(alias.into()),
63        }
64    }
65}
66
67/// Builder-style helper for writing CSV data from a table.
68#[derive(Clone)]
69pub struct CsvWriter<'a, P>
70where
71    P: Pager<Blob = EntryHandle> + Send + Sync,
72{
73    table: &'a Table<P>,
74    options: CsvWriteOptions,
75}
76
77impl<'a, P> CsvWriter<'a, P>
78where
79    P: Pager<Blob = EntryHandle> + Send + Sync,
80{
81    pub fn new(table: &'a Table<P>) -> Self {
82        Self {
83            table,
84            options: CsvWriteOptions::default(),
85        }
86    }
87
88    pub fn with_options(table: &'a Table<P>, options: CsvWriteOptions) -> Self {
89        Self { table, options }
90    }
91
92    pub fn options(&self) -> &CsvWriteOptions {
93        &self.options
94    }
95
96    pub fn options_mut(&mut self) -> &mut CsvWriteOptions {
97        &mut self.options
98    }
99
100    pub fn into_options(self) -> CsvWriteOptions {
101        self.options
102    }
103
104    pub fn write_columns_to_path<C>(
105        &self,
106        csv_path: C,
107        columns: &[CsvExportColumn],
108    ) -> LlkvResult<()>
109    where
110        C: AsRef<Path>,
111    {
112        tracing::trace!(
113            "[CSV_EXPORT] write_columns_to_path called with {} columns",
114            columns.len()
115        );
116        if columns.is_empty() {
117            return Err(Error::InvalidArgumentError(
118                "at least one column must be provided for CSV export".into(),
119            ));
120        }
121
122        let filter_expr = Expr::Pred(Filter {
123            field_id: columns[0].field_id,
124            op: Operator::Range {
125                lower: Bound::Unbounded,
126                upper: Bound::Unbounded,
127            },
128        });
129
130        tracing::trace!("[CSV_EXPORT] Calling write_columns_to_path_with_filter");
131        let result = self.write_columns_to_path_with_filter(csv_path, columns, &filter_expr);
132        tracing::trace!(
133            "[CSV_EXPORT] write_columns_to_path_with_filter returned: {:?}",
134            result
135        );
136        result
137    }
138
139    pub fn write_columns_to_path_with_filter<C>(
140        &self,
141        csv_path: C,
142        columns: &[CsvExportColumn],
143        filter_expr: &Expr<'_, FieldId>,
144    ) -> LlkvResult<()>
145    where
146        C: AsRef<Path>,
147    {
148        tracing::trace!("[CSV_EXPORT] write_columns_to_path_with_filter called");
149        tracing::trace!("[CSV_EXPORT] About to call build_column_projections");
150        let projections = build_column_projections(self.table, columns)?;
151        tracing::trace!("[CSV_EXPORT] build_column_projections returned successfully");
152        self.write_projections_to_path(csv_path, projections, filter_expr)
153    }
154
155    pub fn write_columns_to_writer<W>(
156        &self,
157        writer: W,
158        columns: &[CsvExportColumn],
159        filter_expr: &Expr<'_, FieldId>,
160    ) -> LlkvResult<()>
161    where
162        W: Write,
163    {
164        let projections = build_column_projections(self.table, columns)?;
165        self.write_projections_to_writer(writer, projections, filter_expr)
166    }
167
168    pub fn write_projections_to_path<C, I, SP>(
169        &self,
170        csv_path: C,
171        projections: I,
172        filter_expr: &Expr<'_, FieldId>,
173    ) -> LlkvResult<()>
174    where
175        C: AsRef<Path>,
176        I: IntoIterator<Item = SP>,
177        SP: Into<ScanProjection>,
178    {
179        tracing::trace!("[CSV_EXPORT] write_projections_to_path called");
180        let file = File::create(csv_path.as_ref()).map_err(|err| {
181            Error::Internal(format!(
182                "failed to create CSV file '{}': {err}",
183                csv_path.as_ref().display()
184            ))
185        })?;
186        let writer = BufWriter::new(file);
187        tracing::trace!("[CSV_EXPORT] About to call write_projections_to_writer");
188        let result = self.write_projections_to_writer(writer, projections, filter_expr);
189        tracing::trace!(
190            "[CSV_EXPORT] write_projections_to_writer returned: {:?}",
191            result
192        );
193        result
194    }
195
196    pub fn write_projections_to_writer<W, I, SP>(
197        &self,
198        writer: W,
199        projections: I,
200        filter_expr: &Expr<'_, FieldId>,
201    ) -> LlkvResult<()>
202    where
203        W: Write,
204        I: IntoIterator<Item = SP>,
205        SP: Into<ScanProjection>,
206    {
207        tracing::trace!("[CSV_EXPORT] write_projections_to_writer called");
208        let mut projections: Vec<ScanProjection> =
209            projections.into_iter().map(|p| p.into()).collect();
210
211        tracing::trace!("[CSV_EXPORT] Got {} projections", projections.len());
212        if projections.is_empty() {
213            return Err(Error::InvalidArgumentError(
214                "at least one projection must be provided for CSV export".into(),
215            ));
216        }
217
218        tracing::trace!("[CSV_EXPORT] About to call ensure_column_aliases");
219        let result = ensure_column_aliases(self.table, &mut projections);
220        tracing::trace!("[CSV_EXPORT] ensure_column_aliases returned: {:?}", result);
221        result?;
222
223        let mut builder = WriterBuilder::new();
224        builder = builder.with_delimiter(self.options.delimiter);
225        builder = builder.with_header(self.options.include_header);
226        let mut csv_writer = builder.build(writer);
227
228        let mut write_error: Option<Error> = None;
229        let scan_options = ScanStreamOptions {
230            include_nulls: self.options.include_nulls,
231            order: None,
232            row_id_filter: None,
233        };
234
235        tracing::trace!("[CSV_EXPORT] About to call scan_stream_with_exprs");
236        tracing::trace!("[CSV_EXPORT] filter_expr: {:?}", filter_expr);
237        let scan_result =
238            self.table
239                .scan_stream_with_exprs(&projections, filter_expr, scan_options, |batch| {
240                    if write_error.is_some() {
241                        return;
242                    }
243                    if let Err(err) = csv_writer.write(&batch) {
244                        write_error =
245                            Some(Error::Internal(format!("failed to write CSV batch: {err}")));
246                    }
247                });
248        tracing::trace!(
249            "[CSV_EXPORT] scan_stream_with_exprs returned: {:?}",
250            scan_result
251        );
252        scan_result?;
253
254        if let Some(err) = write_error {
255            return Err(err);
256        }
257
258        let mut inner_writer = csv_writer.into_inner();
259        inner_writer
260            .flush()
261            .map_err(|err| Error::Internal(format!("failed to flush CSV writer: {err}")))?;
262
263        Ok(())
264    }
265}
266
267fn build_column_projections<P>(
268    table: &Table<P>,
269    columns: &[CsvExportColumn],
270) -> LlkvResult<Vec<ScanProjection>>
271where
272    P: Pager<Blob = EntryHandle> + Send + Sync,
273{
274    if columns.is_empty() {
275        return Err(Error::InvalidArgumentError(
276            "at least one column must be provided for CSV export".into(),
277        ));
278    }
279
280    let field_ids: Vec<FieldId> = columns.iter().map(|c| c.field_id).collect();
281    let column_meta = table.get_cols_meta(&field_ids);
282
283    let mut projections: Vec<ScanProjection> = Vec::with_capacity(columns.len());
284    for (idx, column) in columns.iter().enumerate() {
285        let lfid = LogicalFieldId::for_user(table.table_id(), column.field_id);
286        tracing::trace!(
287            "[CSV_EXPORT] Checking column field_id={}, lfid={:?}, table_id={}",
288            column.field_id,
289            lfid,
290            table.table_id()
291        );
292        match table.store().data_type(lfid) {
293            Ok(dt) => tracing::trace!("[CSV_EXPORT] Found data_type: {:?}", dt),
294            Err(e) => {
295                tracing::trace!("[CSV_EXPORT] data_type lookup failed: {:?}", e);
296                return Err(e);
297            }
298        }
299
300        let resolved_name = column
301            .alias
302            .clone()
303            .or_else(|| {
304                column_meta
305                    .get(idx)
306                    .and_then(|meta| meta.as_ref().and_then(|m| m.name.clone()))
307            })
308            .unwrap_or_else(|| format!("col_{}", column.field_id));
309
310        let projection = Projection::with_alias(lfid, resolved_name);
311        projections.push(ScanProjection::from(projection));
312    }
313
314    Ok(projections)
315}
316
317fn ensure_column_aliases<P>(table: &Table<P>, projections: &mut [ScanProjection]) -> LlkvResult<()>
318where
319    P: Pager<Blob = EntryHandle> + Send + Sync,
320{
321    let mut missing_field_ids: Vec<FieldId> = Vec::new();
322    for projection in projections.iter() {
323        if let ScanProjection::Column(col_proj) = projection {
324            if col_proj.logical_field_id.table_id() != table.table_id() {
325                return Err(Error::InvalidArgumentError(format!(
326                    "projection targets table {} but export is on table {}",
327                    col_proj.logical_field_id.table_id(),
328                    table.table_id(),
329                )));
330            }
331
332            if col_proj.alias.is_none() {
333                missing_field_ids.push(col_proj.logical_field_id.field_id());
334            }
335        }
336    }
337
338    if missing_field_ids.is_empty() {
339        return Ok(());
340    }
341
342    let metas = table.get_cols_meta(&missing_field_ids);
343    let mut alias_map: HashMap<FieldId, String> = HashMap::new();
344    for (field_id, meta) in missing_field_ids.iter().zip(metas.into_iter()) {
345        let name = meta
346            .and_then(|m| m.name)
347            .unwrap_or_else(|| format!("col_{}", field_id));
348        alias_map.insert(*field_id, name);
349    }
350
351    for projection in projections.iter_mut() {
352        if let ScanProjection::Column(col_proj) = projection
353            && col_proj.alias.is_none()
354        {
355            let field_id = col_proj.logical_field_id.field_id();
356            if let Some(name) = alias_map.get(&field_id) {
357                col_proj.alias = Some(name.clone());
358            } else {
359                col_proj.alias = Some(format!("col_{}", field_id));
360            }
361        }
362    }
363
364    Ok(())
365}