alopex_sql/executor/bulk/
mod.rs

//! COPY / bulk-load implementation.
//!
//! At this stage, CSV/Parquet files are read in a straightforward fashion and
//! converted to `SqlValue`s according to the table schema. Row tables are
//! ingested through the regular table storage, while columnar tables are
//! written as V2 segments, keeping the structure replaceable by a future
//! columnar engine implementation.
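//!
//! A minimal usage sketch, mirroring the test setup at the bottom of this
//! module (the `MemoryKV` / `MemoryCatalog` / `TxnBridge` wiring shown here is
//! just one possible embedding):
//!
//! ```ignore
//! let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
//! let catalog = MemoryCatalog::new();
//! // ... create the "users" table via execute_create_table ...
//! let mut txn = bridge.begin_write()?;
//! let result = execute_copy(
//!     &mut txn,
//!     &catalog,
//!     "users",
//!     "/path/to/users.csv",
//!     FileFormat::Csv,
//!     CopyOptions { header: true },
//!     &CopySecurityConfig::default(),
//! )?;
//! txn.commit()?;
//! // result is ExecutionResult::RowsAffected(n), n being the number of rows loaded.
//! ```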

use std::fs;
use std::path::{Path, PathBuf};

use alopex_core::columnar::encoding::{Column, LogicalType};
use alopex_core::columnar::encoding_v2::Bitmap;
use alopex_core::columnar::kvs_bridge::key_layout;
use alopex_core::columnar::segment_v2::{
    ColumnSchema, ColumnSegmentV2, RecordBatch, Schema, SegmentConfigV2, SegmentWriterV2,
};
use alopex_core::kv::{KVStore, KVTransaction};
use alopex_core::storage::compression::CompressionV2;
use alopex_core::storage::format::bincode_config;
use bincode::config::Options;

use crate::ast::ddl::IndexMethod;
use crate::catalog::{
    Catalog, ColumnMetadata, Compression, IndexMetadata, RowIdMode, TableMetadata,
};
use crate::columnar::statistics::compute_row_group_statistics;
use crate::executor::hnsw_bridge::HnswBridge;
use crate::executor::{ExecutionResult, ExecutorError, Result};
use crate::planner::types::ResolvedType;
use crate::storage::{SqlTransaction, SqlValue, StorageError};

mod csv;
mod parquet;

pub use csv::CsvReader;
pub use parquet::ParquetReader;
/// Input file format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileFormat {
    Csv,
    Parquet,
}

/// COPY options.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CopyOptions {
    /// Whether the CSV input has a header row.
    pub header: bool,
}

/// COPY security configuration.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct CopySecurityConfig {
    /// Allowed base directories (`None` means unrestricted).
    pub allowed_base_dirs: Option<Vec<PathBuf>>,
    /// Whether symbolic links are allowed.
    pub allow_symlinks: bool,
}

/// A field in the input schema.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CopyField {
    pub name: Option<String>,
    pub data_type: Option<ResolvedType>,
}

/// Input schema reported by a reader.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CopySchema {
    pub fields: Vec<CopyField>,
}

impl CopySchema {
    pub fn from_table(table: &TableMetadata) -> Self {
        let fields = table
            .columns
            .iter()
            .map(|c| CopyField {
                name: Some(c.name.clone()),
                data_type: Some(c.data_type.clone()),
            })
            .collect();
        Self { fields }
    }
}

/// Streaming batch reader over a bulk-load input.
pub trait BulkReader {
    /// Returns the input schema.
    fn schema(&self) -> &CopySchema;
    /// Returns a batch of at most `max_rows` rows, or `None` at end of input.
    fn next_batch(&mut self, max_rows: usize) -> Result<Option<Vec<Vec<SqlValue>>>>;
}
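
// A minimal in-memory implementation sketch of the trait contract (illustrative
// only; the readers actually used are `CsvReader` and `ParquetReader` below):
//
//     struct VecReader {
//         schema: CopySchema,
//         rows: Vec<Vec<SqlValue>>,
//     }
//
//     impl BulkReader for VecReader {
//         fn schema(&self) -> &CopySchema {
//             &self.schema
//         }
//         fn next_batch(&mut self, max_rows: usize) -> Result<Option<Vec<Vec<SqlValue>>>> {
//             if self.rows.is_empty() {
//                 return Ok(None);
//             }
//             let take = max_rows.min(self.rows.len());
//             Ok(Some(self.rows.drain(..take).collect()))
//         }
//     }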

/// Executes a COPY statement, loading `file_path` into `table_name`.
pub fn execute_copy<S: KVStore, C: Catalog>(
    txn: &mut SqlTransaction<'_, S>,
    catalog: &C,
    table_name: &str,
    file_path: &str,
    format: FileFormat,
    options: CopyOptions,
    config: &CopySecurityConfig,
) -> Result<ExecutionResult> {
    let table_meta = catalog
        .get_table(table_name)
        .cloned()
        .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;

    validate_file_path(file_path, config)?;

    // Existence is also enforced by validate_file_path; re-check just before
    // opening the reader.
    if !Path::new(file_path).exists() {
        return Err(ExecutorError::FileNotFound(file_path.to_string()));
    }

    let reader: Box<dyn BulkReader> = match format {
        FileFormat::Parquet => {
            Box::new(ParquetReader::open(file_path, &table_meta, options.header)?)
        }
        FileFormat::Csv => Box::new(CsvReader::open(file_path, &table_meta, options.header)?),
    };

    validate_schema(reader.schema(), &table_meta)?;

    let rows_loaded = match table_meta.storage_options.storage_type {
        crate::catalog::StorageType::Columnar => {
            bulk_load_columnar(txn, catalog, &table_meta, reader)?
        }
        crate::catalog::StorageType::Row => bulk_load_row(txn, catalog, &table_meta, reader)?,
    };

    Ok(ExecutionResult::RowsAffected(rows_loaded))
}

/// Validates `file_path` against the COPY security configuration.
pub fn validate_file_path(file_path: &str, config: &CopySecurityConfig) -> Result<()> {
    let path = Path::new(file_path);

    // Check existence first so that FileNotFound takes precedence, as designed.
    if !path.exists() {
        return Err(ExecutorError::FileNotFound(file_path.into()));
    }

    let canonical = path
        .canonicalize()
        .map_err(|e| ExecutorError::PathValidationFailed {
            path: file_path.into(),
            reason: format!("failed to canonicalize: {e}"),
        })?;

    if let Some(base_dirs) = &config.allowed_base_dirs {
        let allowed = base_dirs.iter().any(|base| canonical.starts_with(base));
        if !allowed {
            return Err(ExecutorError::PathValidationFailed {
                path: file_path.into(),
                reason: format!("path not in allowed directories: {base_dirs:?}"),
            });
        }
    }

    if !config.allow_symlinks && path.is_symlink() {
        return Err(ExecutorError::PathValidationFailed {
            path: file_path.into(),
            reason: "symbolic links not allowed".into(),
        });
    }

    let metadata = fs::metadata(&canonical).map_err(|e| ExecutorError::PathValidationFailed {
        path: file_path.into(),
        reason: format!("cannot access file: {e}"),
    })?;

    if !metadata.is_file() {
        return Err(ExecutorError::PathValidationFailed {
            path: file_path.into(),
            reason: "path is not a regular file".into(),
        });
    }

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Reject files with no read bit set for user, group, or other.
        if metadata.permissions().mode() & 0o444 == 0 {
            return Err(ExecutorError::PathValidationFailed {
                path: file_path.into(),
                reason: "file is not readable".into(),
            });
        }
    }

    Ok(())
}
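
// For example, restricting COPY to a single import directory while refusing
// symlinks (illustrative path):
//
//     let config = CopySecurityConfig {
//         allowed_base_dirs: Some(vec![PathBuf::from("/var/lib/alopex/import")]),
//         allow_symlinks: false,
//     };
//
// Any file whose canonicalized path falls outside the allow-list is rejected
// with `ExecutorError::PathValidationFailed`.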

/// Validates that the input schema matches the table schema in column count,
/// names (when present), and types (when present).
pub fn validate_schema(schema: &CopySchema, table_meta: &TableMetadata) -> Result<()> {
    if schema.fields.len() != table_meta.columns.len() {
        return Err(ExecutorError::SchemaMismatch {
            expected: table_meta.columns.len(),
            actual: schema.fields.len(),
            reason: "column count mismatch".into(),
        });
    }

    for (idx, (field, col)) in schema
        .fields
        .iter()
        .zip(table_meta.columns.iter())
        .enumerate()
    {
        if let Some(dt) = &field.data_type
            && !is_type_compatible(dt, &col.data_type)
        {
            return Err(ExecutorError::SchemaMismatch {
                expected: table_meta.columns.len(),
                actual: schema.fields.len(),
                reason: format!(
                    "type mismatch for column '{}': expected {:?}, got {:?}",
                    col.name, col.data_type, dt
                ),
            });
        }
        if let Some(name) = &field.name
            && name != &col.name
        {
            return Err(ExecutorError::SchemaMismatch {
                expected: table_meta.columns.len(),
                actual: schema.fields.len(),
                reason: format!(
                    "column name mismatch at position {}: expected '{}', got '{}'",
                    idx, col.name, name
                ),
            });
        }
    }

    Ok(())
}
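
// For example, against the two-column `users` table used in the tests below
// (`id: Integer`, `name: Text`), a reader schema of (Integer, Text) with names
// `id`/`name` passes, while the same types with a first column named `users`
// fails with a column-name mismatch.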

/// Loads rows into row-oriented storage and populates the table's indexes.
fn bulk_load_row<S: KVStore, C: Catalog>(
    txn: &mut SqlTransaction<'_, S>,
    catalog: &C,
    table: &TableMetadata,
    mut reader: Box<dyn BulkReader>,
) -> Result<u64> {
    let indexes: Vec<IndexMetadata> = catalog
        .get_indexes_for_table(&table.name)
        .into_iter()
        .cloned()
        .collect();
    let (hnsw_indexes, btree_indexes): (Vec<_>, Vec<_>) = indexes
        .into_iter()
        .partition(|idx| matches!(idx.method, Some(IndexMethod::Hnsw)));

    // Stage inserted rows so the indexes can be populated after the
    // table-storage borrow is released.
    let mut staged: Vec<(u64, Vec<SqlValue>)> = Vec::new();
    {
        let mut storage = txn.table_storage(table);
        while let Some(batch) = reader.next_batch(1024)? {
            for row in batch {
                if row.len() != table.column_count() {
                    return Err(ExecutorError::BulkLoad(format!(
                        "row has {} columns, expected {}",
                        row.len(),
                        table.column_count()
                    )));
                }
                let row_id = storage
                    .next_row_id()
                    .map_err(|e| map_storage_error(table, e))?;
                storage
                    .insert(row_id, &row)
                    .map_err(|e| map_storage_error(table, e))?;
                staged.push((row_id, row));
            }
        }
    }

    populate_indexes(txn, &btree_indexes, &staged)?;
    populate_hnsw_indexes(txn, table, &hnsw_indexes, &staged)?;

    Ok(staged.len() as u64)
}

/// Loads rows into columnar storage by building a V2 segment one row group at
/// a time, then persisting it together with per-row-group statistics.
fn bulk_load_columnar<S: KVStore, C: Catalog>(
    txn: &mut SqlTransaction<'_, S>,
    catalog: &C,
    table: &TableMetadata,
    mut reader: Box<dyn BulkReader>,
) -> Result<u64> {
    let _ = catalog; // reserved for future index integration

    let row_group_size = table.storage_options.row_group_size.max(1) as usize;
    let compression = map_compression(table.storage_options.compression);
    let mut writer = SegmentWriterV2::new(SegmentConfigV2 {
        row_group_size: row_group_size as u64,
        compression,
        ..Default::default()
    });
    let schema = build_segment_schema(table)?;

    let mut row_group_stats = Vec::new();
    let mut total_rows = 0u64;
    while let Some(batch) = reader.next_batch(row_group_size)? {
        if batch.is_empty() {
            continue;
        }
        let stats = compute_row_group_statistics(&batch);
        let record_batch = build_record_batch(&schema, table, &batch)?;
        writer
            .write_batch(record_batch)
            .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
        row_group_stats.push(stats);
        total_rows += batch.len() as u64;
    }

    if total_rows == 0 {
        return Ok(0);
    }

    let segment = writer
        .finish()
        .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
    let _segment_id = persist_segment(txn, table, segment, &row_group_stats)?;

    Ok(total_rows)
}

fn map_compression(compression: Compression) -> CompressionV2 {
    let desired = match compression {
        Compression::None => CompressionV2::None,
        Compression::Lz4 => CompressionV2::Lz4,
        Compression::Zstd => CompressionV2::Zstd { level: 3 },
    };

    // Fall back to no compression when the requested codec is unavailable in
    // this build.
    if desired.is_available() {
        desired
    } else {
        CompressionV2::None
    }
}

fn build_segment_schema(table: &TableMetadata) -> Result<Schema> {
    let mut columns = Vec::with_capacity(table.column_count());
    for col in &table.columns {
        let logical_type = logical_type_for(&col.data_type)?;
        columns.push(ColumnSchema {
            name: col.name.clone(),
            logical_type,
            nullable: !col.not_null,
            fixed_len: fixed_len_for(&col.data_type),
        });
    }
    Ok(Schema { columns })
}

fn logical_type_for(ty: &ResolvedType) -> Result<LogicalType> {
    match ty {
        ResolvedType::Integer | ResolvedType::BigInt | ResolvedType::Timestamp => {
            Ok(LogicalType::Int64)
        }
        ResolvedType::Vector { dimension, .. } => {
            // Guard both the u32 multiplication and the narrowing cast to u16.
            let fixed_len = dimension
                .checked_mul(4)
                .and_then(|len| u16::try_from(len).ok())
                .ok_or_else(|| {
                    ExecutorError::Columnar(
                        "vector dimension overflow when computing fixed len".into(),
                    )
                })?;
            Ok(LogicalType::Fixed(fixed_len))
        }
        ResolvedType::Float => Ok(LogicalType::Float32),
        ResolvedType::Double => Ok(LogicalType::Float64),
        ResolvedType::Boolean => Ok(LogicalType::Bool),
        ResolvedType::Text | ResolvedType::Blob => Ok(LogicalType::Binary),
        ResolvedType::Null => Err(ExecutorError::Columnar(
            "NULL column type is not supported for columnar storage".into(),
        )),
    }
}

fn fixed_len_for(ty: &ResolvedType) -> Option<u32> {
    match ty {
        ResolvedType::Vector { dimension, .. } => Some(dimension.saturating_mul(4)),
        _ => None,
    }
}

fn build_record_batch(
    schema: &Schema,
    table: &TableMetadata,
    rows: &[Vec<SqlValue>],
) -> Result<RecordBatch> {
    for row in rows {
        if row.len() != table.column_count() {
            return Err(ExecutorError::BulkLoad(format!(
                "row has {} columns, expected {}",
                row.len(),
                table.column_count()
            )));
        }
    }

    let mut columns = Vec::with_capacity(table.column_count());
    let mut bitmaps = Vec::with_capacity(table.column_count());
    for (idx, col_meta) in table.columns.iter().enumerate() {
        let (col, bitmap) = build_column(idx, col_meta, rows)?;
        columns.push(col);
        bitmaps.push(bitmap);
    }

    Ok(RecordBatch::new(schema.clone(), columns, bitmaps))
}

/// Returns `None` when every value is valid, so fully-valid columns skip the
/// bitmap allocation.
fn validity_bitmap(validity: &[bool]) -> Option<Bitmap> {
    if validity.iter().all(|v| *v) {
        None
    } else {
        Some(Bitmap::from_bools(validity))
    }
}

/// Builds one column (plus optional validity bitmap) from row-major values.
/// NULLs are recorded in the validity bitmap while a placeholder value keeps
/// the column vector dense.
fn build_column(
    col_idx: usize,
    col_meta: &ColumnMetadata,
    rows: &[Vec<SqlValue>],
) -> Result<(Column, Option<Bitmap>)> {
    match &col_meta.data_type {
        ResolvedType::Integer => {
            let mut validity = Vec::with_capacity(rows.len());
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                match row
                    .get(col_idx)
                    .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
                {
                    SqlValue::Null => {
                        validity.push(false);
                        values.push(0);
                    }
                    SqlValue::Integer(v) => {
                        validity.push(true);
                        values.push(*v as i64);
                    }
                    SqlValue::BigInt(v) => {
                        validity.push(true);
                        values.push(*v);
                    }
                    other => {
                        return Err(ExecutorError::BulkLoad(format!(
                            "type mismatch for column '{}': expected Integer, got {}",
                            col_meta.name,
                            other.type_name()
                        )));
                    }
                }
            }
            Ok((Column::Int64(values), validity_bitmap(&validity)))
        }
        ResolvedType::BigInt | ResolvedType::Timestamp => {
            let mut validity = Vec::with_capacity(rows.len());
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                match row
                    .get(col_idx)
                    .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
                {
                    SqlValue::Null => {
                        validity.push(false);
                        values.push(0);
                    }
                    SqlValue::BigInt(v) | SqlValue::Timestamp(v) => {
                        validity.push(true);
                        values.push(*v);
                    }
                    SqlValue::Integer(v) => {
                        validity.push(true);
                        values.push(*v as i64);
                    }
                    other => {
                        return Err(ExecutorError::BulkLoad(format!(
                            "type mismatch for column '{}': expected BigInt/Timestamp, got {}",
                            col_meta.name,
                            other.type_name()
                        )));
                    }
                }
            }
            Ok((Column::Int64(values), validity_bitmap(&validity)))
        }
        ResolvedType::Float => {
            let mut validity = Vec::with_capacity(rows.len());
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                match row
                    .get(col_idx)
                    .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
                {
                    SqlValue::Null => {
                        validity.push(false);
                        values.push(0.0);
                    }
                    SqlValue::Float(v) => {
                        validity.push(true);
                        values.push(*v);
                    }
                    other => {
                        return Err(ExecutorError::BulkLoad(format!(
                            "type mismatch for column '{}': expected Float, got {}",
                            col_meta.name,
                            other.type_name()
                        )));
                    }
                }
            }
            Ok((Column::Float32(values), validity_bitmap(&validity)))
        }
        ResolvedType::Double => {
            let mut validity = Vec::with_capacity(rows.len());
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                match row
                    .get(col_idx)
                    .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
                {
                    SqlValue::Null => {
                        validity.push(false);
                        values.push(0.0);
                    }
                    SqlValue::Double(v) => {
                        validity.push(true);
                        values.push(*v);
                    }
                    other => {
                        return Err(ExecutorError::BulkLoad(format!(
                            "type mismatch for column '{}': expected Double, got {}",
                            col_meta.name,
                            other.type_name()
                        )));
                    }
                }
            }
            Ok((Column::Float64(values), validity_bitmap(&validity)))
        }
        ResolvedType::Boolean => {
            let mut validity = Vec::with_capacity(rows.len());
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                match row
                    .get(col_idx)
                    .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
                {
                    SqlValue::Null => {
                        validity.push(false);
                        values.push(false);
                    }
                    SqlValue::Boolean(v) => {
                        validity.push(true);
                        values.push(*v);
                    }
                    other => {
                        return Err(ExecutorError::BulkLoad(format!(
                            "type mismatch for column '{}': expected Boolean, got {}",
                            col_meta.name,
                            other.type_name()
                        )));
                    }
                }
            }
            Ok((Column::Bool(values), validity_bitmap(&validity)))
        }
        ResolvedType::Text => {
            let mut validity = Vec::with_capacity(rows.len());
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                match row
                    .get(col_idx)
                    .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
                {
                    SqlValue::Null => {
                        validity.push(false);
                        values.push(Vec::new());
                    }
                    SqlValue::Text(v) => {
                        validity.push(true);
                        values.push(v.as_bytes().to_vec());
                    }
                    other => {
                        return Err(ExecutorError::BulkLoad(format!(
                            "type mismatch for column '{}': expected Text, got {}",
                            col_meta.name,
                            other.type_name()
                        )));
                    }
                }
            }
            Ok((Column::Binary(values), validity_bitmap(&validity)))
        }
        ResolvedType::Blob => {
            let mut validity = Vec::with_capacity(rows.len());
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                match row
                    .get(col_idx)
                    .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
                {
                    SqlValue::Null => {
                        validity.push(false);
                        values.push(Vec::new());
                    }
                    SqlValue::Blob(v) => {
                        validity.push(true);
                        values.push(v.clone());
                    }
                    other => {
                        return Err(ExecutorError::BulkLoad(format!(
                            "type mismatch for column '{}': expected Blob, got {}",
                            col_meta.name,
                            other.type_name()
                        )));
                    }
                }
            }
            Ok((Column::Binary(values), validity_bitmap(&validity)))
        }
        ResolvedType::Vector { dimension, .. } => {
            let fixed_len = dimension.saturating_mul(4) as usize;
            let mut validity = Vec::with_capacity(rows.len());
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                match row
                    .get(col_idx)
                    .ok_or_else(|| ExecutorError::BulkLoad("row too short".into()))?
                {
                    SqlValue::Null => {
                        validity.push(false);
                        values.push(vec![0u8; fixed_len]);
                    }
                    SqlValue::Vector(v) => {
                        if v.len() as u32 != *dimension {
                            return Err(ExecutorError::BulkLoad(format!(
                                "vector dimension mismatch for column '{}': expected {}, got {}",
                                col_meta.name,
                                dimension,
                                v.len()
                            )));
                        }
                        validity.push(true);
                        let mut buf = Vec::with_capacity(fixed_len);
                        for f in v {
                            buf.extend_from_slice(&f.to_le_bytes());
                        }
                        values.push(buf);
                    }
                    other => {
                        return Err(ExecutorError::BulkLoad(format!(
                            "type mismatch for column '{}': expected Vector, got {}",
                            col_meta.name,
                            other.type_name()
                        )));
                    }
                }
            }
            Ok((
                Column::Fixed {
                    len: fixed_len,
                    values,
                },
                validity_bitmap(&validity),
            ))
        }
        ResolvedType::Null => Err(ExecutorError::Columnar(
            "NULL column type is not supported for columnar storage".into(),
        )),
    }
}
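
// Encoding example for the fixed-width vector arm above: a dimension-2 vector
// [1.0, 2.0] becomes the 8-byte little-endian concatenation of its two f32
// values, matching the `LogicalType::Fixed(dimension * 4)` layout declared in
// `logical_type_for`.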

/// Persists a finished segment: the segment payload, its metadata, the
/// per-row-group statistics, and the per-table segment index list.
fn persist_segment<S: KVStore>(
    txn: &mut SqlTransaction<'_, S>,
    table: &TableMetadata,
    mut segment: ColumnSegmentV2,
    row_group_stats: &[crate::columnar::statistics::RowGroupStatistics],
) -> Result<u64> {
    if row_group_stats.len() != segment.meta.row_groups.len() {
        return Err(ExecutorError::Columnar(
            "row group statistics length mismatch".into(),
        ));
    }

    // Allocate the next segment id from the table's segment index list.
    let table_id = table.table_id;
    let index_key = key_layout::segment_index_key(table_id);
    let existing = txn.inner_mut().get(&index_key)?;
    let mut index: Vec<u64> = if let Some(bytes) = existing {
        bincode_config()
            .deserialize(&bytes)
            .map_err(|e| ExecutorError::Columnar(e.to_string()))?
    } else {
        Vec::new()
    };
    let segment_id = index
        .last()
        .copied()
        .map(|id| id.saturating_add(1))
        .unwrap_or(0);

    let mut row_group_stats = row_group_stats.to_vec();
    if table.storage_options.row_id_mode == RowIdMode::Direct {
        // Direct mode materializes encoded row ids and records each row
        // group's row-id range in the statistics.
        let total_rows = usize::try_from(segment.meta.num_rows)
            .map_err(|_| ExecutorError::Columnar("segment row count exceeds usize::MAX".into()))?;
        segment.row_ids = (0..total_rows)
            .map(|idx| {
                alopex_core::columnar::segment_v2::encode_row_id(segment_id, idx as u64)
                    .map_err(|e| ExecutorError::Columnar(e.to_string()))
            })
            .collect::<Result<Vec<u64>>>()?;

        for (idx, meta) in segment.meta.row_groups.iter().enumerate() {
            let start = usize::try_from(meta.row_start)
                .map_err(|_| ExecutorError::Columnar("row_start exceeds usize::MAX".into()))?;
            let count = usize::try_from(meta.row_count)
                .map_err(|_| ExecutorError::Columnar("row_count exceeds usize::MAX".into()))?;
            if count == 0 {
                continue;
            }
            let end = start
                .checked_add(count)
                .ok_or_else(|| ExecutorError::Columnar("row_id range overflow".into()))?;
            if end > segment.row_ids.len() {
                return Err(ExecutorError::Columnar(
                    "row_ids length is smaller than row_group range".into(),
                ));
            }
            row_group_stats[idx].row_id_min = segment.row_ids.get(start).copied();
            row_group_stats[idx].row_id_max = segment.row_ids.get(end - 1).copied();
        }
    } else {
        segment.row_ids.clear();
    }

    let segment_bytes = bincode_config()
        .serialize(&segment)
        .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
    txn.inner_mut().put(
        key_layout::column_segment_key(table_id, segment_id, 0),
        segment_bytes,
    )?;

    let meta_bytes = bincode_config()
        .serialize(&segment.meta)
        .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
    txn.inner_mut()
        .put(key_layout::statistics_key(table_id, segment_id), meta_bytes)?;

    let rg_bytes = bincode_config()
        .serialize(&row_group_stats)
        .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
    txn.inner_mut().put(
        key_layout::row_group_stats_key(table_id, segment_id),
        rg_bytes,
    )?;

    index.push(segment_id);
    let index_bytes = bincode_config()
        .serialize(&index)
        .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
    txn.inner_mut().put(index_key, index_bytes)?;
    Ok(segment_id)
}

/// Parses raw text into a `SqlValue` for the given table column type. The
/// literal `null` (case-insensitive) maps to `SqlValue::Null` for any type.
pub(crate) fn parse_value(raw: &str, ty: &ResolvedType) -> Result<SqlValue> {
    let trimmed = raw.trim();
    if trimmed.eq_ignore_ascii_case("null") {
        return Ok(SqlValue::Null);
    }

    match ty {
        ResolvedType::Integer => trimmed
            .parse::<i32>()
            .map(SqlValue::Integer)
            .map_err(|e| parse_error(trimmed, ty, e)),
        ResolvedType::BigInt => trimmed
            .parse::<i64>()
            .map(SqlValue::BigInt)
            .map_err(|e| parse_error(trimmed, ty, e)),
        ResolvedType::Float => trimmed
            .parse::<f32>()
            .map(SqlValue::Float)
            .map_err(|e| parse_error(trimmed, ty, e)),
        ResolvedType::Double => trimmed
            .parse::<f64>()
            .map(SqlValue::Double)
            .map_err(|e| parse_error(trimmed, ty, e)),
        ResolvedType::Boolean => {
            // Accept `true`/`false` as well as `1`/`0`.
            let parsed = trimmed
                .parse::<bool>()
                .or(match trimmed {
                    "1" => Ok(true),
                    "0" => Ok(false),
                    _ => Err(()),
                })
                .map_err(|_| {
                    ExecutorError::BulkLoad(format!(
                        "failed to parse value '{trimmed}' as {}: invalid boolean",
                        ty.type_name()
                    ))
                })?;
            Ok(SqlValue::Boolean(parsed))
        }
        ResolvedType::Timestamp => trimmed
            .parse::<i64>()
            .map(SqlValue::Timestamp)
            .map_err(|e| parse_error(trimmed, ty, e)),
        ResolvedType::Text => Ok(SqlValue::Text(trimmed.to_string())),
        ResolvedType::Blob => Ok(SqlValue::Blob(trimmed.as_bytes().to_vec())),
        ResolvedType::Vector { dimension, .. } => {
            let body = trimmed.trim_matches(['[', ']']);
            if body.is_empty() {
                return Err(ExecutorError::BulkLoad(
                    "vector literal cannot be empty".into(),
                ));
            }
            let mut values = Vec::new();
            for part in body.split(',') {
                let v = part
                    .trim()
                    .parse::<f32>()
                    .map_err(|e| ExecutorError::BulkLoad(format!("invalid vector value: {e}")))?;
                values.push(v);
            }
            if values.len() as u32 != *dimension {
                return Err(ExecutorError::BulkLoad(format!(
                    "vector dimension mismatch: expected {}, got {}",
                    dimension,
                    values.len()
                )));
            }
            Ok(SqlValue::Vector(values))
        }
        ResolvedType::Null => Ok(SqlValue::Null),
    }
}
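
// Examples of `parse_value` behavior, as implemented above:
//   parse_value("42", &ResolvedType::Integer)        -> Ok(SqlValue::Integer(42))
//   parse_value("NULL", &ResolvedType::Text)         -> Ok(SqlValue::Null)
//   parse_value("[1.0, 2.0]", &dimension-2 vector)   -> Ok(SqlValue::Vector(vec![1.0, 2.0]))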

fn parse_error(trimmed: &str, ty: &ResolvedType, err: impl std::fmt::Display) -> ExecutorError {
    ExecutorError::BulkLoad(format!(
        "failed to parse value '{trimmed}' as {}: {err}",
        ty.type_name()
    ))
}

fn is_type_compatible(file_type: &ResolvedType, table_type: &ResolvedType) -> bool {
    match (file_type, table_type) {
        // Vectors must match exactly in dimension and metric; other types may
        // also rely on implicit casts.
        (
            ResolvedType::Vector {
                dimension: f_dim,
                metric: f_metric,
            },
            ResolvedType::Vector {
                dimension: t_dim,
                metric: t_metric,
            },
        ) => f_dim == t_dim && f_metric == t_metric,
        (ft, tt) => ft == tt || ft.can_cast_to(tt),
    }
}

fn map_storage_error(table: &TableMetadata, err: StorageError) -> ExecutorError {
    match err {
        StorageError::NullConstraintViolation { column } => {
            ExecutorError::ConstraintViolation(crate::executor::ConstraintViolation::NotNull {
                column,
            })
        }
        StorageError::PrimaryKeyViolation { .. } => {
            ExecutorError::ConstraintViolation(crate::executor::ConstraintViolation::PrimaryKey {
                columns: table.primary_key.clone().unwrap_or_default(),
                value: None,
            })
        }
        StorageError::TransactionConflict => ExecutorError::TransactionConflict,
        other => ExecutorError::Storage(other),
    }
}

fn map_index_error(index: &IndexMetadata, err: StorageError) -> ExecutorError {
    match err {
        StorageError::UniqueViolation { .. } => {
            // Indexes backing a primary key use the `__pk_` name prefix.
            if index.name.starts_with("__pk_") {
                ExecutorError::ConstraintViolation(
                    crate::executor::ConstraintViolation::PrimaryKey {
                        columns: index.columns.clone(),
                        value: None,
                    },
                )
            } else {
                ExecutorError::ConstraintViolation(crate::executor::ConstraintViolation::Unique {
                    index_name: index.name.clone(),
                    columns: index.columns.clone(),
                    value: None,
                })
            }
        }
        StorageError::NullConstraintViolation { column } => {
            ExecutorError::ConstraintViolation(crate::executor::ConstraintViolation::NotNull {
                column,
            })
        }
        StorageError::TransactionConflict => ExecutorError::TransactionConflict,
        other => ExecutorError::Storage(other),
    }
}

fn populate_indexes<S: KVStore>(
    txn: &mut SqlTransaction<'_, S>,
    indexes: &[IndexMetadata],
    rows: &[(u64, Vec<SqlValue>)],
) -> Result<()> {
    for index in indexes {
        let mut storage =
            txn.index_storage(index.index_id, index.unique, index.column_indices.clone());
        for (row_id, row) in rows {
            if should_skip_unique_index_for_null(index, row) {
                continue;
            }
            storage
                .insert(row, *row_id)
                .map_err(|e| map_index_error(index, e))?;
        }
    }
    Ok(())
}

fn populate_hnsw_indexes<S: KVStore>(
    txn: &mut SqlTransaction<'_, S>,
    table: &TableMetadata,
    indexes: &[IndexMetadata],
    rows: &[(u64, Vec<SqlValue>)],
) -> Result<()> {
    for index in indexes {
        for (row_id, row) in rows {
            HnswBridge::on_insert(txn, table, index, *row_id, row)?;
        }
    }
    Ok(())
}

/// Unique index entries are skipped when any key column is NULL, matching the
/// usual SQL semantics where NULLs do not participate in uniqueness.
fn should_skip_unique_index_for_null(index: &IndexMetadata, row: &[SqlValue]) -> bool {
    index.unique
        && index
            .column_indices
            .iter()
            .any(|&idx| row.get(idx).is_none_or(SqlValue::is_null))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, StorageType};
    use crate::executor::ddl::create_table::execute_create_table;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use ::parquet::arrow::ArrowWriter;
    use alopex_core::kv::memory::MemoryKV;
    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
    use std::fs::File;
    use std::io::Write;
    use std::path::Path;
    use std::sync::Arc;

    fn bridge() -> (TxnBridge<MemoryKV>, MemoryCatalog) {
        (
            TxnBridge::new(Arc::new(MemoryKV::new())),
            MemoryCatalog::new(),
        )
    }

    fn create_table(
        bridge: &TxnBridge<MemoryKV>,
        catalog: &mut MemoryCatalog,
        storage: StorageType,
    ) {
        let mut table = TableMetadata::new(
            "users",
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer).with_primary_key(true),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        )
        .with_primary_key(vec!["id".into()]);
        table.storage_options.storage_type = storage;

        let mut txn = bridge.begin_write().unwrap();
        execute_create_table(&mut txn, catalog, table, vec![], false).unwrap();
        txn.commit().unwrap();
    }

    #[test]
    fn validate_file_path_rejects_symlink_and_directory() {
        let dir = std::env::temp_dir();
        let dir_path = dir.join("alopex_copy_dir");
        std::fs::create_dir_all(&dir_path).unwrap();

        let config = CopySecurityConfig {
            allowed_base_dirs: Some(vec![dir.clone()]),
            allow_symlinks: false,
        };

        // Directory is rejected.
        let err = validate_file_path(dir_path.to_str().unwrap(), &config).unwrap_err();
        assert!(matches!(err, ExecutorError::PathValidationFailed { .. }));

        // Symlink is rejected on unix.
        #[cfg(unix)]
        {
            use std::os::unix::fs::symlink;
            let file_path = dir.join("alopex_copy_file.txt");
            fs::write(&file_path, "1,alice\n").unwrap();
            let link = dir.join("alopex_copy_link.txt");
            let _ = fs::remove_file(&link);
            symlink(&file_path, &link).unwrap();
            let err = validate_file_path(link.to_str().unwrap(), &config).unwrap_err();
            assert!(matches!(err, ExecutorError::PathValidationFailed { .. }));
        }
    }

    #[test]
    fn validate_schema_checks_names_and_types() {
        let (bridge, mut catalog) = bridge();
        create_table(&bridge, &mut catalog, StorageType::Row);
        let table = catalog.get_table("users").unwrap();

        // The first field is deliberately misnamed ("users" instead of "id")
        // to trigger a name mismatch.
        let schema = CopySchema {
            fields: vec![
                CopyField {
                    name: Some("users".into()),
                    data_type: Some(ResolvedType::Integer),
                },
                CopyField {
                    name: Some("name".into()),
                    data_type: Some(ResolvedType::Text),
                },
            ],
        };

        let err = validate_schema(&schema, table).unwrap_err();
        assert!(matches!(err, ExecutorError::SchemaMismatch { .. }));
    }

    #[test]
    fn execute_copy_csv_inserts_rows() {
        let dir = std::env::temp_dir();
        let file_path = dir.join("alopex_copy_test.csv");
        let mut file = File::create(&file_path).unwrap();
        writeln!(file, "id,name").unwrap();
        writeln!(file, "1,alice").unwrap();
        writeln!(file, "2,bob").unwrap();

        let (bridge, mut catalog) = bridge();
        create_table(&bridge, &mut catalog, StorageType::Row);

        let mut txn = bridge.begin_write().unwrap();
        let result = execute_copy(
            &mut txn,
            &catalog,
            "users",
            file_path.to_str().unwrap(),
            FileFormat::Csv,
            CopyOptions { header: true },
            &CopySecurityConfig::default(),
        )
        .unwrap();
        txn.commit().unwrap();
        assert_eq!(result, ExecutionResult::RowsAffected(2));

        // Verify rows inserted.
        let table = catalog.get_table("users").unwrap().clone();
        let mut read_txn = bridge.begin_read().unwrap();
        let mut storage = read_txn.table_storage(&table);
        let rows: Vec<_> = storage.scan().unwrap().map(|r| r.unwrap().1).collect();
        assert_eq!(rows.len(), 2);
        assert!(rows.contains(&vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]));
    }

    #[test]
    fn execute_copy_parquet_reads_schema_and_rows() {
        let dir = std::env::temp_dir();
        let file_path = dir.join("alopex_copy_test.parquet");
        write_parquet_sample(&file_path, 2);

        let (bridge, mut catalog) = bridge();
        create_table(&bridge, &mut catalog, StorageType::Row);

        let mut txn = bridge.begin_write().unwrap();
        let result = execute_copy(
            &mut txn,
            &catalog,
            "users",
            file_path.to_str().unwrap(),
            FileFormat::Parquet,
            CopyOptions::default(),
            &CopySecurityConfig::default(),
        )
        .unwrap();
        txn.commit().unwrap();
        assert_eq!(result, ExecutionResult::RowsAffected(2));

        // The schema comes from the Parquet file itself, so validate_schema
        // rejects it if it does not match the table.
        let table = catalog.get_table("users").unwrap().clone();
        let mut read_txn = bridge.begin_read().unwrap();
        let mut storage = read_txn.table_storage(&table);
        let rows: Vec<_> = storage.scan().unwrap().map(|r| r.unwrap().1).collect();
        assert_eq!(rows.len(), 2);
        assert!(rows.contains(&vec![SqlValue::Integer(1), SqlValue::Text("user0".into())]));
    }

    #[test]
    fn parquet_reader_streams_batches() {
        let dir = std::env::temp_dir();
        let file_path = dir.join("alopex_copy_stream.parquet");
        write_parquet_sample(&file_path, 1500);

        let (bridge, mut catalog) = bridge();
        create_table(&bridge, &mut catalog, StorageType::Row);
        let table = catalog.get_table("users").unwrap().clone();

        let mut reader = ParquetReader::open(file_path.to_str().unwrap(), &table, false).unwrap();
        let mut batches = 0;
        let mut total = 0;
        while let Some(batch) = reader.next_batch(512).unwrap() {
            total += batch.len();
            batches += 1;
        }
        assert!(batches >= 2, "expected multiple batches, got {batches}");
        assert_eq!(total, 1500);
    }

    fn write_parquet_sample(path: &Path, count: usize) {
        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("id", ArrowDataType::Int32, false),
            ArrowField::new("name", ArrowDataType::Utf8, false),
        ]));

        let file = File::create(path).unwrap();
        let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();

        // Write in chunks so the sample spans multiple Arrow record batches.
        let chunk_size = 700;
        let mut start = 0;
        while start < count {
            let end = (start + chunk_size).min(count);
            let ids: Vec<i32> = ((start + 1) as i32..=end as i32).collect();
            let names: Vec<String> = (start..end).map(|i| format!("user{i}")).collect();

            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from(ids)) as Arc<_>,
                    Arc::new(StringArray::from(names)) as Arc<_>,
                ],
            )
            .unwrap();
            writer.write(&batch).unwrap();
            start = end;
        }

        writer.close().unwrap();
    }
}