Skip to main content

floe_core/
add_entity.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fs;
3use std::io::{BufRead, BufReader};
4use std::path::{Path, PathBuf};
5
6use polars::prelude::{ArrowDataType, DataType, SerReader};
7use serde::Serialize;
8use serde_json::Value as JsonValue;
9use serde_yaml::Value as YamlValue;
10use tempfile::NamedTempFile;
11use url::Url;
12
13use crate::config;
14use crate::errors::IoError;
15use crate::{ConfigError, FloeResult, ValidateOptions};
16
/// Maximum number of rows read from the input when inferring a schema (CSV row limit and
/// JSON sampling cap).
const DEFAULT_SAMPLE_ROWS: usize = 200;
/// Config text used as the starting point when the config file does not exist yet.
const MINIMAL_CONFIG_YAML: &str = "version: \"0.2\"\nentities: []\n";
19
/// Caller-supplied options for [`add_entity_to_config`].
#[derive(Debug, Clone)]
pub struct AddEntityOptions {
    /// Config file to read; a minimal config is used when the file does not exist.
    pub config_path: PathBuf,
    /// Where the updated config is written; falls back to `config_path` when `None`.
    pub output_path: Option<PathBuf>,
    /// Input location (plain path or `file://` URI) whose schema is inferred.
    pub input: String,
    /// Explicit input format; when `None` the format is inferred from the file extension.
    pub format: Option<String>,
    /// Explicit entity name; when `None` the name is derived from the input.
    pub name: Option<String>,
    /// Optional domain copied onto the generated entity.
    pub domain: Option<String>,
    /// When `true`, nothing is written and the rendered YAML is returned instead.
    pub dry_run: bool,
}
30
/// Result returned by [`add_entity_to_config`].
#[derive(Debug, Clone)]
pub struct AddEntityOutcome {
    /// Name of the entity that was appended.
    pub entity_name: String,
    /// Resolved output path (the explicit output path, or the config path).
    pub output_path: PathBuf,
    /// Number of columns inferred for the entity.
    pub column_count: usize,
    /// Input format used for inference.
    pub format: String,
    /// Echo of the `dry_run` option.
    pub dry_run: bool,
    /// The full updated YAML; populated only for dry runs.
    pub rendered_yaml: Option<String>,
    /// Informational notes produced during inference.
    pub notes: Vec<String>,
}
41
/// Intermediate result of schema inference, before it is rendered to YAML.
#[derive(Debug, Clone)]
struct InferredEntity {
    /// Entity name (explicit override or derived from the input).
    name: String,
    /// Resolved input format.
    format: String,
    /// Input location as it will be persisted in the config's `source.path`.
    input: String,
    /// Optional domain supplied by the caller.
    domain: Option<String>,
    /// Format-specific source options discovered during inference, if any.
    source_options: Option<GeneratedSourceOptions>,
    /// Inferred columns.
    columns: Vec<GeneratedColumn>,
    /// Informational notes produced during inference.
    notes: Vec<String>,
}
52
/// Serializable shape of one generated entity entry.
#[derive(Debug, Clone, Serialize)]
struct GeneratedEntityYaml {
    /// Entity name.
    name: String,
    /// Optional domain; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    domain: Option<String>,
    /// Where and how the entity's input is read.
    source: GeneratedSourceYaml,
    /// Accepted/rejected output targets.
    sink: GeneratedSinkYaml,
    /// Policy settings for the entity.
    policy: GeneratedPolicyYaml,
    /// Column definitions and mismatch handling.
    schema: GeneratedSchemaYaml,
}
63
/// Serializable `source` section of a generated entity.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSourceYaml {
    /// Input format name.
    format: String,
    /// Input path as persisted in the config.
    path: String,
    /// Format-specific options; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    options: Option<GeneratedSourceOptions>,
}
71
/// Format-specific source options emitted under `source.options`.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSourceOptions {
    /// Field separator (set by CSV inference); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    separator: Option<String>,
    /// JSON layout name (set by JSON inference); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    json_mode: Option<String>,
}
79
/// Serializable `sink` section holding the accepted and rejected targets.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSinkYaml {
    /// Target for accepted output.
    accepted: GeneratedSinkTargetYaml,
    /// Target for rejected output.
    rejected: GeneratedSinkTargetYaml,
}
85
/// One sink target: an output format and the path it is written to.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSinkTargetYaml {
    /// Output format name.
    format: String,
    /// Output path.
    path: String,
}
91
/// Serializable `policy` section of a generated entity.
#[derive(Debug, Clone, Serialize)]
struct GeneratedPolicyYaml {
    /// Severity setting; the generator in this file always emits `"reject"`.
    severity: String,
}
96
/// Serializable `schema` section of a generated entity.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSchemaYaml {
    /// Handling of missing/extra columns.
    mismatch: GeneratedMismatchYaml,
    /// Inferred column definitions.
    columns: Vec<GeneratedColumn>,
}
102
/// Serializable `schema.mismatch` section.
#[derive(Debug, Clone, Serialize)]
struct GeneratedMismatchYaml {
    /// Action for missing columns; the generator in this file emits `"reject_file"`.
    missing_columns: String,
    /// Action for extra columns; the generator in this file emits `"ignore"`.
    extra_columns: String,
}
108
/// One inferred column as it is serialized into `schema.columns`.
#[derive(Debug, Clone, Serialize)]
struct GeneratedColumn {
    /// Column name in the entity schema.
    name: String,
    /// Column name in the input; the inference code in this file sets it equal to `name`.
    source: String,
    /// Column type name; serialized under the key `type`.
    #[serde(rename = "type")]
    column_type: String,
    /// Whether the column may contain nulls.
    nullable: bool,
    /// Uniqueness flag; the inference code in this file always sets it to `false`.
    unique: bool,
}
118
/// Layout of a JSON input file as detected from its first non-blank line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JsonMode {
    /// The document root is a JSON array.
    Array,
    /// One JSON value per line.
    Ndjson,
}

