Skip to main content

floe_core/
add_entity.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fs;
3use std::io::{BufRead, BufReader};
4use std::path::{Path, PathBuf};
5
6use polars::prelude::{ArrowDataType, DataType, SerReader};
7use serde::Serialize;
8use serde_json::Value as JsonValue;
9use serde_yaml::Value as YamlValue;
10use tempfile::NamedTempFile;
11use url::Url;
12
13use crate::config;
14use crate::errors::IoError;
15use crate::{ConfigError, FloeResult, ValidateOptions};
16
/// Upper bound on the number of rows/records sampled from the input during schema inference.
///
/// Used as the CSV row and schema-inference limit and as the cap on JSON objects inspected.
17const DEFAULT_SAMPLE_ROWS: usize = 200;
18
/// Inputs accepted by `add_entity_to_config`.
19#[derive(Debug, Clone)]
20pub struct AddEntityOptions {
    /// Existing config file that is read and extended with the new entity.
21    pub config_path: PathBuf,
    /// Destination for the updated config; `config_path` is used when this is `None`.
22    pub output_path: Option<PathBuf>,
    /// Local path or `file://` URI of the data file the schema is inferred from.
23    pub input: String,
    /// Input format; inference accepts `csv`, `json`, and `parquet`.
24    pub format: String,
    /// Explicit entity name; when `None` the name is derived from `input`.
25    pub name: Option<String>,
    /// Optional domain copied onto the generated entity.
26    pub domain: Option<String>,
    /// When `true` nothing is written to disk and the rendered YAML is returned instead.
27    pub dry_run: bool,
28}
29
/// Result returned by `add_entity_to_config`.
30#[derive(Debug, Clone)]
31pub struct AddEntityOutcome {
    /// Name of the entity that was appended (explicit override or derived from the input).
32    pub entity_name: String,
    /// Path the updated config targets (the config path itself when no output path was given).
33    pub output_path: PathBuf,
    /// Number of columns inferred for the entity.
34    pub column_count: usize,
    /// Source format the entity was generated for.
35    pub format: String,
    /// Echo of the `dry_run` option.
36    pub dry_run: bool,
    /// Full updated config text; populated only for dry runs.
37    pub rendered_yaml: Option<String>,
    /// Human-readable remarks produced during inference.
38    pub notes: Vec<String>,
39}
40
/// Intermediate result of schema inference, before it is rendered into YAML.
41#[derive(Debug, Clone)]
42struct InferredEntity {
    /// Entity name (override or derived from the input).
43    name: String,
    /// Source format string as passed by the caller.
44    format: String,
    /// Input location as it will be persisted in the config's `source.path`.
45    input: String,
    /// Optional domain to emit on the entity.
46    domain: Option<String>,
    /// Format-specific source options discovered during inference, if any.
47    source_options: Option<GeneratedSourceOptions>,
    /// Inferred column definitions.
48    columns: Vec<GeneratedColumn>,
    /// Remarks collected while inferring.
49    notes: Vec<String>,
50}
51
/// Serializable shape of one generated entity entry.
///
/// Field order here is the key order of the serialized mapping.
52#[derive(Debug, Clone, Serialize)]
53struct GeneratedEntityYaml {
54    name: String,
    // Omitted from the output entirely when no domain was supplied.
55    #[serde(skip_serializing_if = "Option::is_none")]
56    domain: Option<String>,
57    source: GeneratedSourceYaml,
58    sink: GeneratedSinkYaml,
59    policy: GeneratedPolicyYaml,
60    schema: GeneratedSchemaYaml,
61}
62
/// Serializable `source` section of a generated entity.
63#[derive(Debug, Clone, Serialize)]
64struct GeneratedSourceYaml {
    /// Source format string.
65    format: String,
    /// Input location as persisted in the config.
66    path: String,
    // Omitted when there are no format-specific options to record.
67    #[serde(skip_serializing_if = "Option::is_none")]
68    options: Option<GeneratedSourceOptions>,
69}
70
/// Format-specific source options recorded by inference; unset fields are not serialized.
71#[derive(Debug, Clone, Serialize)]
72struct GeneratedSourceOptions {
    /// Field separator; filled in by CSV inference.
73    #[serde(skip_serializing_if = "Option::is_none")]
74    separator: Option<String>,
    /// JSON layout (`array` or `ndjson`); filled in by JSON inference.
75    #[serde(skip_serializing_if = "Option::is_none")]
76    json_mode: Option<String>,
77}
78
/// Serializable `sink` section holding the accepted and rejected output targets.
79#[derive(Debug, Clone, Serialize)]
80struct GeneratedSinkYaml {
81    accepted: GeneratedSinkTargetYaml,
82    rejected: GeneratedSinkTargetYaml,
83}
84
/// One sink target: an output format and the path it is written to.
85#[derive(Debug, Clone, Serialize)]
86struct GeneratedSinkTargetYaml {
87    format: String,
88    path: String,
89}
90
/// Serializable `policy` section of a generated entity.
91#[derive(Debug, Clone, Serialize)]
92struct GeneratedPolicyYaml {
    /// Severity string; the generator in this file always emits `reject`.
93    severity: String,
94}
95
/// Serializable `schema` section: mismatch handling plus the inferred columns.
96#[derive(Debug, Clone, Serialize)]
97struct GeneratedSchemaYaml {
98    mismatch: GeneratedMismatchYaml,
99    columns: Vec<GeneratedColumn>,
100}
101
/// Serializable `schema.mismatch` settings.
102#[derive(Debug, Clone, Serialize)]
103struct GeneratedMismatchYaml {
    /// Action for missing columns; the generator in this file emits `reject_file`.
104    missing_columns: String,
    /// Action for extra columns; the generator in this file emits `ignore`.
105    extra_columns: String,
106}
107
/// One inferred column as it is serialized under `schema.columns`.
108#[derive(Debug, Clone, Serialize)]
109struct GeneratedColumn {
    /// Column name in the generated schema.
110    name: String,
    /// Name of the field in the input; every inference path in this file sets it equal to `name`.
111    source: String,
    // Serialized under the key `type`, which is a reserved word in Rust.
112    #[serde(rename = "type")]
113    column_type: String,
    /// Whether nulls (or missing values) were observed or must be assumed possible.
114    nullable: bool,
    /// Uniqueness flag; every inference path in this file sets it to `false`.
115    unique: bool,
116}
117
/// Layout of a JSON input file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JsonMode {
    /// A single top-level JSON array of objects.
    Array,
    /// Newline-delimited JSON: one object per line.
    Ndjson,
}

