1use std::collections::HashMap;
7use std::fs::File;
8use std::io::{BufWriter, Write};
9use std::ops::Bound;
10use std::path::Path;
11
12use arrow::csv::WriterBuilder;
13use llkv_column_map::store::Projection;
14use llkv_column_map::types::LogicalFieldId;
15use llkv_result::{Error, Result as LlkvResult};
16use llkv_storage::pager::Pager;
17use simd_r_drive_entry_handle::EntryHandle;
18
19use llkv_table::expr::{Expr, Filter, Operator};
20use llkv_table::table::{ScanProjection, ScanStreamOptions};
21use llkv_table::{Table, types::FieldId};
22
/// Formatting and scan options applied by [`CsvWriter`].
///
/// Compared by value so callers can check whether two export configurations match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvWriteOptions {
    /// Emit a header row of column names (forwarded to the arrow CSV writer builder).
    pub include_header: bool,
    /// Field separator byte (forwarded to the arrow CSV writer builder).
    pub delimiter: u8,
    /// Forwarded to `ScanStreamOptions::include_nulls` for the underlying table scan.
    pub include_nulls: bool,
}
33
34impl Default for CsvWriteOptions {
35 fn default() -> Self {
36 Self {
37 include_header: true,
38 delimiter: b',',
39 include_nulls: true,
40 }
41 }
42}
43
44#[derive(Debug, Clone)]
46pub struct CsvExportColumn {
47 pub field_id: FieldId,
48 pub alias: Option<String>,
49}
50
51impl CsvExportColumn {
52 pub fn new(field_id: FieldId) -> Self {
53 Self {
54 field_id,
55 alias: None,
56 }
57 }
58
59 pub fn with_alias<S: Into<String>>(field_id: FieldId, alias: S) -> Self {
60 Self {
61 field_id,
62 alias: Some(alias.into()),
63 }
64 }
65}
66
67#[derive(Clone)]
69pub struct CsvWriter<'a, P>
70where
71 P: Pager<Blob = EntryHandle> + Send + Sync,
72{
73 table: &'a Table<P>,
74 options: CsvWriteOptions,
75}
76
77impl<'a, P> CsvWriter<'a, P>
78where
79 P: Pager<Blob = EntryHandle> + Send + Sync,
80{
81 pub fn new(table: &'a Table<P>) -> Self {
82 Self {
83 table,
84 options: CsvWriteOptions::default(),
85 }
86 }
87
88 pub fn with_options(table: &'a Table<P>, options: CsvWriteOptions) -> Self {
89 Self { table, options }
90 }
91
92 pub fn options(&self) -> &CsvWriteOptions {
93 &self.options
94 }
95
96 pub fn options_mut(&mut self) -> &mut CsvWriteOptions {
97 &mut self.options
98 }
99
100 pub fn into_options(self) -> CsvWriteOptions {
101 self.options
102 }
103
104 pub fn write_columns_to_path<C>(
105 &self,
106 csv_path: C,
107 columns: &[CsvExportColumn],
108 ) -> LlkvResult<()>
109 where
110 C: AsRef<Path>,
111 {
112 tracing::trace!(
113 "[CSV_EXPORT] write_columns_to_path called with {} columns",
114 columns.len()
115 );
116 if columns.is_empty() {
117 return Err(Error::InvalidArgumentError(
118 "at least one column must be provided for CSV export".into(),
119 ));
120 }
121
122 let filter_expr = Expr::Pred(Filter {
123 field_id: columns[0].field_id,
124 op: Operator::Range {
125 lower: Bound::Unbounded,
126 upper: Bound::Unbounded,
127 },
128 });
129
130 tracing::trace!("[CSV_EXPORT] Calling write_columns_to_path_with_filter");
131 let result = self.write_columns_to_path_with_filter(csv_path, columns, &filter_expr);
132 tracing::trace!(
133 "[CSV_EXPORT] write_columns_to_path_with_filter returned: {:?}",
134 result
135 );
136 result
137 }
138
139 pub fn write_columns_to_path_with_filter<C>(
140 &self,
141 csv_path: C,
142 columns: &[CsvExportColumn],
143 filter_expr: &Expr<'_, FieldId>,
144 ) -> LlkvResult<()>
145 where
146 C: AsRef<Path>,
147 {
148 tracing::trace!("[CSV_EXPORT] write_columns_to_path_with_filter called");
149 tracing::trace!("[CSV_EXPORT] About to call build_column_projections");
150 let projections = build_column_projections(self.table, columns)?;
151 tracing::trace!("[CSV_EXPORT] build_column_projections returned successfully");
152 self.write_projections_to_path(csv_path, projections, filter_expr)
153 }
154
155 pub fn write_columns_to_writer<W>(
156 &self,
157 writer: W,
158 columns: &[CsvExportColumn],
159 filter_expr: &Expr<'_, FieldId>,
160 ) -> LlkvResult<()>
161 where
162 W: Write,
163 {
164 let projections = build_column_projections(self.table, columns)?;
165 self.write_projections_to_writer(writer, projections, filter_expr)
166 }
167
168 pub fn write_projections_to_path<C, I, SP>(
169 &self,
170 csv_path: C,
171 projections: I,
172 filter_expr: &Expr<'_, FieldId>,
173 ) -> LlkvResult<()>
174 where
175 C: AsRef<Path>,
176 I: IntoIterator<Item = SP>,
177 SP: Into<ScanProjection>,
178 {
179 tracing::trace!("[CSV_EXPORT] write_projections_to_path called");
180 let file = File::create(csv_path.as_ref()).map_err(|err| {
181 Error::Internal(format!(
182 "failed to create CSV file '{}': {err}",
183 csv_path.as_ref().display()
184 ))
185 })?;
186 let writer = BufWriter::new(file);
187 tracing::trace!("[CSV_EXPORT] About to call write_projections_to_writer");
188 let result = self.write_projections_to_writer(writer, projections, filter_expr);
189 tracing::trace!(
190 "[CSV_EXPORT] write_projections_to_writer returned: {:?}",
191 result
192 );
193 result
194 }
195
196 pub fn write_projections_to_writer<W, I, SP>(
197 &self,
198 writer: W,
199 projections: I,
200 filter_expr: &Expr<'_, FieldId>,
201 ) -> LlkvResult<()>
202 where
203 W: Write,
204 I: IntoIterator<Item = SP>,
205 SP: Into<ScanProjection>,
206 {
207 tracing::trace!("[CSV_EXPORT] write_projections_to_writer called");
208 let mut projections: Vec<ScanProjection> =
209 projections.into_iter().map(|p| p.into()).collect();
210
211 tracing::trace!("[CSV_EXPORT] Got {} projections", projections.len());
212 if projections.is_empty() {
213 return Err(Error::InvalidArgumentError(
214 "at least one projection must be provided for CSV export".into(),
215 ));
216 }
217
218 tracing::trace!("[CSV_EXPORT] About to call ensure_column_aliases");
219 let result = ensure_column_aliases(self.table, &mut projections);
220 tracing::trace!("[CSV_EXPORT] ensure_column_aliases returned: {:?}", result);
221 result?;
222
223 let mut builder = WriterBuilder::new();
224 builder = builder.with_delimiter(self.options.delimiter);
225 builder = builder.with_header(self.options.include_header);
226 let mut csv_writer = builder.build(writer);
227
228 let mut write_error: Option<Error> = None;
229 let scan_options = ScanStreamOptions {
230 include_nulls: self.options.include_nulls,
231 order: None,
232 row_id_filter: None,
233 };
234
235 tracing::trace!("[CSV_EXPORT] About to call scan_stream_with_exprs");
236 tracing::trace!("[CSV_EXPORT] filter_expr: {:?}", filter_expr);
237 let scan_result =
238 self.table
239 .scan_stream_with_exprs(&projections, filter_expr, scan_options, |batch| {
240 if write_error.is_some() {
241 return;
242 }
243 if let Err(err) = csv_writer.write(&batch) {
244 write_error =
245 Some(Error::Internal(format!("failed to write CSV batch: {err}")));
246 }
247 });
248 tracing::trace!(
249 "[CSV_EXPORT] scan_stream_with_exprs returned: {:?}",
250 scan_result
251 );
252 scan_result?;
253
254 if let Some(err) = write_error {
255 return Err(err);
256 }
257
258 let mut inner_writer = csv_writer.into_inner();
259 inner_writer
260 .flush()
261 .map_err(|err| Error::Internal(format!("failed to flush CSV writer: {err}")))?;
262
263 Ok(())
264 }
265}
266
267fn build_column_projections<P>(
268 table: &Table<P>,
269 columns: &[CsvExportColumn],
270) -> LlkvResult<Vec<ScanProjection>>
271where
272 P: Pager<Blob = EntryHandle> + Send + Sync,
273{
274 if columns.is_empty() {
275 return Err(Error::InvalidArgumentError(
276 "at least one column must be provided for CSV export".into(),
277 ));
278 }
279
280 let field_ids: Vec<FieldId> = columns.iter().map(|c| c.field_id).collect();
281 let column_meta = table.get_cols_meta(&field_ids);
282
283 let mut projections: Vec<ScanProjection> = Vec::with_capacity(columns.len());
284 for (idx, column) in columns.iter().enumerate() {
285 let lfid = LogicalFieldId::for_user(table.table_id(), column.field_id);
286 tracing::trace!(
287 "[CSV_EXPORT] Checking column field_id={}, lfid={:?}, table_id={}",
288 column.field_id,
289 lfid,
290 table.table_id()
291 );
292 match table.store().data_type(lfid) {
293 Ok(dt) => tracing::trace!("[CSV_EXPORT] Found data_type: {:?}", dt),
294 Err(e) => {
295 tracing::trace!("[CSV_EXPORT] data_type lookup failed: {:?}", e);
296 return Err(e);
297 }
298 }
299
300 let resolved_name = column
301 .alias
302 .clone()
303 .or_else(|| {
304 column_meta
305 .get(idx)
306 .and_then(|meta| meta.as_ref().and_then(|m| m.name.clone()))
307 })
308 .unwrap_or_else(|| format!("col_{}", column.field_id));
309
310 let projection = Projection::with_alias(lfid, resolved_name);
311 projections.push(ScanProjection::from(projection));
312 }
313
314 Ok(projections)
315}
316
317fn ensure_column_aliases<P>(table: &Table<P>, projections: &mut [ScanProjection]) -> LlkvResult<()>
318where
319 P: Pager<Blob = EntryHandle> + Send + Sync,
320{
321 let mut missing_field_ids: Vec<FieldId> = Vec::new();
322 for projection in projections.iter() {
323 if let ScanProjection::Column(col_proj) = projection {
324 if col_proj.logical_field_id.table_id() != table.table_id() {
325 return Err(Error::InvalidArgumentError(format!(
326 "projection targets table {} but export is on table {}",
327 col_proj.logical_field_id.table_id(),
328 table.table_id(),
329 )));
330 }
331
332 if col_proj.alias.is_none() {
333 missing_field_ids.push(col_proj.logical_field_id.field_id());
334 }
335 }
336 }
337
338 if missing_field_ids.is_empty() {
339 return Ok(());
340 }
341
342 let metas = table.get_cols_meta(&missing_field_ids);
343 let mut alias_map: HashMap<FieldId, String> = HashMap::new();
344 for (field_id, meta) in missing_field_ids.iter().zip(metas.into_iter()) {
345 let name = meta
346 .and_then(|m| m.name)
347 .unwrap_or_else(|| format!("col_{}", field_id));
348 alias_map.insert(*field_id, name);
349 }
350
351 for projection in projections.iter_mut() {
352 if let ScanProjection::Column(col_proj) = projection
353 && col_proj.alias.is_none()
354 {
355 let field_id = col_proj.logical_field_id.field_id();
356 if let Some(name) = alias_map.get(&field_id) {
357 col_proj.alias = Some(name.clone());
358 } else {
359 col_proj.alias = Some(format!("col_{}", field_id));
360 }
361 }
362 }
363
364 Ok(())
365}