impl JsonMode {
    /// Returns the lowercase name written to the generated config.
    fn as_str(self) -> &'static str {
        match self {
            Self::Array => "array",
            Self::Ndjson => "ndjson",
        }
    }
}
133
/// Scalar type inferred for a JSON value during sampling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferredScalarType {
    /// Fallback type; also used for nested arrays/objects and conflicting kinds.
    String,
    /// JSON `true`/`false`.
    Boolean,
    /// Any JSON number.
    Number,
    /// String shaped like `YYYY-MM-DD`.
    Date,
    /// String shaped like `HH:MM:SS` or `HH:MM:SS.mmm`.
    Time,
    /// String that looks like a date-time containing a `T` separator.
    DateTime,
}
143
/// Per-key statistics accumulated while sampling JSON rows.
#[derive(Debug, Default, Clone)]
struct JsonColumnStats {
    /// Merged kind of all non-null values seen so far; `None` until one is seen.
    kind: Option<InferredScalarType>,
    /// Number of sampled rows in which the key held a non-null value.
    non_null_count: usize,
    /// Set once an explicit JSON `null` is seen for the key.
    nullable: bool,
}
150
151pub fn add_entity_to_config(options: AddEntityOptions) -> FloeResult<AddEntityOutcome> {
152    let inferred = infer_entity_from_input(
153        &options.input,
154        options.format.as_deref(),
155        options.name.as_deref(),
156        options.domain.as_deref(),
157    )?;
158
159    let output_path = options
160        .output_path
161        .clone()
162        .unwrap_or_else(|| options.config_path.clone());
163    let config_text = load_or_initialize_config_text(&options.config_path)?;
164
165    let updated_yaml = append_entity_yaml(&config_text, &inferred)?;
166    if !options.dry_run {
167        if let Some(parent) = output_path.parent() {
168            if !parent.as_os_str().is_empty() {
169                fs::create_dir_all(parent)?;
170            }
171        }
172    }
173    validate_generated_config(
174        &updated_yaml,
175        if options.dry_run {
176            &options.config_path
177        } else {
178            &output_path
179        },
180    )?;
181
182    if !options.dry_run {
183        fs::write(&output_path, updated_yaml.as_bytes()).map_err(|err| {
184            Box::new(IoError(format!(
185                "failed to write config at {}: {err}",
186                output_path.display()
187            ))) as Box<dyn std::error::Error + Send + Sync>
188        })?;
189    }
190
191    Ok(AddEntityOutcome {
192        entity_name: inferred.name,
193        output_path,
194        column_count: inferred.columns.len(),
195        format: inferred.format,
196        dry_run: options.dry_run,
197        rendered_yaml: options.dry_run.then_some(updated_yaml),
198        notes: inferred.notes,
199    })
200}
201
202fn infer_entity_from_input(
203    input: &str,
204    format_override: Option<&str>,
205    name_override: Option<&str>,
206    domain: Option<&str>,
207) -> FloeResult<InferredEntity> {
208    let entity_name = match name_override {
209        Some(value) => value.to_string(),
210        None => derive_entity_name(input)?,
211    };
212    if entity_name.trim().is_empty() {
213        return Err(Box::new(ConfigError(
214            "entity name must not be empty".to_string(),
215        )));
216    }
217
218    let local_input_path = resolve_local_input_path(input)?;
219    let format = resolve_add_entity_format(format_override, input, &local_input_path)?;
220    let persisted_input_path = normalize_persisted_input_path(input, &local_input_path);
221    let (source_options, columns, notes) = match format.as_str() {
222        "csv" => infer_csv_columns(&local_input_path)?,
223        "json" => infer_json_columns(&local_input_path)?,
224        "parquet" => infer_parquet_columns(&local_input_path)?,
225        other => {
226            return Err(Box::new(ConfigError(format!(
227                "unsupported add-entity format: {other} (allowed: csv, json, parquet)"
228            ))))
229        }
230    };
231
232    if columns.is_empty() {
233        return Err(Box::new(ConfigError(format!(
234            "no columns inferred from {}",
235            local_input_path.display()
236        ))));
237    }
238
239    Ok(InferredEntity {
240        name: entity_name,
241        format,
242        input: persisted_input_path,
243        domain: domain.map(ToString::to_string),
244        source_options,
245        columns,
246        notes,
247    })
248}
249
250fn load_or_initialize_config_text(config_path: &Path) -> FloeResult<String> {
251    match fs::read_to_string(config_path) {
252        Ok(text) => Ok(text),
253        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
254            Ok(MINIMAL_CONFIG_YAML.to_string())
255        }
256        Err(err) => Err(Box::new(IoError(format!(
257            "failed to read config at {}: {err}",
258            config_path.display()
259        )))),
260    }
261}
262
263fn resolve_add_entity_format(
264    format_override: Option<&str>,
265    input: &str,
266    local_input_path: &Path,
267) -> FloeResult<String> {
268    if let Some(format) = format_override {
269        return Ok(format.to_string());
270    }
271
272    infer_format_from_extension(local_input_path).ok_or_else(|| {
273        Box::new(ConfigError(format!(
274            "could not infer add-entity format from input {} (supported extensions: .csv, .json, .parquet); use --format",
275            input
276        ))) as Box<dyn std::error::Error + Send + Sync>
277    })
278}
279
/// Maps a file extension (case-insensitively) to a supported format name.
///
/// Returns `None` when the path has no extension or the extension is not `csv`, `json`
/// or `parquet`.
fn infer_format_from_extension(path: &Path) -> Option<String> {
    const SUPPORTED: [&str; 3] = ["csv", "json", "parquet"];
    let extension = path.extension()?.to_string_lossy().to_ascii_lowercase();
    if SUPPORTED.contains(&extension.as_str()) {
        Some(extension)
    } else {
        None
    }
}
289
290fn infer_csv_columns(
291    input_path: &Path,
292) -> FloeResult<(
293    Option<GeneratedSourceOptions>,
294    Vec<GeneratedColumn>,
295    Vec<String>,
296)> {
297    let separator = sniff_delimiter(input_path).unwrap_or(',');
298    let separator_str = separator.to_string();
299
300    let mut source_options = config::SourceOptions::defaults_for_format("csv");
301    source_options.separator = Some(separator.to_string());
302    source_options.header = Some(true);
303
304    let reader = source_options
305        .to_csv_read_options(input_path)?
306        .with_n_rows(Some(DEFAULT_SAMPLE_ROWS))
307        .with_infer_schema_length(Some(DEFAULT_SAMPLE_ROWS))
308        .try_into_reader_with_file_path(None)
309        .map_err(|err| {
310            Box::new(IoError(format!(
311                "failed to open csv at {}: {err}",
312                input_path.display()
313            ))) as Box<dyn std::error::Error + Send + Sync>
314        })?;
315    let df = reader.finish().map_err(|err| {
316        Box::new(IoError(format!("csv schema inference failed: {err}")))
317            as Box<dyn std::error::Error + Send + Sync>
318    })?;
319
320    let mut columns = Vec::with_capacity(df.width());
321    for column in df.get_columns() {
322        let name = column.name().as_str().to_string();
323        columns.push(GeneratedColumn {
324            name: name.clone(),
325            source: name,
326            column_type: polars_dtype_to_floe_type(column.dtype()),
327            nullable: df.height() == 0 || column.null_count() > 0,
328            unique: false,
329        });
330    }
331
332    Ok((
333        Some(GeneratedSourceOptions {
334            separator: Some(separator_str),
335            json_mode: None,
336        }),
337        columns,
338        Vec::new(),
339    ))
340}
341
342fn infer_parquet_columns(
343    input_path: &Path,
344) -> FloeResult<(
345    Option<GeneratedSourceOptions>,
346    Vec<GeneratedColumn>,
347    Vec<String>,
348)> {
349    let file = std::fs::File::open(input_path).map_err(|err| {
350        Box::new(IoError(format!(
351            "failed to open parquet at {}: {err}",
352            input_path.display()
353        ))) as Box<dyn std::error::Error + Send + Sync>
354    })?;
355    let mut reader = polars::prelude::ParquetReader::new(file);
356    let schema = reader.schema().map_err(|err| {
357        Box::new(IoError(format!(
358            "failed to read parquet schema at {}: {err}",
359            input_path.display()
360        ))) as Box<dyn std::error::Error + Send + Sync>
361    })?;
362
363    let columns = schema
364        .iter()
365        .map(|(name, field)| GeneratedColumn {
366            name: name.to_string(),
367            source: name.to_string(),
368            column_type: arrow_dtype_to_floe_type(field.dtype()),
369            nullable: true,
370            unique: false,
371        })
372        .collect();
373
374    Ok((None, columns, Vec::new()))
375}
376
377fn infer_json_columns(
378    input_path: &Path,
379) -> FloeResult<(
380    Option<GeneratedSourceOptions>,
381    Vec<GeneratedColumn>,
382    Vec<String>,
383)> {
384    let json_mode = detect_json_mode(input_path)?;
385    let mut stats_by_key: BTreeMap<String, JsonColumnStats> = BTreeMap::new();
386    let mut sampled_rows = 0usize;
387    let mut nested_keys = BTreeSet::new();
388
389    match json_mode {
390        JsonMode::Array => {
391            let content = fs::read_to_string(input_path).map_err(|err| {
392                Box::new(IoError(format!(
393                    "failed to read json at {}: {err}",
394                    input_path.display()
395                ))) as Box<dyn std::error::Error + Send + Sync>
396            })?;
397            let value: JsonValue = serde_json::from_str(&content).map_err(|err| {
398                Box::new(IoError(format!("json parse error: {err}")))
399                    as Box<dyn std::error::Error + Send + Sync>
400            })?;
401            let rows = value.as_array().ok_or_else(|| {
402                Box::new(ConfigError("expected json array at root".to_string()))
403                    as Box<dyn std::error::Error + Send + Sync>
404            })?;
405            for row in rows.iter().take(DEFAULT_SAMPLE_ROWS) {
406                let object = row.as_object().ok_or_else(|| {
407                    Box::new(ConfigError(
408                        "expected top-level json objects inside array".to_string(),
409                    )) as Box<dyn std::error::Error + Send + Sync>
410                })?;
411                sampled_rows += 1;
412                update_json_stats(object, &mut stats_by_key, &mut nested_keys);
413            }
414        }
415        JsonMode::Ndjson => {
416            let file = std::fs::File::open(input_path).map_err(|err| {
417                Box::new(IoError(format!(
418                    "failed to read json at {}: {err}",
419                    input_path.display()
420                ))) as Box<dyn std::error::Error + Send + Sync>
421            })?;
422            let reader = BufReader::new(file);
423            for (idx, line) in reader.lines().enumerate() {
424                if sampled_rows >= DEFAULT_SAMPLE_ROWS {
425                    break;
426                }
427                let line = line.map_err(|err| {
428                    Box::new(IoError(format!(
429                        "failed to read json at {}: {err}",
430                        input_path.display()
431                    ))) as Box<dyn std::error::Error + Send + Sync>
432                })?;
433                let line = line.trim();
434                if line.is_empty() {
435                    continue;
436                }
437                let value: JsonValue = serde_json::from_str(line).map_err(|err| {
438                    Box::new(IoError(format!(
439                        "json parse error at line {}: {err}",
440                        idx + 1
441                    ))) as Box<dyn std::error::Error + Send + Sync>
442                })?;
443                let object = value.as_object().ok_or_else(|| {
444                    Box::new(ConfigError(format!(
445                        "expected top-level json object at line {}",
446                        idx + 1
447                    ))) as Box<dyn std::error::Error + Send + Sync>
448                })?;
449                sampled_rows += 1;
450                update_json_stats(object, &mut stats_by_key, &mut nested_keys);
451            }
452        }
453    }
454
455    if sampled_rows == 0 {
456        return Err(Box::new(ConfigError(format!(
457            "no json objects found in {}",
458            input_path.display()
459        ))));
460    }
461
462    let mut columns = Vec::with_capacity(stats_by_key.len());
463    for (key, stats) in &stats_by_key {
464        let nullable = stats.nullable || stats.non_null_count < sampled_rows;
465        columns.push(GeneratedColumn {
466            name: key.clone(),
467            source: key.clone(),
468            column_type: stats
469                .kind
470                .map(inferred_scalar_type_to_floe_type)
471                .unwrap_or("string")
472                .to_string(),
473            nullable,
474            unique: false,
475        });
476    }
477
478    let mut notes = Vec::new();
479    notes.push(format!(
480        "json inference sampled {} row(s) in {} mode",
481        sampled_rows,
482        json_mode.as_str()
483    ));
484    if !nested_keys.is_empty() {
485        notes.push(format!(
486            "nested JSON values inferred as string for top-level key(s): {}",
487            nested_keys.into_iter().collect::<Vec<_>>().join(", ")
488        ));
489    }
490
491    Ok((
492        Some(GeneratedSourceOptions {
493            separator: None,
494            json_mode: Some(json_mode.as_str().to_string()),
495        }),
496        columns,
497        notes,
498    ))
499}
500
501fn update_json_stats(
502    object: &serde_json::Map<String, JsonValue>,
503    stats_by_key: &mut BTreeMap<String, JsonColumnStats>,
504    nested_keys: &mut BTreeSet<String>,
505) {
506    for (key, value) in object {
507        let stats = stats_by_key.entry(key.clone()).or_default();
508        match json_value_kind(value) {
509            None => {
510                stats.nullable = true;
511            }
512            Some(kind) => {
513                stats.non_null_count += 1;
514                if matches!(value, JsonValue::Array(_) | JsonValue::Object(_)) {
515                    nested_keys.insert(key.clone());
516                }
517                stats.kind = Some(match (stats.kind, kind) {
518                    (None, next) => next,
519                    (Some(current), next) => merge_json_kinds(current, next),
520                });
521            }
522        }
523    }
524}
525
526fn json_value_kind(value: &JsonValue) -> Option<InferredScalarType> {
527    match value {
528        JsonValue::Null => None,
529        JsonValue::Bool(_) => Some(InferredScalarType::Boolean),
530        JsonValue::Number(_) => Some(InferredScalarType::Number),
531        JsonValue::String(value) => Some(infer_string_scalar_type(value)),
532        JsonValue::Array(_) | JsonValue::Object(_) => Some(InferredScalarType::String),
533    }
534}
535
536fn infer_string_scalar_type(value: &str) -> InferredScalarType {
537    let trimmed = value.trim();
538    if trimmed.is_empty() {
539        return InferredScalarType::String;
540    }
541    if looks_like_date_time(trimmed) {
542        return InferredScalarType::DateTime;
543    }
544    if looks_like_date(trimmed) {
545        return InferredScalarType::Date;
546    }
547    if looks_like_time(trimmed) {
548        return InferredScalarType::Time;
549    }
550    InferredScalarType::String
551}
552
/// Returns `true` when `value` has the exact shape `DDDD-DD-DD` (ASCII digits and dashes).
///
/// Only the shape is checked; the digits are not validated as a calendar date.
fn looks_like_date(value: &str) -> bool {
    match value.as_bytes() {
        [y0, y1, y2, y3, b'-', m0, m1, b'-', d0, d1] => [y0, y1, y2, y3, m0, m1, d0, d1]
            .iter()
            .all(|byte| byte.is_ascii_digit()),
        _ => false,
    }
}
563
/// Returns `true` when `value` has the exact shape `DD:DD:DD` or `DD:DD:DD.DDD`.
///
/// Only the shape is checked; the digits are not validated as a clock time.
fn looks_like_time(value: &str) -> bool {
    let all_digits = |bytes: &[&u8]| bytes.iter().all(|byte| byte.is_ascii_digit());
    match value.as_bytes() {
        [h0, h1, b':', m0, m1, b':', s0, s1] => all_digits(&[h0, h1, m0, m1, s0, s1]),
        [h0, h1, b':', m0, m1, b':', s0, s1, b'.', f0, f1, f2] => {
            all_digits(&[h0, h1, m0, m1, s0, s1, f0, f1, f2])
        }
        _ => false,
    }
}
585
/// Returns `true` when `value` looks like a `T`-separated date-time.
///
/// The text before the first `T` must start with a digit and contain only ASCII digits and
/// dashes, and the text after it must start with a digit. On top of that, the time part
/// must end with `Z`, contain a `+`, or contain at least two colons.
///
/// The structural checks keep ordinary text that merely contains a `T` and a `+` or
/// colons (for example `"Tom+Jerry"`) from being classified as a date-time.
fn looks_like_date_time(value: &str) -> bool {
    let (date_part, time_part) = match value.split_once('T') {
        Some(parts) => parts,
        None => return false,
    };

    let date_shaped = date_part.starts_with(|ch: char| ch.is_ascii_digit())
        && date_part.chars().all(|ch| ch.is_ascii_digit() || ch == '-');
    let time_shaped = time_part.starts_with(|ch: char| ch.is_ascii_digit());
    if !(date_shaped && time_shaped) {
        return false;
    }

    time_part.ends_with('Z') || time_part.contains('+') || time_part.matches(':').count() >= 2
}
590
591fn merge_json_kinds(current: InferredScalarType, next: InferredScalarType) -> InferredScalarType {
592    use InferredScalarType as T;
593    match (current, next) {
594        (T::String, _) | (_, T::String) => T::String,
595        (T::Number, T::Number) => T::Number,
596        (T::Boolean, T::Boolean) => T::Boolean,
597        (T::Date, T::Date) => T::Date,
598        (T::Time, T::Time) => T::Time,
599        (T::DateTime, T::DateTime) => T::DateTime,
600        (T::Date, T::DateTime) | (T::DateTime, T::Date) => T::DateTime,
601        _ => T::String,
602    }
603}
604
605fn inferred_scalar_type_to_floe_type(kind: InferredScalarType) -> &'static str {
606    match kind {
607        InferredScalarType::String => "string",
608        InferredScalarType::Boolean => "boolean",
609        InferredScalarType::Number => "number",
610        InferredScalarType::Date => "date",
611        InferredScalarType::Time => "time",
612        InferredScalarType::DateTime => "datetime",
613    }
614}
615
616fn detect_json_mode(path: &Path) -> FloeResult<JsonMode> {
617    let file = std::fs::File::open(path).map_err(|err| {
618        Box::new(IoError(format!(
619            "failed to read json at {}: {err}",
620            path.display()
621        ))) as Box<dyn std::error::Error + Send + Sync>
622    })?;
623    let mut reader = BufReader::new(file);
624    let mut buf = String::new();
625    while reader.read_line(&mut buf)? > 0 {
626        let trimmed = buf.trim_start();
627        if trimmed.is_empty() {
628            buf.clear();
629            continue;
630        }
631        return Ok(if trimmed.starts_with('[') {
632            JsonMode::Array
633        } else {
634            JsonMode::Ndjson
635        });
636    }
637    Err(Box::new(ConfigError(format!(
638        "json input {} is empty",
639        path.display()
640    ))))
641}
642
643fn polars_dtype_to_floe_type(dtype: &DataType) -> String {
644    match dtype {
645        DataType::Boolean => "boolean".to_string(),
646        DataType::Date => "date".to_string(),
647        DataType::Time => "time".to_string(),
648        DataType::Datetime(_, _) => "datetime".to_string(),
649        DataType::Int8
650        | DataType::Int16
651        | DataType::Int32
652        | DataType::Int64
653        | DataType::UInt8
654        | DataType::UInt16
655        | DataType::UInt32
656        | DataType::UInt64
657        | DataType::Float32
658        | DataType::Float64 => "number".to_string(),
659        _ => "string".to_string(),
660    }
661}
662
663fn arrow_dtype_to_floe_type(dtype: &ArrowDataType) -> String {
664    match dtype {
665        ArrowDataType::Boolean => "boolean".to_string(),
666        ArrowDataType::Date32 | ArrowDataType::Date64 => "date".to_string(),
667        ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => "time".to_string(),
668        ArrowDataType::Timestamp(_, _) => "datetime".to_string(),
669        ArrowDataType::Int8
670        | ArrowDataType::Int16
671        | ArrowDataType::Int32
672        | ArrowDataType::Int64
673        | ArrowDataType::UInt8
674        | ArrowDataType::UInt16
675        | ArrowDataType::UInt32
676        | ArrowDataType::UInt64
677        | ArrowDataType::Float16
678        | ArrowDataType::Float32
679        | ArrowDataType::Float64
680        | ArrowDataType::Decimal(_, _)
681        | ArrowDataType::Decimal256(_, _) => "number".to_string(),
682        _ => "string".to_string(),
683    }
684}
685
686fn append_entity_yaml(config_text: &str, inferred: &InferredEntity) -> FloeResult<String> {
687    let mut root: YamlValue = serde_yaml::from_str(config_text)?;
688    let root_map = root.as_mapping_mut().ok_or_else(|| {
689        Box::new(ConfigError(
690            "config root must be a YAML mapping".to_string(),
691        )) as Box<dyn std::error::Error + Send + Sync>
692    })?;
693
694    let entities_key = YamlValue::String("entities".to_string());
695    let new_entity_value = serde_yaml::to_value(build_generated_entity_yaml(inferred))?;
696
697    match root_map.get_mut(&entities_key) {
698        Some(value) => {
699            let entities = value.as_sequence_mut().ok_or_else(|| {
700                Box::new(ConfigError(
701                    "root.entities must be a YAML sequence".to_string(),
702                )) as Box<dyn std::error::Error + Send + Sync>
703            })?;
704            if entities
705                .iter()
706                .any(|entity| entity_name_matches(entity, &inferred.name))
707            {
708                return Err(Box::new(ConfigError(format!(
709                    "entity already exists: {}",
710                    inferred.name
711                ))));
712            }
713            entities.push(new_entity_value);
714        }
715        None => {
716            root_map.insert(entities_key, YamlValue::Sequence(vec![new_entity_value]));
717        }
718    }
719
720    Ok(serde_yaml::to_string(&root)?)
721}
722
723fn entity_name_matches(value: &YamlValue, name: &str) -> bool {
724    value
725        .as_mapping()
726        .and_then(|map| map.get(YamlValue::String("name".to_string())))
727        .and_then(YamlValue::as_str)
728        == Some(name)
729}
730
731fn build_generated_entity_yaml(inferred: &InferredEntity) -> GeneratedEntityYaml {
732    let source_options = match inferred.format.as_str() {
733        "json" => inferred.source_options.clone(),
734        "csv" => inferred.source_options.clone(),
735        _ => None,
736    };
737
738    GeneratedEntityYaml {
739        name: inferred.name.clone(),
740        domain: inferred.domain.clone(),
741        source: GeneratedSourceYaml {
742            format: inferred.format.clone(),
743            path: inferred.input.clone(),
744            options: source_options,
745        },
746        sink: GeneratedSinkYaml {
747            accepted: GeneratedSinkTargetYaml {
748                format: "parquet".to_string(),
749                path: format!("out/accepted/{}/", inferred.name),
750            },
751            rejected: GeneratedSinkTargetYaml {
752                format: "csv".to_string(),
753                path: format!("out/rejected/{}/", inferred.name),
754            },
755        },
756        policy: GeneratedPolicyYaml {
757            severity: "reject".to_string(),
758        },
759        schema: GeneratedSchemaYaml {
760            mismatch: GeneratedMismatchYaml {
761                missing_columns: "reject_file".to_string(),
762                extra_columns: "ignore".to_string(),
763            },
764            columns: inferred.columns.clone(),
765        },
766    }
767}
768
769fn validate_generated_config(yaml: &str, target_path: &Path) -> FloeResult<()> {
770    let validation_parent = target_path.parent().unwrap_or_else(|| Path::new("."));
771    let mut temp_file =
772        NamedTempFile::new_in(validation_parent).or_else(|_| NamedTempFile::new())?;
773    use std::io::Write;
774    temp_file.write_all(yaml.as_bytes())?;
775    crate::validate(temp_file.path(), ValidateOptions::default())
776}
777
778fn resolve_local_input_path(input: &str) -> FloeResult<PathBuf> {
779    if let Some((scheme, _)) = input.split_once("://") {
780        if scheme.eq_ignore_ascii_case("file") {
781            let url = Url::parse(input)?;
782            return url.to_file_path().map_err(|_| {
783                Box::new(ConfigError(format!(
784                    "invalid file:// URI for add-entity input: {input}"
785                ))) as Box<dyn std::error::Error + Send + Sync>
786            });
787        }
788        return Err(Box::new(ConfigError(format!(
789            "remote URI inference is not supported yet for add-entity input: {input}"
790        ))));
791    }
792
793    let path = PathBuf::from(input);
794    let absolute = if path.is_absolute() {
795        path
796    } else {
797        std::env::current_dir()?.join(path)
798    };
799    let metadata = fs::metadata(&absolute).map_err(|err| {
800        Box::new(IoError(format!(
801            "failed to access input at {}: {err}",
802            absolute.display()
803        ))) as Box<dyn std::error::Error + Send + Sync>
804    })?;
805    if metadata.is_dir() {
806        return Err(Box::new(ConfigError(format!(
807            "add-entity expects a file input for schema inference, got directory: {}",
808            absolute.display()
809        ))));
810    }
811    Ok(absolute)
812}
813
/// Returns the input location to store in the config.
///
/// `file://` URIs (scheme compared case-insensitively) are replaced by the resolved local
/// path; every other input is kept exactly as given.
fn normalize_persisted_input_path(input: &str, local_input_path: &Path) -> String {
    let is_file_uri = input
        .split_once("://")
        .map_or(false, |(scheme, _)| scheme.eq_ignore_ascii_case("file"));
    if is_file_uri {
        local_input_path.display().to_string()
    } else {
        input.to_string()
    }
}
822
823fn derive_entity_name(input: &str) -> FloeResult<String> {
824    let raw = if let Some((scheme, _)) = input.split_once("://") {
825        if scheme.eq_ignore_ascii_case("file") {
826            let url = Url::parse(input)?;
827            let path = url.to_file_path().map_err(|_| {
828                Box::new(ConfigError(format!(
829                    "invalid file:// URI for add-entity input: {input}"
830                ))) as Box<dyn std::error::Error + Send + Sync>
831            })?;
832            file_name_stem_string(&path)
833        } else {
834            let trimmed = input.trim_end_matches('/');
835            let last = trimmed.rsplit('/').next().unwrap_or(trimmed);
836            last.split('?').next().unwrap_or(last).to_string()
837        }
838    } else {
839        file_name_stem_string(Path::new(input))
840    };
841    let normalized = slugify_entity_name(&raw);
842    if normalized.is_empty() {
843        return Err(Box::new(ConfigError(format!(
844            "failed to derive entity name from input: {input}"
845        ))));
846    }
847    Ok(normalized)
848}
849
/// Returns the file stem of `path` as a lossily-decoded string.
///
/// Falls back to the full file name, and to `"entity"` when the path has no file name.
fn file_name_stem_string(path: &Path) -> String {
    match path.file_stem().or_else(|| path.file_name()) {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::from("entity"),
    }
}
856
/// Lowercases `value` and replaces every run of non-ASCII-alphanumeric characters with a
/// single underscore, without leading or trailing underscores.
fn slugify_entity_name(value: &str) -> String {
    value
        .split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter(|segment| !segment.is_empty())
        .map(|segment| segment.to_ascii_lowercase())
        .collect::<Vec<_>>()
        .join("_")
}
871
/// Guesses the field delimiter of a text file from its first lines.
///
/// At most 12 lines are read and at most 6 non-blank ones are kept. Each candidate
/// (`,`, `;`, tab, `|`) is scored by how often it occurs; a candidate missing from the
/// first sampled line is skipped, and one that occurs equally often on every line gets a
/// large bonus. On a tie the earlier candidate wins.
///
/// Returns `None` when the file cannot be opened or read, has no non-blank line in the
/// sample, or no candidate qualifies.
fn sniff_delimiter(path: &Path) -> Option<char> {
    let file = std::fs::File::open(path).ok()?;

    // A read error shows up as `None` and aborts the whole collection.
    let sample: Vec<String> = BufReader::new(file)
        .lines()
        .take(12)
        .map(|line| line.ok().map(|text| text.trim().to_string()))
        .filter(|line| line.as_ref().map_or(true, |text| !text.is_empty()))
        .take(6)
        .collect::<Option<Vec<String>>>()?;
    if sample.is_empty() {
        return None;
    }

    let mut winner: Option<(i32, char)> = None;
    for delimiter in [',', ';', '\t', '|'] {
        let per_line: Vec<usize> = sample
            .iter()
            .map(|line| line.matches(delimiter).count())
            .collect();
        let total: usize = per_line.iter().sum();
        let in_first_line = per_line.first().copied().unwrap_or(0);
        if in_first_line == 0 || total == 0 {
            continue;
        }

        let fewest = per_line.iter().copied().min().unwrap_or(0);
        let most = per_line.iter().copied().max().unwrap_or(0);
        let uniform_bonus = if fewest > 0 && fewest == most { 10_000 } else { 0 };
        let score = uniform_bonus + (fewest as i32 * 100) + total as i32;

        // Strictly-greater comparison keeps the earlier candidate on ties.
        match winner {
            Some((best_score, _)) if score <= best_score => {}
            _ => winner = Some((score, delimiter)),
        }
    }
    winner.map(|(_, delimiter)| delimiter)
}
911            best = Some((score, candidate));
912        }
913    }
914    best.map(|(_, candidate)| candidate)
915}