impl JsonMode {
    /// Returns the lowercase identifier written to the config's `json_mode` option.
    fn as_str(self) -> &'static str {
        match self {
            Self::Array => "array",
            Self::Ndjson => "ndjson",
        }
    }
}
132
/// Scalar kinds that JSON inference can assign to a column.
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134enum InferredScalarType {
135    String,
136    Boolean,
137    Number,
138    Date,
139    Time,
140    DateTime,
141}
142
/// Running per-key statistics gathered while sampling JSON objects.
143#[derive(Debug, Default, Clone)]
144struct JsonColumnStats {
    /// Merged kind of all non-null values seen so far; `None` until one is seen.
145    kind: Option<InferredScalarType>,
    /// Number of sampled objects in which the key held a non-null value.
146    non_null_count: usize,
    /// Set once an explicit JSON `null` is observed for the key.
147    nullable: bool,
148}
149
150pub fn add_entity_to_config(options: AddEntityOptions) -> FloeResult<AddEntityOutcome> {
151    let inferred = infer_entity_from_input(
152        &options.input,
153        options.format.as_str(),
154        options.name.as_deref(),
155        options.domain.as_deref(),
156    )?;
157
158    let output_path = options
159        .output_path
160        .clone()
161        .unwrap_or_else(|| options.config_path.clone());
162    let config_text = fs::read_to_string(&options.config_path).map_err(|err| {
163        Box::new(IoError(format!(
164            "failed to read config at {}: {err}",
165            options.config_path.display()
166        ))) as Box<dyn std::error::Error + Send + Sync>
167    })?;
168
169    let updated_yaml = append_entity_yaml(&config_text, &inferred)?;
170    if !options.dry_run {
171        if let Some(parent) = output_path.parent() {
172            if !parent.as_os_str().is_empty() {
173                fs::create_dir_all(parent)?;
174            }
175        }
176    }
177    validate_generated_config(
178        &updated_yaml,
179        if options.dry_run {
180            &options.config_path
181        } else {
182            &output_path
183        },
184    )?;
185
186    if !options.dry_run {
187        fs::write(&output_path, updated_yaml.as_bytes()).map_err(|err| {
188            Box::new(IoError(format!(
189                "failed to write config at {}: {err}",
190                output_path.display()
191            ))) as Box<dyn std::error::Error + Send + Sync>
192        })?;
193    }
194
195    Ok(AddEntityOutcome {
196        entity_name: inferred.name,
197        output_path,
198        column_count: inferred.columns.len(),
199        format: inferred.format,
200        dry_run: options.dry_run,
201        rendered_yaml: options.dry_run.then_some(updated_yaml),
202        notes: inferred.notes,
203    })
204}
205
206fn infer_entity_from_input(
207    input: &str,
208    format: &str,
209    name_override: Option<&str>,
210    domain: Option<&str>,
211) -> FloeResult<InferredEntity> {
212    let entity_name = match name_override {
213        Some(value) => value.to_string(),
214        None => derive_entity_name(input)?,
215    };
216    if entity_name.trim().is_empty() {
217        return Err(Box::new(ConfigError(
218            "entity name must not be empty".to_string(),
219        )));
220    }
221
222    let local_input_path = resolve_local_input_path(input)?;
223    let persisted_input_path = normalize_persisted_input_path(input, &local_input_path);
224    let (source_options, columns, notes) = match format {
225        "csv" => infer_csv_columns(&local_input_path)?,
226        "json" => infer_json_columns(&local_input_path)?,
227        "parquet" => infer_parquet_columns(&local_input_path)?,
228        other => {
229            return Err(Box::new(ConfigError(format!(
230                "unsupported add-entity format: {other} (allowed: csv, json, parquet)"
231            ))))
232        }
233    };
234
235    if columns.is_empty() {
236        return Err(Box::new(ConfigError(format!(
237            "no columns inferred from {}",
238            local_input_path.display()
239        ))));
240    }
241
242    Ok(InferredEntity {
243        name: entity_name,
244        format: format.to_string(),
245        input: persisted_input_path,
246        domain: domain.map(ToString::to_string),
247        source_options,
248        columns,
249        notes,
250    })
251}
252
253fn infer_csv_columns(
254    input_path: &Path,
255) -> FloeResult<(
256    Option<GeneratedSourceOptions>,
257    Vec<GeneratedColumn>,
258    Vec<String>,
259)> {
260    let separator = sniff_delimiter(input_path).unwrap_or(',');
261    let separator_str = separator.to_string();
262
263    let mut source_options = config::SourceOptions::defaults_for_format("csv");
264    source_options.separator = Some(separator.to_string());
265    source_options.header = Some(true);
266
267    let reader = source_options
268        .to_csv_read_options(input_path)?
269        .with_n_rows(Some(DEFAULT_SAMPLE_ROWS))
270        .with_infer_schema_length(Some(DEFAULT_SAMPLE_ROWS))
271        .try_into_reader_with_file_path(None)
272        .map_err(|err| {
273            Box::new(IoError(format!(
274                "failed to open csv at {}: {err}",
275                input_path.display()
276            ))) as Box<dyn std::error::Error + Send + Sync>
277        })?;
278    let df = reader.finish().map_err(|err| {
279        Box::new(IoError(format!("csv schema inference failed: {err}")))
280            as Box<dyn std::error::Error + Send + Sync>
281    })?;
282
283    let mut columns = Vec::with_capacity(df.width());
284    for column in df.get_columns() {
285        let name = column.name().as_str().to_string();
286        columns.push(GeneratedColumn {
287            name: name.clone(),
288            source: name,
289            column_type: polars_dtype_to_floe_type(column.dtype()),
290            nullable: df.height() == 0 || column.null_count() > 0,
291            unique: false,
292        });
293    }
294
295    Ok((
296        Some(GeneratedSourceOptions {
297            separator: Some(separator_str),
298            json_mode: None,
299        }),
300        columns,
301        Vec::new(),
302    ))
303}
304
305fn infer_parquet_columns(
306    input_path: &Path,
307) -> FloeResult<(
308    Option<GeneratedSourceOptions>,
309    Vec<GeneratedColumn>,
310    Vec<String>,
311)> {
312    let file = std::fs::File::open(input_path).map_err(|err| {
313        Box::new(IoError(format!(
314            "failed to open parquet at {}: {err}",
315            input_path.display()
316        ))) as Box<dyn std::error::Error + Send + Sync>
317    })?;
318    let mut reader = polars::prelude::ParquetReader::new(file);
319    let schema = reader.schema().map_err(|err| {
320        Box::new(IoError(format!(
321            "failed to read parquet schema at {}: {err}",
322            input_path.display()
323        ))) as Box<dyn std::error::Error + Send + Sync>
324    })?;
325
326    let columns = schema
327        .iter()
328        .map(|(name, field)| GeneratedColumn {
329            name: name.to_string(),
330            source: name.to_string(),
331            column_type: arrow_dtype_to_floe_type(field.dtype()),
332            nullable: true,
333            unique: false,
334        })
335        .collect();
336
337    Ok((None, columns, Vec::new()))
338}
339
340fn infer_json_columns(
341    input_path: &Path,
342) -> FloeResult<(
343    Option<GeneratedSourceOptions>,
344    Vec<GeneratedColumn>,
345    Vec<String>,
346)> {
347    let json_mode = detect_json_mode(input_path)?;
348    let mut stats_by_key: BTreeMap<String, JsonColumnStats> = BTreeMap::new();
349    let mut sampled_rows = 0usize;
350    let mut nested_keys = BTreeSet::new();
351
352    match json_mode {
353        JsonMode::Array => {
354            let content = fs::read_to_string(input_path).map_err(|err| {
355                Box::new(IoError(format!(
356                    "failed to read json at {}: {err}",
357                    input_path.display()
358                ))) as Box<dyn std::error::Error + Send + Sync>
359            })?;
360            let value: JsonValue = serde_json::from_str(&content).map_err(|err| {
361                Box::new(IoError(format!("json parse error: {err}")))
362                    as Box<dyn std::error::Error + Send + Sync>
363            })?;
364            let rows = value.as_array().ok_or_else(|| {
365                Box::new(ConfigError("expected json array at root".to_string()))
366                    as Box<dyn std::error::Error + Send + Sync>
367            })?;
368            for row in rows.iter().take(DEFAULT_SAMPLE_ROWS) {
369                let object = row.as_object().ok_or_else(|| {
370                    Box::new(ConfigError(
371                        "expected top-level json objects inside array".to_string(),
372                    )) as Box<dyn std::error::Error + Send + Sync>
373                })?;
374                sampled_rows += 1;
375                update_json_stats(object, &mut stats_by_key, &mut nested_keys);
376            }
377        }
378        JsonMode::Ndjson => {
379            let file = std::fs::File::open(input_path).map_err(|err| {
380                Box::new(IoError(format!(
381                    "failed to read json at {}: {err}",
382                    input_path.display()
383                ))) as Box<dyn std::error::Error + Send + Sync>
384            })?;
385            let reader = BufReader::new(file);
386            for (idx, line) in reader.lines().enumerate() {
387                if sampled_rows >= DEFAULT_SAMPLE_ROWS {
388                    break;
389                }
390                let line = line.map_err(|err| {
391                    Box::new(IoError(format!(
392                        "failed to read json at {}: {err}",
393                        input_path.display()
394                    ))) as Box<dyn std::error::Error + Send + Sync>
395                })?;
396                let line = line.trim();
397                if line.is_empty() {
398                    continue;
399                }
400                let value: JsonValue = serde_json::from_str(line).map_err(|err| {
401                    Box::new(IoError(format!(
402                        "json parse error at line {}: {err}",
403                        idx + 1
404                    ))) as Box<dyn std::error::Error + Send + Sync>
405                })?;
406                let object = value.as_object().ok_or_else(|| {
407                    Box::new(ConfigError(format!(
408                        "expected top-level json object at line {}",
409                        idx + 1
410                    ))) as Box<dyn std::error::Error + Send + Sync>
411                })?;
412                sampled_rows += 1;
413                update_json_stats(object, &mut stats_by_key, &mut nested_keys);
414            }
415        }
416    }
417
418    if sampled_rows == 0 {
419        return Err(Box::new(ConfigError(format!(
420            "no json objects found in {}",
421            input_path.display()
422        ))));
423    }
424
425    let mut columns = Vec::with_capacity(stats_by_key.len());
426    for (key, stats) in &stats_by_key {
427        let nullable = stats.nullable || stats.non_null_count < sampled_rows;
428        columns.push(GeneratedColumn {
429            name: key.clone(),
430            source: key.clone(),
431            column_type: stats
432                .kind
433                .map(inferred_scalar_type_to_floe_type)
434                .unwrap_or("string")
435                .to_string(),
436            nullable,
437            unique: false,
438        });
439    }
440
441    let mut notes = Vec::new();
442    notes.push(format!(
443        "json inference sampled {} row(s) in {} mode",
444        sampled_rows,
445        json_mode.as_str()
446    ));
447    if !nested_keys.is_empty() {
448        notes.push(format!(
449            "nested JSON values inferred as string for top-level key(s): {}",
450            nested_keys.into_iter().collect::<Vec<_>>().join(", ")
451        ));
452    }
453
454    Ok((
455        Some(GeneratedSourceOptions {
456            separator: None,
457            json_mode: Some(json_mode.as_str().to_string()),
458        }),
459        columns,
460        notes,
461    ))
462}
463
464fn update_json_stats(
465    object: &serde_json::Map<String, JsonValue>,
466    stats_by_key: &mut BTreeMap<String, JsonColumnStats>,
467    nested_keys: &mut BTreeSet<String>,
468) {
469    for (key, value) in object {
470        let stats = stats_by_key.entry(key.clone()).or_default();
471        match json_value_kind(value) {
472            None => {
473                stats.nullable = true;
474            }
475            Some(kind) => {
476                stats.non_null_count += 1;
477                if matches!(value, JsonValue::Array(_) | JsonValue::Object(_)) {
478                    nested_keys.insert(key.clone());
479                }
480                stats.kind = Some(match (stats.kind, kind) {
481                    (None, next) => next,
482                    (Some(current), next) => merge_json_kinds(current, next),
483                });
484            }
485        }
486    }
487}
488
489fn json_value_kind(value: &JsonValue) -> Option<InferredScalarType> {
490    match value {
491        JsonValue::Null => None,
492        JsonValue::Bool(_) => Some(InferredScalarType::Boolean),
493        JsonValue::Number(_) => Some(InferredScalarType::Number),
494        JsonValue::String(value) => Some(infer_string_scalar_type(value)),
495        JsonValue::Array(_) | JsonValue::Object(_) => Some(InferredScalarType::String),
496    }
497}
498
499fn infer_string_scalar_type(value: &str) -> InferredScalarType {
500    let trimmed = value.trim();
501    if trimmed.is_empty() {
502        return InferredScalarType::String;
503    }
504    if looks_like_date_time(trimmed) {
505        return InferredScalarType::DateTime;
506    }
507    if looks_like_date(trimmed) {
508        return InferredScalarType::Date;
509    }
510    if looks_like_time(trimmed) {
511        return InferredScalarType::Time;
512    }
513    InferredScalarType::String
514}
515
/// Returns `true` when `value` has exactly the shape `DDDD-DD-DD` (ASCII digits and dashes).
///
/// Only the shape is checked; month and day ranges are not validated.
fn looks_like_date(value: &str) -> bool {
    match value.as_bytes() {
        [y0, y1, y2, y3, b'-', m0, m1, b'-', d0, d1] => [y0, y1, y2, y3, m0, m1, d0, d1]
            .iter()
            .all(|digit| digit.is_ascii_digit()),
        _ => false,
    }
}
526
/// Returns `true` when `value` has the shape `DD:DD:DD` or `DD:DD:DD.DDD` (ASCII digits).
///
/// Only the shape is checked; hour, minute, and second ranges are not validated.
fn looks_like_time(value: &str) -> bool {
    let all_digits = |digits: &[&u8]| digits.iter().all(|digit| digit.is_ascii_digit());
    match value.as_bytes() {
        [h0, h1, b':', m0, m1, b':', s0, s1] => all_digits(&[h0, h1, m0, m1, s0, s1]),
        [h0, h1, b':', m0, m1, b':', s0, s1, b'.', f0, f1, f2] => {
            all_digits(&[h0, h1, m0, m1, s0, s1, f0, f1, f2])
        }
        _ => false,
    }
}
548
/// Returns `true` when `value` looks like an ISO-8601 style date-time.
///
/// The value must start with `YYYY-MM-DDTHH:MM` (ASCII digits, uppercase `T`) and, in addition,
/// either end with `Z`, contain a `+`, or contain at least two `:` characters (seconds or a
/// numeric offset).
///
/// Requiring the prefix keeps arbitrary text that merely contains a `T` together with a `+`,
/// a trailing `Z`, or a couple of colons (for example `"AT&T +1"` or `"Total: 10:30:00"`) from
/// being typed as a date-time. Only the shape is checked; field ranges are not validated.
fn looks_like_date_time(value: &str) -> bool {
    // Length of the mandatory `YYYY-MM-DDTHH:MM` prefix.
    const PREFIX_LEN: usize = 16;

    let bytes = value.as_bytes();
    if bytes.len() < PREFIX_LEN {
        return false;
    }

    let has_iso_prefix = bytes[..PREFIX_LEN]
        .iter()
        .enumerate()
        .all(|(idx, byte)| match idx {
            4 | 7 => *byte == b'-',
            10 => *byte == b'T',
            13 => *byte == b':',
            _ => byte.is_ascii_digit(),
        });

    has_iso_prefix
        && (value.ends_with('Z') || value.contains('+') || value.matches(':').count() >= 2)
}
553
554fn merge_json_kinds(current: InferredScalarType, next: InferredScalarType) -> InferredScalarType {
555    use InferredScalarType as T;
556    match (current, next) {
557        (T::String, _) | (_, T::String) => T::String,
558        (T::Number, T::Number) => T::Number,
559        (T::Boolean, T::Boolean) => T::Boolean,
560        (T::Date, T::Date) => T::Date,
561        (T::Time, T::Time) => T::Time,
562        (T::DateTime, T::DateTime) => T::DateTime,
563        (T::Date, T::DateTime) | (T::DateTime, T::Date) => T::DateTime,
564        _ => T::String,
565    }
566}
567
568fn inferred_scalar_type_to_floe_type(kind: InferredScalarType) -> &'static str {
569    match kind {
570        InferredScalarType::String => "string",
571        InferredScalarType::Boolean => "boolean",
572        InferredScalarType::Number => "number",
573        InferredScalarType::Date => "date",
574        InferredScalarType::Time => "time",
575        InferredScalarType::DateTime => "datetime",
576    }
577}
578
579fn detect_json_mode(path: &Path) -> FloeResult<JsonMode> {
580    let file = std::fs::File::open(path).map_err(|err| {
581        Box::new(IoError(format!(
582            "failed to read json at {}: {err}",
583            path.display()
584        ))) as Box<dyn std::error::Error + Send + Sync>
585    })?;
586    let mut reader = BufReader::new(file);
587    let mut buf = String::new();
588    while reader.read_line(&mut buf)? > 0 {
589        let trimmed = buf.trim_start();
590        if trimmed.is_empty() {
591            buf.clear();
592            continue;
593        }
594        return Ok(if trimmed.starts_with('[') {
595            JsonMode::Array
596        } else {
597            JsonMode::Ndjson
598        });
599    }
600    Err(Box::new(ConfigError(format!(
601        "json input {} is empty",
602        path.display()
603    ))))
604}
605
606fn polars_dtype_to_floe_type(dtype: &DataType) -> String {
607    match dtype {
608        DataType::Boolean => "boolean".to_string(),
609        DataType::Date => "date".to_string(),
610        DataType::Time => "time".to_string(),
611        DataType::Datetime(_, _) => "datetime".to_string(),
612        DataType::Int8
613        | DataType::Int16
614        | DataType::Int32
615        | DataType::Int64
616        | DataType::UInt8
617        | DataType::UInt16
618        | DataType::UInt32
619        | DataType::UInt64
620        | DataType::Float32
621        | DataType::Float64 => "number".to_string(),
622        _ => "string".to_string(),
623    }
624}
625
626fn arrow_dtype_to_floe_type(dtype: &ArrowDataType) -> String {
627    match dtype {
628        ArrowDataType::Boolean => "boolean".to_string(),
629        ArrowDataType::Date32 | ArrowDataType::Date64 => "date".to_string(),
630        ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => "time".to_string(),
631        ArrowDataType::Timestamp(_, _) => "datetime".to_string(),
632        ArrowDataType::Int8
633        | ArrowDataType::Int16
634        | ArrowDataType::Int32
635        | ArrowDataType::Int64
636        | ArrowDataType::UInt8
637        | ArrowDataType::UInt16
638        | ArrowDataType::UInt32
639        | ArrowDataType::UInt64
640        | ArrowDataType::Float16
641        | ArrowDataType::Float32
642        | ArrowDataType::Float64
643        | ArrowDataType::Decimal(_, _)
644        | ArrowDataType::Decimal256(_, _) => "number".to_string(),
645        _ => "string".to_string(),
646    }
647}
648
649fn append_entity_yaml(config_text: &str, inferred: &InferredEntity) -> FloeResult<String> {
650    let mut root: YamlValue = serde_yaml::from_str(config_text)?;
651    let root_map = root.as_mapping_mut().ok_or_else(|| {
652        Box::new(ConfigError(
653            "config root must be a YAML mapping".to_string(),
654        )) as Box<dyn std::error::Error + Send + Sync>
655    })?;
656
657    let entities_key = YamlValue::String("entities".to_string());
658    let new_entity_value = serde_yaml::to_value(build_generated_entity_yaml(inferred))?;
659
660    match root_map.get_mut(&entities_key) {
661        Some(value) => {
662            let entities = value.as_sequence_mut().ok_or_else(|| {
663                Box::new(ConfigError(
664                    "root.entities must be a YAML sequence".to_string(),
665                )) as Box<dyn std::error::Error + Send + Sync>
666            })?;
667            if entities
668                .iter()
669                .any(|entity| entity_name_matches(entity, &inferred.name))
670            {
671                return Err(Box::new(ConfigError(format!(
672                    "entity already exists: {}",
673                    inferred.name
674                ))));
675            }
676            entities.push(new_entity_value);
677        }
678        None => {
679            root_map.insert(entities_key, YamlValue::Sequence(vec![new_entity_value]));
680        }
681    }
682
683    Ok(serde_yaml::to_string(&root)?)
684}
685
686fn entity_name_matches(value: &YamlValue, name: &str) -> bool {
687    value
688        .as_mapping()
689        .and_then(|map| map.get(YamlValue::String("name".to_string())))
690        .and_then(YamlValue::as_str)
691        == Some(name)
692}
693
694fn build_generated_entity_yaml(inferred: &InferredEntity) -> GeneratedEntityYaml {
695    let source_options = match inferred.format.as_str() {
696        "json" => inferred.source_options.clone(),
697        "csv" => inferred.source_options.clone(),
698        _ => None,
699    };
700
701    GeneratedEntityYaml {
702        name: inferred.name.clone(),
703        domain: inferred.domain.clone(),
704        source: GeneratedSourceYaml {
705            format: inferred.format.clone(),
706            path: inferred.input.clone(),
707            options: source_options,
708        },
709        sink: GeneratedSinkYaml {
710            accepted: GeneratedSinkTargetYaml {
711                format: "parquet".to_string(),
712                path: format!("out/accepted/{}/", inferred.name),
713            },
714            rejected: GeneratedSinkTargetYaml {
715                format: "csv".to_string(),
716                path: format!("out/rejected/{}/", inferred.name),
717            },
718        },
719        policy: GeneratedPolicyYaml {
720            severity: "reject".to_string(),
721        },
722        schema: GeneratedSchemaYaml {
723            mismatch: GeneratedMismatchYaml {
724                missing_columns: "reject_file".to_string(),
725                extra_columns: "ignore".to_string(),
726            },
727            columns: inferred.columns.clone(),
728        },
729    }
730}
731
732fn validate_generated_config(yaml: &str, target_path: &Path) -> FloeResult<()> {
733    let validation_parent = target_path.parent().unwrap_or_else(|| Path::new("."));
734    let mut temp_file =
735        NamedTempFile::new_in(validation_parent).or_else(|_| NamedTempFile::new())?;
736    use std::io::Write;
737    temp_file.write_all(yaml.as_bytes())?;
738    crate::validate(temp_file.path(), ValidateOptions::default())
739}
740
741fn resolve_local_input_path(input: &str) -> FloeResult<PathBuf> {
742    if let Some((scheme, _)) = input.split_once("://") {
743        if scheme.eq_ignore_ascii_case("file") {
744            let url = Url::parse(input)?;
745            return url.to_file_path().map_err(|_| {
746                Box::new(ConfigError(format!(
747                    "invalid file:// URI for add-entity input: {input}"
748                ))) as Box<dyn std::error::Error + Send + Sync>
749            });
750        }
751        return Err(Box::new(ConfigError(format!(
752            "remote URI inference is not supported yet for add-entity input: {input}"
753        ))));
754    }
755
756    let path = PathBuf::from(input);
757    let absolute = if path.is_absolute() {
758        path
759    } else {
760        std::env::current_dir()?.join(path)
761    };
762    let metadata = fs::metadata(&absolute).map_err(|err| {
763        Box::new(IoError(format!(
764            "failed to access input at {}: {err}",
765            absolute.display()
766        ))) as Box<dyn std::error::Error + Send + Sync>
767    })?;
768    if metadata.is_dir() {
769        return Err(Box::new(ConfigError(format!(
770            "add-entity expects a file input for schema inference, got directory: {}",
771            absolute.display()
772        ))));
773    }
774    Ok(absolute)
775}
776
/// Chooses the input string stored in the generated config.
///
/// `file://` URIs (scheme matched case-insensitively) are replaced by the resolved local path;
/// every other input is stored exactly as given.
fn normalize_persisted_input_path(input: &str, local_input_path: &Path) -> String {
    let is_file_uri = input
        .split_once("://")
        .map_or(false, |(scheme, _)| scheme.eq_ignore_ascii_case("file"));
    if is_file_uri {
        local_input_path.display().to_string()
    } else {
        input.to_string()
    }
}
785
786fn derive_entity_name(input: &str) -> FloeResult<String> {
787    let raw = if let Some((scheme, _)) = input.split_once("://") {
788        if scheme.eq_ignore_ascii_case("file") {
789            let url = Url::parse(input)?;
790            let path = url.to_file_path().map_err(|_| {
791                Box::new(ConfigError(format!(
792                    "invalid file:// URI for add-entity input: {input}"
793                ))) as Box<dyn std::error::Error + Send + Sync>
794            })?;
795            file_name_stem_string(&path)
796        } else {
797            let trimmed = input.trim_end_matches('/');
798            let last = trimmed.rsplit('/').next().unwrap_or(trimmed);
799            last.split('?').next().unwrap_or(last).to_string()
800        }
801    } else {
802        file_name_stem_string(Path::new(input))
803    };
804    let normalized = slugify_entity_name(&raw);
805    if normalized.is_empty() {
806        return Err(Box::new(ConfigError(format!(
807            "failed to derive entity name from input: {input}"
808        ))));
809    }
810    Ok(normalized)
811}
812
/// Returns the file stem of `path` as a lossily decoded string.
///
/// Falls back to the full file name, and finally to the literal `entity` when the path has no
/// final component (for example an empty path, `/`, or `..`).
fn file_name_stem_string(path: &Path) -> String {
    match path.file_stem().or_else(|| path.file_name()) {
        Some(component) => component.to_string_lossy().to_string(),
        None => "entity".to_string(),
    }
}
819
/// Normalizes a raw name into a lowercase `snake_case` style slug.
///
/// ASCII letters and digits are kept (lowercased). Every run of other characters becomes a
/// single `_`, and leading or trailing underscores are dropped. The result may be empty.
fn slugify_entity_name(value: &str) -> String {
    value
        .split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter(|segment| !segment.is_empty())
        .map(str::to_ascii_lowercase)
        .collect::<Vec<_>>()
        .join("_")
}
834
/// Guesses the field delimiter of a delimited text file.
///
/// Reads at most the first 12 lines and keeps up to 6 non-blank (trimmed) ones as a sample.
/// Each of `,`, `;`, tab, and `|` is scored on the sample; a candidate is skipped when it does
/// not occur on the first sampled line. A candidate occurring the same non-zero number of
/// times on every sampled line gets a large bonus. The highest score wins, and on a tie the
/// candidate listed first wins.
///
/// Returns `None` when the file cannot be opened, a sampled line cannot be read, the sample is
/// empty, or no candidate qualifies.
fn sniff_delimiter(path: &Path) -> Option<char> {
    const CANDIDATES: [char; 4] = [',', ';', '\t', '|'];
    const MAX_LINES_READ: usize = 12;
    const MAX_SAMPLE_LINES: usize = 6;

    let handle = std::fs::File::open(path).ok()?;
    let mut sample: Vec<String> = Vec::new();
    for raw in BufReader::new(handle).lines().take(MAX_LINES_READ) {
        let raw = raw.ok()?;
        let content = raw.trim();
        if !content.is_empty() {
            sample.push(content.to_string());
            if sample.len() >= MAX_SAMPLE_LINES {
                break;
            }
        }
    }
    if sample.is_empty() {
        return None;
    }

    let mut winner: Option<(i32, char)> = None;
    for delimiter in CANDIDATES {
        let per_line: Vec<usize> = sample
            .iter()
            .map(|line| line.matches(delimiter).count())
            .collect();
        let leading = per_line.first().copied().unwrap_or(0);
        let fewest = per_line.iter().copied().min().unwrap_or(0);
        let most = per_line.iter().copied().max().unwrap_or(0);
        let total: usize = per_line.iter().sum();
        if leading == 0 || total == 0 {
            continue;
        }

        // A delimiter that appears equally often on every line is almost certainly the one.
        let consistency_bonus = if fewest > 0 && most == fewest { 10_000 } else { 0 };
        let score = consistency_bonus + (fewest as i32 * 100) + total as i32;

        // Strictly-greater comparison keeps the earliest candidate on ties.
        let beats_current = match winner {
            Some((top_score, _)) => score > top_score,
            None => true,
        };
        if beats_current {
            winner = Some((score, delimiter));
        }
    }
    winner.map(|(_, delimiter)| delimiter)
}