1use std::fs;
8use std::path::{Path, PathBuf};
9
10use alopex_core::columnar::encoding::{Column, LogicalType};
11use alopex_core::columnar::encoding_v2::Bitmap;
12use alopex_core::columnar::kvs_bridge::key_layout;
13use alopex_core::columnar::segment_v2::{
14 ColumnSchema, ColumnSegmentV2, RecordBatch, Schema, SegmentConfigV2, SegmentWriterV2,
15};
16use alopex_core::kv::{KVStore, KVTransaction};
17use alopex_core::storage::compression::CompressionV2;
18use alopex_core::storage::format::bincode_config;
19use bincode::config::Options;
20
21use crate::ast::ddl::IndexMethod;
22use crate::catalog::{
23 Catalog, ColumnMetadata, Compression, IndexMetadata, RowIdMode, TableMetadata,
24};
25use crate::columnar::statistics::compute_row_group_statistics;
26use crate::executor::hnsw_bridge::HnswBridge;
27use crate::executor::{ExecutionResult, ExecutorError, Result};
28use crate::planner::types::ResolvedType;
29use crate::storage::{SqlTransaction, SqlValue, StorageError};
30
31mod csv;
32mod parquet;
33
34pub use csv::CsvReader;
35pub use parquet::ParquetReader;
36
/// Supported input file formats for the `COPY` statement.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileFormat {
    /// Comma-separated values, optionally with a header row.
    Csv,
    /// Apache Parquet columnar files.
    Parquet,
}
43
/// User-supplied options for a `COPY` statement.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CopyOptions {
    /// Whether the first input row is a header row (used by the CSV reader).
    pub header: bool,
}
50
/// Security policy applied to file paths passed to `COPY`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct CopySecurityConfig {
    /// When `Some`, the canonicalized input path must live under one of these
    /// directories; `None` disables the allow-list check entirely.
    pub allowed_base_dirs: Option<Vec<PathBuf>>,
    /// Whether symbolic links are accepted as input paths.
    pub allow_symlinks: bool,
}
59
/// One column of a bulk-load input file. Both fields are optional because
/// some formats (e.g. headerless CSV) carry no name or type information.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CopyField {
    /// Column name from the file, if the format provides one.
    pub name: Option<String>,
    /// Column type from the file, if the format provides one.
    pub data_type: Option<ResolvedType>,
}
66
/// Schema of a bulk-load input file, as reported by a [`BulkReader`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CopySchema {
    /// Fields in file column order.
    pub fields: Vec<CopyField>,
}
72
73impl CopySchema {
74 pub fn from_table(table: &TableMetadata) -> Self {
75 let fields = table
76 .columns
77 .iter()
78 .map(|c| CopyField {
79 name: Some(c.name.clone()),
80 data_type: Some(c.data_type.clone()),
81 })
82 .collect();
83 Self { fields }
84 }
85}
86
/// Streaming source of rows for bulk loading (`COPY`).
pub trait BulkReader {
    /// Schema inferred from (or imposed on) the input file.
    fn schema(&self) -> &CopySchema;
    /// Returns up to `max_rows` rows, or `None` once the input is exhausted.
    fn next_batch(&mut self, max_rows: usize) -> Result<Option<Vec<Vec<SqlValue>>>>;
}
94
95pub fn execute_copy<S: KVStore, C: Catalog>(
97 txn: &mut SqlTransaction<'_, S>,
98 catalog: &C,
99 table_name: &str,
100 file_path: &str,
101 format: FileFormat,
102 options: CopyOptions,
103 config: &CopySecurityConfig,
104) -> Result<ExecutionResult> {
105 let table_meta = catalog
106 .get_table(table_name)
107 .cloned()
108 .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
109
110 validate_file_path(file_path, config)?;
111
112 if !Path::new(file_path).exists() {
113 return Err(ExecutorError::FileNotFound(file_path.to_string()));
114 }
115
116 let reader: Box<dyn BulkReader> = match format {
117 FileFormat::Parquet => {
118 Box::new(ParquetReader::open(file_path, &table_meta, options.header)?)
119 }
120 FileFormat::Csv => Box::new(CsvReader::open(file_path, &table_meta, options.header)?),
121 };
122
123 validate_schema(reader.schema(), &table_meta)?;
124
125 let rows_loaded = match table_meta.storage_options.storage_type {
126 crate::catalog::StorageType::Columnar => {
127 bulk_load_columnar(txn, catalog, &table_meta, reader)?
128 }
129 crate::catalog::StorageType::Row => bulk_load_row(txn, catalog, &table_meta, reader)?,
130 };
131
132 Ok(ExecutionResult::RowsAffected(rows_loaded))
133}
134
135pub fn validate_file_path(file_path: &str, config: &CopySecurityConfig) -> Result<()> {
137 let path = Path::new(file_path);
138
139 if !path.exists() {
141 return Err(ExecutorError::FileNotFound(file_path.into()));
142 }
143
144 let canonical = path
145 .canonicalize()
146 .map_err(|e| ExecutorError::PathValidationFailed {
147 path: file_path.into(),
148 reason: format!("failed to canonicalize: {e}"),
149 })?;
150
151 if let Some(base_dirs) = &config.allowed_base_dirs {
152 let allowed = base_dirs.iter().any(|base| canonical.starts_with(base));
153 if !allowed {
154 return Err(ExecutorError::PathValidationFailed {
155 path: file_path.into(),
156 reason: format!("path not in allowed directories: {:?}", base_dirs),
157 });
158 }
159 }
160
161 if !config.allow_symlinks && path.is_symlink() {
162 return Err(ExecutorError::PathValidationFailed {
163 path: file_path.into(),
164 reason: "symbolic links not allowed".into(),
165 });
166 }
167
168 let metadata = fs::metadata(&canonical).map_err(|e| ExecutorError::PathValidationFailed {
169 path: file_path.into(),
170 reason: format!("cannot access file: {e}"),
171 })?;
172
173 if !metadata.is_file() {
174 return Err(ExecutorError::PathValidationFailed {
175 path: file_path.into(),
176 reason: "path is not a regular file".into(),
177 });
178 }
179
180 #[cfg(unix)]
181 {
182 use std::os::unix::fs::PermissionsExt;
183 if metadata.permissions().mode() & 0o444 == 0 {
184 return Err(ExecutorError::PathValidationFailed {
185 path: file_path.into(),
186 reason: "file is not readable".into(),
187 });
188 }
189 }
190
191 Ok(())
192}
193
194pub fn validate_schema(schema: &CopySchema, table_meta: &TableMetadata) -> Result<()> {
196 if schema.fields.len() != table_meta.columns.len() {
197 return Err(ExecutorError::SchemaMismatch {
198 expected: table_meta.columns.len(),
199 actual: schema.fields.len(),
200 reason: "column count mismatch".into(),
201 });
202 }
203
204 for (idx, (field, col)) in schema
205 .fields
206 .iter()
207 .zip(table_meta.columns.iter())
208 .enumerate()
209 {
210 if let Some(dt) = &field.data_type
211 && !is_type_compatible(dt, &col.data_type)
212 {
213 return Err(ExecutorError::SchemaMismatch {
214 expected: table_meta.columns.len(),
215 actual: schema.fields.len(),
216 reason: format!(
217 "type mismatch for column '{}': expected {:?}, got {:?}",
218 col.name, col.data_type, dt
219 ),
220 });
221 }
222 if let Some(name) = &field.name
223 && name != &col.name
224 {
225 return Err(ExecutorError::SchemaMismatch {
226 expected: table_meta.columns.len(),
227 actual: schema.fields.len(),
228 reason: format!(
229 "column name mismatch at position {}: expected '{}', got '{}'",
230 idx, col.name, name
231 ),
232 });
233 }
234 }
235
236 Ok(())
237}
238
/// Bulk-loads rows from `reader` into a row-oriented table, then populates
/// all of the table's indexes in a second pass. Returns the row count.
fn bulk_load_row<S: KVStore, C: Catalog>(
    txn: &mut SqlTransaction<'_, S>,
    catalog: &C,
    table: &TableMetadata,
    mut reader: Box<dyn BulkReader>,
) -> Result<u64> {
    // Split indexes: HNSW (vector) indexes go through HnswBridge, everything
    // else through the generic index-storage path.
    let indexes: Vec<IndexMetadata> = catalog
        .get_indexes_for_table(&table.name)
        .into_iter()
        .cloned()
        .collect();
    let (hnsw_indexes, btree_indexes): (Vec<_>, Vec<_>) = indexes
        .into_iter()
        .partition(|idx| matches!(idx.method, Some(IndexMethod::Hnsw)));

    // Rows are staged (row_id + values) so indexes can be populated after the
    // table-storage borrow of `txn` is released; the inner block scopes that
    // borrow deliberately.
    let mut staged: Vec<(u64, Vec<SqlValue>)> = Vec::new();
    {
        let mut storage = txn.table_storage(table);
        while let Some(batch) = reader.next_batch(1024)? {
            for row in batch {
                // Reject malformed rows before allocating a row id.
                if row.len() != table.column_count() {
                    return Err(ExecutorError::BulkLoad(format!(
                        "row has {} columns, expected {}",
                        row.len(),
                        table.column_count()
                    )));
                }
                let row_id = storage
                    .next_row_id()
                    .map_err(|e| map_storage_error(table, e))?;
                storage
                    .insert(row_id, &row)
                    .map_err(|e| map_storage_error(table, e))?;
                staged.push((row_id, row));
            }
        }
    }

    populate_indexes(txn, &btree_indexes, &staged)?;
    populate_hnsw_indexes(txn, table, &hnsw_indexes, &staged)?;

    Ok(staged.len() as u64)
}
283
284fn bulk_load_columnar<S: KVStore, C: Catalog>(
286 txn: &mut SqlTransaction<'_, S>,
287 catalog: &C,
288 table: &TableMetadata,
289 mut reader: Box<dyn BulkReader>,
290) -> Result<u64> {
291 let _ = catalog; let row_group_size = table.storage_options.row_group_size.max(1) as usize;
294 let compression = map_compression(table.storage_options.compression);
295 let mut writer = SegmentWriterV2::new(SegmentConfigV2 {
296 row_group_size: row_group_size as u64,
297 compression,
298 ..Default::default()
299 });
300 let schema = build_segment_schema(table)?;
301
302 let mut row_group_stats = Vec::new();
303 let mut total_rows = 0u64;
304 while let Some(batch) = reader.next_batch(row_group_size)? {
305 if batch.is_empty() {
306 continue;
307 }
308 let stats = compute_row_group_statistics(&batch);
309 let record_batch = build_record_batch(&schema, table, &batch)?;
310 writer
311 .write_batch(record_batch)
312 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
313 row_group_stats.push(stats);
314 total_rows += batch.len() as u64;
315 }
316
317 if total_rows == 0 {
318 return Ok(0);
319 }
320
321 let segment = writer
322 .finish()
323 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
324 let _segment_id = persist_segment(txn, table, segment, &row_group_stats)?;
325
326 Ok(total_rows)
327}
328
329fn map_compression(compression: Compression) -> CompressionV2 {
330 let desired = match compression {
331 Compression::None => CompressionV2::None,
332 Compression::Lz4 => CompressionV2::Lz4,
333 Compression::Zstd => CompressionV2::Zstd { level: 3 },
334 };
335
336 if desired.is_available() {
337 desired
338 } else {
339 CompressionV2::None
340 }
341}
342
343fn build_segment_schema(table: &TableMetadata) -> Result<Schema> {
344 let mut columns = Vec::with_capacity(table.column_count());
345 for col in &table.columns {
346 let logical_type = logical_type_for(&col.data_type)?;
347 columns.push(ColumnSchema {
348 name: col.name.clone(),
349 logical_type,
350 nullable: !col.not_null,
351 fixed_len: fixed_len_for(&col.data_type),
352 });
353 }
354 Ok(Schema { columns })
355}
356
357fn logical_type_for(ty: &ResolvedType) -> Result<LogicalType> {
358 match ty {
359 ResolvedType::Integer | ResolvedType::BigInt | ResolvedType::Timestamp => {
360 Ok(LogicalType::Int64)
361 }
362 ResolvedType::Vector { dimension, .. } => {
363 Ok(LogicalType::Fixed(dimension.checked_mul(4).ok_or_else(|| {
364 ExecutorError::Columnar("vector dimension overflow when computing fixed len".into())
365 })? as u16))
366 }
367 ResolvedType::Float => Ok(LogicalType::Float32),
368 ResolvedType::Double => Ok(LogicalType::Float64),
369 ResolvedType::Boolean => Ok(LogicalType::Bool),
370 ResolvedType::Text | ResolvedType::Blob => Ok(LogicalType::Binary),
371 ResolvedType::Null => Err(ExecutorError::Columnar(
372 "NULL column type is not supported for columnar storage".into(),
373 )),
374 }
375}
376
377fn fixed_len_for(ty: &ResolvedType) -> Option<u32> {
378 match ty {
379 ResolvedType::Vector { dimension, .. } => Some(dimension.saturating_mul(4)),
380 _ => None,
381 }
382}
383
384fn build_record_batch(
385 schema: &Schema,
386 table: &TableMetadata,
387 rows: &[Vec<SqlValue>],
388) -> Result<RecordBatch> {
389 for row in rows {
390 if row.len() != table.column_count() {
391 return Err(ExecutorError::BulkLoad(format!(
392 "row has {} columns, expected {}",
393 row.len(),
394 table.column_count()
395 )));
396 }
397 }
398
399 let mut columns = Vec::with_capacity(table.column_count());
400 let mut bitmaps = Vec::with_capacity(table.column_count());
401 for (idx, col_meta) in table.columns.iter().enumerate() {
402 let (col, bitmap) = build_column(idx, col_meta, rows)?;
403 columns.push(col);
404 bitmaps.push(bitmap);
405 }
406
407 Ok(RecordBatch::new(schema.clone(), columns, bitmaps))
408}
409
410fn validity_bitmap(validity: &[bool]) -> Option<Bitmap> {
411 if validity.iter().all(|v| *v) {
412 None
413 } else {
414 Some(Bitmap::from_bools(validity))
415 }
416}
417
418fn build_column(
419 col_idx: usize,
420 col_meta: &ColumnMetadata,
421 rows: &[Vec<SqlValue>],
422) -> Result<(Column, Option<Bitmap>)> {
423 match &col_meta.data_type {
424 ResolvedType::Integer => {
425 let mut validity = Vec::with_capacity(rows.len());
426 let mut values = Vec::with_capacity(rows.len());
427 for row in rows {
428 match row
429 .get(col_idx)
430 .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
431 {
432 SqlValue::Null => {
433 validity.push(false);
434 values.push(0);
435 }
436 SqlValue::Integer(v) => {
437 validity.push(true);
438 values.push(*v as i64);
439 }
440 SqlValue::BigInt(v) => {
441 validity.push(true);
442 values.push(*v);
443 }
444 other => {
445 return Err(ExecutorError::BulkLoad(format!(
446 "type mismatch for column '{}': expected Integer, got {}",
447 col_meta.name,
448 other.type_name()
449 )));
450 }
451 }
452 }
453 Ok((Column::Int64(values), validity_bitmap(&validity)))
454 }
455 ResolvedType::BigInt | ResolvedType::Timestamp => {
456 let mut validity = Vec::with_capacity(rows.len());
457 let mut values = Vec::with_capacity(rows.len());
458 for row in rows {
459 match row
460 .get(col_idx)
461 .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
462 {
463 SqlValue::Null => {
464 validity.push(false);
465 values.push(0);
466 }
467 SqlValue::BigInt(v) | SqlValue::Timestamp(v) => {
468 validity.push(true);
469 values.push(*v);
470 }
471 SqlValue::Integer(v) => {
472 validity.push(true);
473 values.push(*v as i64);
474 }
475 other => {
476 return Err(ExecutorError::BulkLoad(format!(
477 "type mismatch for column '{}': expected BigInt/Timestamp, got {}",
478 col_meta.name,
479 other.type_name()
480 )));
481 }
482 }
483 }
484 Ok((Column::Int64(values), validity_bitmap(&validity)))
485 }
486 ResolvedType::Float => {
487 let mut validity = Vec::with_capacity(rows.len());
488 let mut values = Vec::with_capacity(rows.len());
489 for row in rows {
490 match row
491 .get(col_idx)
492 .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
493 {
494 SqlValue::Null => {
495 validity.push(false);
496 values.push(0.0);
497 }
498 SqlValue::Float(v) => {
499 validity.push(true);
500 values.push(*v);
501 }
502 other => {
503 return Err(ExecutorError::BulkLoad(format!(
504 "type mismatch for column '{}': expected Float, got {}",
505 col_meta.name,
506 other.type_name()
507 )));
508 }
509 }
510 }
511 Ok((Column::Float32(values), validity_bitmap(&validity)))
512 }
513 ResolvedType::Double => {
514 let mut validity = Vec::with_capacity(rows.len());
515 let mut values = Vec::with_capacity(rows.len());
516 for row in rows {
517 match row
518 .get(col_idx)
519 .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
520 {
521 SqlValue::Null => {
522 validity.push(false);
523 values.push(0.0);
524 }
525 SqlValue::Double(v) => {
526 validity.push(true);
527 values.push(*v);
528 }
529 other => {
530 return Err(ExecutorError::BulkLoad(format!(
531 "type mismatch for column '{}': expected Double, got {}",
532 col_meta.name,
533 other.type_name()
534 )));
535 }
536 }
537 }
538 Ok((Column::Float64(values), validity_bitmap(&validity)))
539 }
540 ResolvedType::Boolean => {
541 let mut validity = Vec::with_capacity(rows.len());
542 let mut values = Vec::with_capacity(rows.len());
543 for row in rows {
544 match row
545 .get(col_idx)
546 .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
547 {
548 SqlValue::Null => {
549 validity.push(false);
550 values.push(false);
551 }
552 SqlValue::Boolean(v) => {
553 validity.push(true);
554 values.push(*v);
555 }
556 other => {
557 return Err(ExecutorError::BulkLoad(format!(
558 "type mismatch for column '{}': expected Boolean, got {}",
559 col_meta.name,
560 other.type_name()
561 )));
562 }
563 }
564 }
565 Ok((Column::Bool(values), validity_bitmap(&validity)))
566 }
567 ResolvedType::Text => {
568 let mut validity = Vec::with_capacity(rows.len());
569 let mut values = Vec::with_capacity(rows.len());
570 for row in rows {
571 match row
572 .get(col_idx)
573 .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
574 {
575 SqlValue::Null => {
576 validity.push(false);
577 values.push(Vec::new());
578 }
579 SqlValue::Text(v) => {
580 validity.push(true);
581 values.push(v.as_bytes().to_vec());
582 }
583 other => {
584 return Err(ExecutorError::BulkLoad(format!(
585 "type mismatch for column '{}': expected Text, got {}",
586 col_meta.name,
587 other.type_name()
588 )));
589 }
590 }
591 }
592 Ok((Column::Binary(values), validity_bitmap(&validity)))
593 }
594 ResolvedType::Blob => {
595 let mut validity = Vec::with_capacity(rows.len());
596 let mut values = Vec::with_capacity(rows.len());
597 for row in rows {
598 match row
599 .get(col_idx)
600 .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
601 {
602 SqlValue::Null => {
603 validity.push(false);
604 values.push(Vec::new());
605 }
606 SqlValue::Blob(v) => {
607 validity.push(true);
608 values.push(v.clone());
609 }
610 other => {
611 return Err(ExecutorError::BulkLoad(format!(
612 "type mismatch for column '{}': expected Blob, got {}",
613 col_meta.name,
614 other.type_name()
615 )));
616 }
617 }
618 }
619 Ok((Column::Binary(values), validity_bitmap(&validity)))
620 }
621 ResolvedType::Vector { dimension, .. } => {
622 let fixed_len = dimension.saturating_mul(4) as usize;
623 let mut validity = Vec::with_capacity(rows.len());
624 let mut values = Vec::with_capacity(rows.len());
625 for row in rows {
626 match row
627 .get(col_idx)
628 .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
629 {
630 SqlValue::Null => {
631 validity.push(false);
632 values.push(vec![0u8; fixed_len]);
633 }
634 SqlValue::Vector(v) => {
635 if v.len() as u32 != *dimension {
636 return Err(ExecutorError::BulkLoad(format!(
637 "vector dimension mismatch for column '{}': expected {}, got {}",
638 col_meta.name,
639 dimension,
640 v.len()
641 )));
642 }
643 validity.push(true);
644 let mut buf = Vec::with_capacity(fixed_len);
645 for f in v {
646 buf.extend_from_slice(&f.to_le_bytes());
647 }
648 values.push(buf);
649 }
650 other => {
651 return Err(ExecutorError::BulkLoad(format!(
652 "type mismatch for column '{}': expected Vector, got {}",
653 col_meta.name,
654 other.type_name()
655 )));
656 }
657 }
658 }
659 Ok((
660 Column::Fixed {
661 len: fixed_len,
662 values,
663 },
664 validity_bitmap(&validity),
665 ))
666 }
667 ResolvedType::Null => Err(ExecutorError::Columnar(
668 "NULL column type is not supported for columnar storage".into(),
669 )),
670 }
671}
672
/// Persists a finished columnar segment (data, segment metadata, and
/// per-row-group statistics) into the KV store under the table's key space,
/// and appends the new segment id to the table's segment index.
/// Returns the allocated segment id.
fn persist_segment<S: KVStore>(
    txn: &mut SqlTransaction<'_, S>,
    table: &TableMetadata,
    mut segment: ColumnSegmentV2,
    row_group_stats: &[crate::columnar::statistics::RowGroupStatistics],
) -> Result<u64> {
    // Exactly one statistics entry is expected per row group in the segment.
    if row_group_stats.len() != segment.meta.row_groups.len() {
        return Err(ExecutorError::Columnar(
            "row group statistics length mismatch".into(),
        ));
    }

    let table_id = table.table_id;
    // Load the existing segment-id index (if any) to allocate the next id.
    let index_key = key_layout::segment_index_key(table_id);
    let existing = txn.inner_mut().get(&index_key)?;
    let mut index: Vec<u64> = if let Some(bytes) = existing {
        bincode_config()
            .deserialize(&bytes)
            .map_err(|e| ExecutorError::Columnar(e.to_string()))?
    } else {
        Vec::new()
    };
    // Segment ids are monotonically increasing; a fresh table starts at 0.
    let segment_id = index
        .last()
        .copied()
        .map(|id| id.saturating_add(1))
        .unwrap_or(0);

    let mut row_group_stats = row_group_stats.to_vec();
    if table.storage_options.row_id_mode == RowIdMode::Direct {
        // Direct mode: synthesize row ids that encode (segment_id, offset),
        // then record each row group's row-id range into its statistics so
        // scans can prune by row id.
        let total_rows = usize::try_from(segment.meta.num_rows)
            .map_err(|_| ExecutorError::Columnar("segment row count exceeds usize::MAX".into()))?;
        segment.row_ids = (0..total_rows)
            .map(|idx| {
                alopex_core::columnar::segment_v2::encode_row_id(segment_id, idx as u64)
                    .map_err(|e| ExecutorError::Columnar(e.to_string()))
            })
            .collect::<Result<Vec<u64>>>()?;

        for (idx, meta) in segment.meta.row_groups.iter().enumerate() {
            let start = usize::try_from(meta.row_start)
                .map_err(|_| ExecutorError::Columnar("row_start exceeds usize::MAX".into()))?;
            let count = usize::try_from(meta.row_count)
                .map_err(|_| ExecutorError::Columnar("row_count exceeds usize::MAX".into()))?;
            // Empty row groups have no row-id range to record.
            if count == 0 {
                continue;
            }
            let end = start
                .checked_add(count)
                .ok_or_else(|| ExecutorError::Columnar("row_id range overflow".into()))?;
            if end > segment.row_ids.len() {
                return Err(ExecutorError::Columnar(
                    "row_ids length is smaller than row_group range".into(),
                ));
            }
            row_group_stats[idx].row_id_min = segment.row_ids.get(start).copied();
            row_group_stats[idx].row_id_max = segment.row_ids.get(end - 1).copied();
        }
    } else {
        // Non-direct mode stores no per-row ids.
        segment.row_ids.clear();
    }

    // Persist segment data, segment metadata, row-group statistics, and the
    // updated segment index — all under this transaction.
    let segment_bytes = bincode_config()
        .serialize(&segment)
        .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
    txn.inner_mut().put(
        key_layout::column_segment_key(table_id, segment_id, 0),
        segment_bytes,
    )?;

    let meta_bytes = bincode_config()
        .serialize(&segment.meta)
        .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
    txn.inner_mut()
        .put(key_layout::statistics_key(table_id, segment_id), meta_bytes)?;

    let rg_bytes = bincode_config()
        .serialize(&row_group_stats)
        .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
    txn.inner_mut().put(
        key_layout::row_group_stats_key(table_id, segment_id),
        rg_bytes,
    )?;

    index.push(segment_id);
    let index_bytes = bincode_config()
        .serialize(&index)
        .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
    txn.inner_mut().put(index_key, index_bytes)?;
    Ok(segment_id)
}
764
765pub(crate) fn parse_value(raw: &str, ty: &ResolvedType) -> Result<SqlValue> {
767 let trimmed = raw.trim();
768 if trimmed.eq_ignore_ascii_case("null") {
769 return Ok(SqlValue::Null);
770 }
771
772 match ty {
773 ResolvedType::Integer => trimmed
774 .parse::<i32>()
775 .map(SqlValue::Integer)
776 .map_err(|e| parse_error(trimmed, ty, e)),
777 ResolvedType::BigInt => trimmed
778 .parse::<i64>()
779 .map(SqlValue::BigInt)
780 .map_err(|e| parse_error(trimmed, ty, e)),
781 ResolvedType::Float => trimmed
782 .parse::<f32>()
783 .map(SqlValue::Float)
784 .map_err(|e| parse_error(trimmed, ty, e)),
785 ResolvedType::Double => trimmed
786 .parse::<f64>()
787 .map(SqlValue::Double)
788 .map_err(|e| parse_error(trimmed, ty, e)),
789 ResolvedType::Boolean => {
790 let parsed = trimmed
791 .parse::<bool>()
792 .or(match trimmed {
793 "1" => Ok(true),
794 "0" => Ok(false),
795 _ => Err(()),
796 })
797 .map_err(|_| {
798 ExecutorError::BulkLoad(format!(
799 "failed to parse value '{trimmed}' as {}: invalid boolean",
800 ty.type_name()
801 ))
802 })?;
803 Ok(SqlValue::Boolean(parsed))
804 }
805 ResolvedType::Timestamp => trimmed
806 .parse::<i64>()
807 .map(SqlValue::Timestamp)
808 .map_err(|e| parse_error(trimmed, ty, e)),
809 ResolvedType::Text => Ok(SqlValue::Text(trimmed.to_string())),
810 ResolvedType::Blob => Ok(SqlValue::Blob(trimmed.as_bytes().to_vec())),
811 ResolvedType::Vector { dimension, .. } => {
812 let body = trimmed.trim_matches(['[', ']']);
813 if body.is_empty() {
814 return Err(ExecutorError::BulkLoad(
815 "vector literal cannot be empty".into(),
816 ));
817 }
818 let mut values = Vec::new();
819 for part in body.split(',') {
820 let v = part
821 .trim()
822 .parse::<f32>()
823 .map_err(|e| ExecutorError::BulkLoad(format!("invalid vector value: {e}")))?;
824 values.push(v);
825 }
826 if values.len() as u32 != *dimension {
827 return Err(ExecutorError::BulkLoad(format!(
828 "vector dimension mismatch: expected {}, got {}",
829 dimension,
830 values.len()
831 )));
832 }
833 Ok(SqlValue::Vector(values))
834 }
835 ResolvedType::Null => Ok(SqlValue::Null),
836 }
837}
838
839fn parse_error(trimmed: &str, ty: &ResolvedType, err: impl std::fmt::Display) -> ExecutorError {
840 ExecutorError::BulkLoad(format!(
841 "failed to parse value '{trimmed}' as {}: {err}",
842 ty.type_name()
843 ))
844}
845
846fn is_type_compatible(file_type: &ResolvedType, table_type: &ResolvedType) -> bool {
847 match (file_type, table_type) {
848 (
849 ResolvedType::Vector {
850 dimension: f_dim,
851 metric: f_metric,
852 },
853 ResolvedType::Vector {
854 dimension: t_dim,
855 metric: t_metric,
856 },
857 ) => f_dim == t_dim && f_metric == t_metric,
858 (ft, tt) => ft == tt || ft.can_cast_to(tt),
859 }
860}
861
/// Maps low-level storage errors from the table-storage insert path onto
/// executor-level errors, attaching table context where available.
fn map_storage_error(table: &TableMetadata, err: StorageError) -> ExecutorError {
    match err {
        // NOT NULL violations carry the offending column name through.
        StorageError::NullConstraintViolation { column } => {
            ExecutorError::ConstraintViolation(crate::executor::ConstraintViolation::NotNull {
                column,
            })
        }
        // PK violations report the table's PK column list; the conflicting
        // value is not available at this layer, hence `value: None`.
        StorageError::PrimaryKeyViolation { .. } => {
            ExecutorError::ConstraintViolation(crate::executor::ConstraintViolation::PrimaryKey {
                columns: table.primary_key.clone().unwrap_or_default(),
                value: None,
            })
        }
        StorageError::TransactionConflict => ExecutorError::TransactionConflict,
        // Everything else passes through as a generic storage error.
        other => ExecutorError::Storage(other),
    }
}
879
/// Maps low-level storage errors from the index insert path onto
/// executor-level errors. Unique violations on internal `__pk_`-prefixed
/// indexes are surfaced as primary-key violations instead of generic
/// unique-constraint violations.
fn map_index_error(index: &IndexMetadata, err: StorageError) -> ExecutorError {
    match err {
        StorageError::UniqueViolation { .. } => {
            // The `__pk_` prefix marks the implicit primary-key index.
            if index.name.starts_with("__pk_") {
                ExecutorError::ConstraintViolation(
                    crate::executor::ConstraintViolation::PrimaryKey {
                        columns: index.columns.clone(),
                        value: None,
                    },
                )
            } else {
                ExecutorError::ConstraintViolation(crate::executor::ConstraintViolation::Unique {
                    index_name: index.name.clone(),
                    columns: index.columns.clone(),
                    value: None,
                })
            }
        }
        StorageError::NullConstraintViolation { column } => {
            ExecutorError::ConstraintViolation(crate::executor::ConstraintViolation::NotNull {
                column,
            })
        }
        StorageError::TransactionConflict => ExecutorError::TransactionConflict,
        other => ExecutorError::Storage(other),
    }
}
907
908fn populate_indexes<S: KVStore>(
909 txn: &mut SqlTransaction<'_, S>,
910 indexes: &[IndexMetadata],
911 rows: &[(u64, Vec<SqlValue>)],
912) -> Result<()> {
913 for index in indexes {
914 let mut storage =
915 txn.index_storage(index.index_id, index.unique, index.column_indices.clone());
916 for (row_id, row) in rows {
917 if should_skip_unique_index_for_null(index, row) {
918 continue;
919 }
920 storage
921 .insert(row, *row_id)
922 .map_err(|e| map_index_error(index, e))?;
923 }
924 }
925 Ok(())
926}
927
928fn populate_hnsw_indexes<S: KVStore>(
929 txn: &mut SqlTransaction<'_, S>,
930 table: &TableMetadata,
931 indexes: &[IndexMetadata],
932 rows: &[(u64, Vec<SqlValue>)],
933) -> Result<()> {
934 for index in indexes {
935 for (row_id, row) in rows {
936 HnswBridge::on_insert(txn, table, index, *row_id, row)?;
937 }
938 }
939 Ok(())
940}
941
942fn should_skip_unique_index_for_null(index: &IndexMetadata, row: &[SqlValue]) -> bool {
943 index.unique
944 && index
945 .column_indices
946 .iter()
947 .any(|&idx| row.get(idx).is_none_or(SqlValue::is_null))
948}
949
#[cfg(test)]
mod tests {
    //! Integration-style tests covering path validation, schema validation,
    //! and end-to-end CSV/Parquet COPY into a row-oriented table.
    use super::*;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, StorageType};
    use crate::executor::ddl::create_table::execute_create_table;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use ::parquet::arrow::ArrowWriter;
    use alopex_core::kv::memory::MemoryKV;
    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
    use std::fs::File;
    use std::io::Write;
    use std::path::Path;
    use std::sync::Arc;

    // Fresh in-memory KV bridge and catalog for each test.
    fn bridge() -> (TxnBridge<MemoryKV>, MemoryCatalog) {
        (
            TxnBridge::new(Arc::new(MemoryKV::new())),
            MemoryCatalog::new(),
        )
    }

    // Creates a two-column `users(id PK, name)` table with the given storage type.
    fn create_table(
        bridge: &TxnBridge<MemoryKV>,
        catalog: &mut MemoryCatalog,
        storage: StorageType,
    ) {
        let mut table = TableMetadata::new(
            "users",
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer).with_primary_key(true),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        )
        .with_primary_key(vec!["id".into()]);
        table.storage_options.storage_type = storage;

        let mut txn = bridge.begin_write().unwrap();
        execute_create_table(&mut txn, catalog, table, vec![], false).unwrap();
        txn.commit().unwrap();
    }

    #[test]
    fn validate_file_path_rejects_symlink_and_directory() {
        let dir = std::env::temp_dir();
        let dir_path = dir.join("alopex_copy_dir");
        std::fs::create_dir_all(&dir_path).unwrap();

        let config = CopySecurityConfig {
            allowed_base_dirs: Some(vec![dir.clone()]),
            allow_symlinks: false,
        };

        // A directory is not a regular file, so it must be rejected.
        let err = validate_file_path(dir_path.to_str().unwrap(), &config).unwrap_err();
        assert!(matches!(err, ExecutorError::PathValidationFailed { .. }));

        // Symlinks can only be created portably on Unix in this test.
        #[cfg(unix)]
        {
            use std::os::unix::fs::symlink;
            let file_path = dir.join("alopex_copy_file.txt");
            fs::write(&file_path, "1,alice\n").unwrap();
            let link = dir.join("alopex_copy_link.txt");
            let _ = fs::remove_file(&link);
            symlink(&file_path, &link).unwrap();
            let err = validate_file_path(link.to_str().unwrap(), &config).unwrap_err();
            assert!(matches!(err, ExecutorError::PathValidationFailed { .. }));
        }
    }

    #[test]
    fn validate_schema_checks_names_and_types() {
        let (bridge, mut catalog) = bridge();
        create_table(&bridge, &mut catalog, StorageType::Row);
        let table = catalog.get_table("users").unwrap();

        // First field is named "users" instead of "id" — must be rejected.
        let schema = CopySchema {
            fields: vec![
                CopyField {
                    name: Some("users".into()),
                    data_type: Some(ResolvedType::Integer),
                },
                CopyField {
                    name: Some("name".into()),
                    data_type: Some(ResolvedType::Text),
                },
            ],
        };

        let err = validate_schema(&schema, table).unwrap_err();
        assert!(matches!(err, ExecutorError::SchemaMismatch { .. }));
    }

    #[test]
    fn execute_copy_csv_inserts_rows() {
        // Write a small CSV with a header row.
        let dir = std::env::temp_dir();
        let file_path = dir.join("alopex_copy_test.csv");
        let mut file = File::create(&file_path).unwrap();
        writeln!(file, "id,name").unwrap();
        writeln!(file, "1,alice").unwrap();
        writeln!(file, "2,bob").unwrap();

        let (bridge, mut catalog) = bridge();
        create_table(&bridge, &mut catalog, StorageType::Row);

        let mut txn = bridge.begin_write().unwrap();
        let result = execute_copy(
            &mut txn,
            &catalog,
            "users",
            file_path.to_str().unwrap(),
            FileFormat::Csv,
            CopyOptions { header: true },
            &CopySecurityConfig::default(),
        )
        .unwrap();
        txn.commit().unwrap();
        assert_eq!(result, ExecutionResult::RowsAffected(2));

        // Verify the rows landed in table storage.
        let table = catalog.get_table("users").unwrap().clone();
        let mut read_txn = bridge.begin_read().unwrap();
        let mut storage = read_txn.table_storage(&table);
        let rows: Vec<_> = storage.scan().unwrap().map(|r| r.unwrap().1).collect();
        assert_eq!(rows.len(), 2);
        assert!(rows.contains(&vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]));
    }

    #[test]
    fn execute_copy_parquet_reads_schema_and_rows() {
        let dir = std::env::temp_dir();
        let file_path = dir.join("alopex_copy_test.parquet");
        write_parquet_sample(&file_path, 2);

        let (bridge, mut catalog) = bridge();
        create_table(&bridge, &mut catalog, StorageType::Row);

        let mut txn = bridge.begin_write().unwrap();
        let result = execute_copy(
            &mut txn,
            &catalog,
            "users",
            file_path.to_str().unwrap(),
            FileFormat::Parquet,
            CopyOptions::default(),
            &CopySecurityConfig::default(),
        )
        .unwrap();
        txn.commit().unwrap();
        assert_eq!(result, ExecutionResult::RowsAffected(2));

        // Verify the rows landed in table storage.
        let table = catalog.get_table("users").unwrap().clone();
        let mut read_txn = bridge.begin_read().unwrap();
        let mut storage = read_txn.table_storage(&table);
        let rows: Vec<_> = storage.scan().unwrap().map(|r| r.unwrap().1).collect();
        assert_eq!(rows.len(), 2);
        assert!(rows.contains(&vec![SqlValue::Integer(1), SqlValue::Text("user0".into())]));
    }

    #[test]
    fn parquet_reader_streams_batches() {
        // 1500 rows with a 512-row batch limit must produce several batches.
        let dir = std::env::temp_dir();
        let file_path = dir.join("alopex_copy_stream.parquet");
        write_parquet_sample(&file_path, 1500);

        let (bridge, mut catalog) = bridge();
        create_table(&bridge, &mut catalog, StorageType::Row);
        let table = catalog.get_table("users").unwrap().clone();

        let mut reader = ParquetReader::open(file_path.to_str().unwrap(), &table, false).unwrap();
        let mut batches = 0;
        let mut total = 0;
        while let Some(batch) = reader.next_batch(512).unwrap() {
            total += batch.len();
            batches += 1;
        }
        // (Message: "expected multiple batches but got {batches}")
        assert!(
            batches >= 2,
            "複数バッチを期待しましたが {batches} バッチでした"
        );
        assert_eq!(total, 1500);
    }

    // Writes `count` rows of (id, name) into a Parquet file, split into
    // multiple Arrow record batches of up to 700 rows each.
    fn write_parquet_sample(path: &Path, count: usize) {
        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("id", ArrowDataType::Int32, false),
            ArrowField::new("name", ArrowDataType::Utf8, false),
        ]));

        let file = File::create(path).unwrap();
        let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();

        let chunk_size = 700;
        let mut start = 0;
        while start < count {
            let end = (start + chunk_size).min(count);
            // ids are 1-based while names are 0-based ("user0" has id 1).
            let ids: Vec<i32> = ((start + 1) as i32..=end as i32).collect();
            let names: Vec<String> = (start..end).map(|i| format!("user{i}")).collect();

            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from(ids)) as Arc<_>,
                    Arc::new(StringArray::from(names)) as Arc<_>,
                ],
            )
            .unwrap();
            writer.write(&batch).unwrap();
            start = end;
        }

        writer.close().unwrap();
    }
}