1use std::collections::{BTreeMap, BTreeSet};
2use std::fs;
3use std::io::{BufRead, BufReader};
4use std::path::{Path, PathBuf};
5
6use polars::prelude::{ArrowDataType, DataType, SerReader};
7use serde::Serialize;
8use serde_json::Value as JsonValue;
9use serde_yaml::Value as YamlValue;
10use tempfile::NamedTempFile;
11use url::Url;
12
13use crate::config;
14use crate::errors::IoError;
15use crate::{ConfigError, FloeResult, ValidateOptions};
16
/// Number of rows sampled when inferring column types from csv/json inputs.
const DEFAULT_SAMPLE_ROWS: usize = 200;
/// Skeleton config used when the target config file does not exist yet.
const MINIMAL_CONFIG_YAML: &str = "version: \"0.2\"\nentities: []\n";
19
/// Caller-supplied options for `add_entity_to_config`.
#[derive(Debug, Clone)]
pub struct AddEntityOptions {
    /// Existing config to extend; bootstrapped from a minimal skeleton if missing.
    pub config_path: PathBuf,
    /// Where to write the updated config; defaults to `config_path` when `None`.
    pub output_path: Option<PathBuf>,
    /// Input data path or URI to infer the entity schema from.
    pub input: String,
    /// Explicit source format (`csv`/`json`/`parquet`); inferred from the
    /// file extension when `None`.
    pub format: Option<String>,
    /// Explicit entity name; derived from the input path when `None`.
    pub name: Option<String>,
    /// Optional domain tag copied into the generated entity.
    pub domain: Option<String>,
    /// When true, nothing is written to disk and the YAML is returned instead.
    pub dry_run: bool,
}
30
/// Result summary returned by `add_entity_to_config`.
#[derive(Debug, Clone)]
pub struct AddEntityOutcome {
    /// Final (possibly derived and slugified) entity name.
    pub entity_name: String,
    /// Path the config was (or would have been) written to.
    pub output_path: PathBuf,
    /// Number of columns inferred for the entity schema.
    pub column_count: usize,
    /// Resolved source format (`csv`/`json`/`parquet`).
    pub format: String,
    /// Echo of the requested dry-run flag.
    pub dry_run: bool,
    /// Full updated config YAML; populated only on dry runs.
    pub rendered_yaml: Option<String>,
    /// Human-readable inference notes (e.g. json sampling details).
    pub notes: Vec<String>,
}
41
/// Intermediate result of schema inference, before YAML rendering.
#[derive(Debug, Clone)]
struct InferredEntity {
    name: String,
    format: String,
    /// Input path/URI as it should be persisted in the config.
    input: String,
    domain: Option<String>,
    /// Format-specific options (csv separator / json mode), when applicable.
    source_options: Option<GeneratedSourceOptions>,
    columns: Vec<GeneratedColumn>,
    /// Notes surfaced to the user alongside the outcome.
    notes: Vec<String>,
}
52
/// Serializable shape of a generated `entities[]` item in the config YAML.
#[derive(Debug, Clone, Serialize)]
struct GeneratedEntityYaml {
    name: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    domain: Option<String>,
    source: GeneratedSourceYaml,
    sink: GeneratedSinkYaml,
    policy: GeneratedPolicyYaml,
    schema: GeneratedSchemaYaml,
}

/// `source:` section of a generated entity.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSourceYaml {
    format: String,
    path: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    options: Option<GeneratedSourceOptions>,
}

/// Optional `source.options:` — only the keys relevant to csv/json inputs.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSourceOptions {
    /// CSV field separator (single character, serialized as a string).
    #[serde(skip_serializing_if = "Option::is_none")]
    separator: Option<String>,
    /// JSON layout: `"array"` or `"ndjson"` (see `JsonMode`).
    #[serde(skip_serializing_if = "Option::is_none")]
    json_mode: Option<String>,
}

/// `sink:` section with accepted/rejected output targets.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSinkYaml {
    accepted: GeneratedSinkTargetYaml,
    rejected: GeneratedSinkTargetYaml,
}

/// One sink target (format + output path prefix).
#[derive(Debug, Clone, Serialize)]
struct GeneratedSinkTargetYaml {
    format: String,
    path: String,
}

/// `policy:` section; generated entities default to `severity: reject`.
#[derive(Debug, Clone, Serialize)]
struct GeneratedPolicyYaml {
    severity: String,
}

/// `schema:` section combining mismatch policy and inferred columns.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSchemaYaml {
    mismatch: GeneratedMismatchYaml,
    columns: Vec<GeneratedColumn>,
}

/// `schema.mismatch:` behavior for columns missing from / extra in the data.
#[derive(Debug, Clone, Serialize)]
struct GeneratedMismatchYaml {
    missing_columns: String,
    extra_columns: String,
}

/// One inferred column entry under `schema.columns`.
#[derive(Debug, Clone, Serialize)]
struct GeneratedColumn {
    name: String,
    /// Source column/key name; same as `name` for generated entities.
    source: String,
    /// Floe type name (`string`/`boolean`/`number`/`date`/`time`/`datetime`).
    #[serde(rename = "type")]
    column_type: String,
    nullable: bool,
    unique: bool,
}
118
/// Top-level layout of a JSON input file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JsonMode {
    /// A single JSON array of objects at the document root.
    Array,
    /// Newline-delimited JSON: one object per line.
    Ndjson,
}

impl JsonMode {
    /// Stable string form persisted as the generated `json_mode` option.
    fn as_str(self) -> &'static str {
        match self {
            JsonMode::Array => "array",
            JsonMode::Ndjson => "ndjson",
        }
    }
}
133
/// Scalar column kind inferred from sampled JSON values; mapped to floe
/// type names by `inferred_scalar_type_to_floe_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferredScalarType {
    String,
    Boolean,
    Number,
    /// `YYYY-MM-DD`-shaped strings.
    Date,
    /// `HH:MM:SS` or `HH:MM:SS.mmm`-shaped strings.
    Time,
    /// ISO-8601-ish strings containing a `T` separator.
    DateTime,
}
143
/// Per-key accumulator used while sampling JSON rows.
#[derive(Debug, Default, Clone)]
struct JsonColumnStats {
    /// Merged scalar kind across non-null samples; `None` until one is seen.
    kind: Option<InferredScalarType>,
    /// Rows in which this key held a non-null value.
    non_null_count: usize,
    /// Set once an explicit JSON `null` is observed for this key.
    nullable: bool,
}
150
/// Infers an entity from `options.input` and appends it to the floe config,
/// validating the merged YAML before anything is persisted.
///
/// On `dry_run` the updated YAML is returned in the outcome instead of being
/// written to the output path (which defaults to `config_path`).
///
/// # Errors
/// Fails when inference, YAML merging, validation, or the final write fails.
pub fn add_entity_to_config(options: AddEntityOptions) -> FloeResult<AddEntityOutcome> {
    let inferred = infer_entity_from_input(
        &options.input,
        options.format.as_deref(),
        options.name.as_deref(),
        options.domain.as_deref(),
    )?;

    let output_path = options
        .output_path
        .clone()
        .unwrap_or_else(|| options.config_path.clone());
    let config_text = load_or_initialize_config_text(&options.config_path)?;

    let updated_yaml = append_entity_yaml(&config_text, &inferred)?;
    if !options.dry_run {
        // Create the output directory up front so validation can place its
        // temp file next to the real target (see validate_generated_config).
        if let Some(parent) = output_path.parent() {
            if !parent.as_os_str().is_empty() {
                fs::create_dir_all(parent)?;
            }
        }
    }
    validate_generated_config(
        &updated_yaml,
        // Dry runs validate relative to the original config location, since
        // no output directory was created above.
        if options.dry_run {
            &options.config_path
        } else {
            &output_path
        },
    )?;

    if !options.dry_run {
        fs::write(&output_path, updated_yaml.as_bytes()).map_err(|err| {
            Box::new(IoError(format!(
                "failed to write config at {}: {err}",
                output_path.display()
            ))) as Box<dyn std::error::Error + Send + Sync>
        })?;
    }

    Ok(AddEntityOutcome {
        entity_name: inferred.name,
        output_path,
        column_count: inferred.columns.len(),
        format: inferred.format,
        dry_run: options.dry_run,
        // The rendered YAML is surfaced only when nothing was written to disk.
        rendered_yaml: options.dry_run.then_some(updated_yaml),
        notes: inferred.notes,
    })
}
201
/// Resolves name/format/path for the new entity and runs the per-format
/// column inference, returning the assembled `InferredEntity`.
///
/// # Errors
/// Fails for empty names, unsupported formats, unreadable inputs, or inputs
/// from which no columns could be inferred.
fn infer_entity_from_input(
    input: &str,
    format_override: Option<&str>,
    name_override: Option<&str>,
    domain: Option<&str>,
) -> FloeResult<InferredEntity> {
    let entity_name = match name_override {
        Some(value) => value.to_string(),
        None => derive_entity_name(input)?,
    };
    if entity_name.trim().is_empty() {
        return Err(Box::new(ConfigError(
            "entity name must not be empty".to_string(),
        )));
    }

    let local_input_path = resolve_local_input_path(input)?;
    let format = resolve_add_entity_format(format_override, input, &local_input_path)?;
    // The persisted path keeps the original URI form except for file:// URIs,
    // which are flattened to plain filesystem paths.
    let persisted_input_path = normalize_persisted_input_path(input, &local_input_path);
    let (source_options, columns, notes) = match format.as_str() {
        "csv" => infer_csv_columns(&local_input_path)?,
        "json" => infer_json_columns(&local_input_path)?,
        "parquet" => infer_parquet_columns(&local_input_path)?,
        other => {
            return Err(Box::new(ConfigError(format!(
                "unsupported add-entity format: {other} (allowed: csv, json, parquet)"
            ))))
        }
    };

    if columns.is_empty() {
        return Err(Box::new(ConfigError(format!(
            "no columns inferred from {}",
            local_input_path.display()
        ))));
    }

    Ok(InferredEntity {
        name: entity_name,
        format,
        input: persisted_input_path,
        domain: domain.map(ToString::to_string),
        source_options,
        columns,
        notes,
    })
}
249
250fn load_or_initialize_config_text(config_path: &Path) -> FloeResult<String> {
251 match fs::read_to_string(config_path) {
252 Ok(text) => Ok(text),
253 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
254 Ok(MINIMAL_CONFIG_YAML.to_string())
255 }
256 Err(err) => Err(Box::new(IoError(format!(
257 "failed to read config at {}: {err}",
258 config_path.display()
259 )))),
260 }
261}
262
263fn resolve_add_entity_format(
264 format_override: Option<&str>,
265 input: &str,
266 local_input_path: &Path,
267) -> FloeResult<String> {
268 if let Some(format) = format_override {
269 return Ok(format.to_string());
270 }
271
272 infer_format_from_extension(local_input_path).ok_or_else(|| {
273 Box::new(ConfigError(format!(
274 "could not infer add-entity format from input {} (supported extensions: .csv, .json, .parquet); use --format",
275 input
276 ))) as Box<dyn std::error::Error + Send + Sync>
277 })
278}
279
/// Maps a file extension (case-insensitively) to a supported source format,
/// returning `None` for anything other than csv/json/parquet.
fn infer_format_from_extension(path: &Path) -> Option<String> {
    let ext = path.extension()?.to_string_lossy().to_ascii_lowercase();
    ["csv", "json", "parquet"]
        .iter()
        .find(|known| **known == ext)
        .map(|known| (*known).to_string())
}
289
/// Infers columns from a local CSV file.
///
/// Sniffs the delimiter (falling back to `,`), then lets polars infer dtypes
/// from the first `DEFAULT_SAMPLE_ROWS` rows. Returns the generated source
/// options (the detected separator), the columns, and no notes.
fn infer_csv_columns(
    input_path: &Path,
) -> FloeResult<(
    Option<GeneratedSourceOptions>,
    Vec<GeneratedColumn>,
    Vec<String>,
)> {
    let separator = sniff_delimiter(input_path).unwrap_or(',');
    let separator_str = separator.to_string();

    // Start from the project's csv defaults, overriding only what inference
    // needs: the sniffed separator and a header row.
    let mut source_options = config::SourceOptions::defaults_for_format("csv");
    source_options.separator = Some(separator.to_string());
    source_options.header = Some(true);

    let reader = source_options
        .to_csv_read_options(input_path)?
        .with_n_rows(Some(DEFAULT_SAMPLE_ROWS))
        .with_infer_schema_length(Some(DEFAULT_SAMPLE_ROWS))
        .try_into_reader_with_file_path(None)
        .map_err(|err| {
            Box::new(IoError(format!(
                "failed to open csv at {}: {err}",
                input_path.display()
            ))) as Box<dyn std::error::Error + Send + Sync>
        })?;
    let df = reader.finish().map_err(|err| {
        Box::new(IoError(format!("csv schema inference failed: {err}")))
            as Box<dyn std::error::Error + Send + Sync>
    })?;

    let mut columns = Vec::with_capacity(df.width());
    for column in df.get_columns() {
        let name = column.name().as_str().to_string();
        columns.push(GeneratedColumn {
            name: name.clone(),
            source: name,
            column_type: polars_dtype_to_floe_type(column.dtype()),
            // Conservatively nullable when the sample is empty or any null
            // was observed in the sampled rows.
            nullable: df.height() == 0 || column.null_count() > 0,
            unique: false,
        });
    }

    Ok((
        Some(GeneratedSourceOptions {
            separator: Some(separator_str),
            json_mode: None,
        }),
        columns,
        Vec::new(),
    ))
}
341
/// Infers columns from a local parquet file by reading only its schema
/// (no row data is loaded).
///
/// NOTE(review): nullability is not read from the parquet schema here; every
/// column is conservatively marked `nullable: true` — confirm whether field
/// nullability should be propagated instead.
fn infer_parquet_columns(
    input_path: &Path,
) -> FloeResult<(
    Option<GeneratedSourceOptions>,
    Vec<GeneratedColumn>,
    Vec<String>,
)> {
    let file = std::fs::File::open(input_path).map_err(|err| {
        Box::new(IoError(format!(
            "failed to open parquet at {}: {err}",
            input_path.display()
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;
    let mut reader = polars::prelude::ParquetReader::new(file);
    let schema = reader.schema().map_err(|err| {
        Box::new(IoError(format!(
            "failed to read parquet schema at {}: {err}",
            input_path.display()
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;

    let columns = schema
        .iter()
        .map(|(name, field)| GeneratedColumn {
            name: name.to_string(),
            source: name.to_string(),
            column_type: arrow_dtype_to_floe_type(field.dtype()),
            nullable: true,
            unique: false,
        })
        .collect();

    // Parquet carries no format-specific source options and produces no notes.
    Ok((None, columns, Vec::new()))
}
376
/// Infers columns from a local JSON file, supporting both a root array of
/// objects and newline-delimited JSON (one object per line).
///
/// Samples up to `DEFAULT_SAMPLE_ROWS` objects, accumulating per-key type
/// statistics. Nested arrays/objects are inferred as `string` and reported
/// in the returned notes. A key is nullable if it held an explicit `null`
/// or was absent from at least one sampled row.
fn infer_json_columns(
    input_path: &Path,
) -> FloeResult<(
    Option<GeneratedSourceOptions>,
    Vec<GeneratedColumn>,
    Vec<String>,
)> {
    let json_mode = detect_json_mode(input_path)?;
    // BTreeMap keeps the generated column order deterministic (sorted by key).
    let mut stats_by_key: BTreeMap<String, JsonColumnStats> = BTreeMap::new();
    let mut sampled_rows = 0usize;
    let mut nested_keys = BTreeSet::new();

    match json_mode {
        JsonMode::Array => {
            // Array mode parses the whole document at once.
            let content = fs::read_to_string(input_path).map_err(|err| {
                Box::new(IoError(format!(
                    "failed to read json at {}: {err}",
                    input_path.display()
                ))) as Box<dyn std::error::Error + Send + Sync>
            })?;
            let value: JsonValue = serde_json::from_str(&content).map_err(|err| {
                Box::new(IoError(format!("json parse error: {err}")))
                    as Box<dyn std::error::Error + Send + Sync>
            })?;
            let rows = value.as_array().ok_or_else(|| {
                Box::new(ConfigError("expected json array at root".to_string()))
                    as Box<dyn std::error::Error + Send + Sync>
            })?;
            for row in rows.iter().take(DEFAULT_SAMPLE_ROWS) {
                let object = row.as_object().ok_or_else(|| {
                    Box::new(ConfigError(
                        "expected top-level json objects inside array".to_string(),
                    )) as Box<dyn std::error::Error + Send + Sync>
                })?;
                sampled_rows += 1;
                update_json_stats(object, &mut stats_by_key, &mut nested_keys);
            }
        }
        JsonMode::Ndjson => {
            // NDJSON mode streams line by line; blank lines are skipped.
            let file = std::fs::File::open(input_path).map_err(|err| {
                Box::new(IoError(format!(
                    "failed to read json at {}: {err}",
                    input_path.display()
                ))) as Box<dyn std::error::Error + Send + Sync>
            })?;
            let reader = BufReader::new(file);
            for (idx, line) in reader.lines().enumerate() {
                if sampled_rows >= DEFAULT_SAMPLE_ROWS {
                    break;
                }
                let line = line.map_err(|err| {
                    Box::new(IoError(format!(
                        "failed to read json at {}: {err}",
                        input_path.display()
                    ))) as Box<dyn std::error::Error + Send + Sync>
                })?;
                let line = line.trim();
                if line.is_empty() {
                    continue;
                }
                let value: JsonValue = serde_json::from_str(line).map_err(|err| {
                    Box::new(IoError(format!(
                        "json parse error at line {}: {err}",
                        idx + 1
                    ))) as Box<dyn std::error::Error + Send + Sync>
                })?;
                let object = value.as_object().ok_or_else(|| {
                    Box::new(ConfigError(format!(
                        "expected top-level json object at line {}",
                        idx + 1
                    ))) as Box<dyn std::error::Error + Send + Sync>
                })?;
                sampled_rows += 1;
                update_json_stats(object, &mut stats_by_key, &mut nested_keys);
            }
        }
    }

    if sampled_rows == 0 {
        return Err(Box::new(ConfigError(format!(
            "no json objects found in {}",
            input_path.display()
        ))));
    }

    let mut columns = Vec::with_capacity(stats_by_key.len());
    for (key, stats) in &stats_by_key {
        // Nullable when a null was seen OR the key was missing in some row.
        let nullable = stats.nullable || stats.non_null_count < sampled_rows;
        columns.push(GeneratedColumn {
            name: key.clone(),
            source: key.clone(),
            column_type: stats
                .kind
                .map(inferred_scalar_type_to_floe_type)
                .unwrap_or("string")
                .to_string(),
            nullable,
            unique: false,
        });
    }

    let mut notes = Vec::new();
    notes.push(format!(
        "json inference sampled {} row(s) in {} mode",
        sampled_rows,
        json_mode.as_str()
    ));
    if !nested_keys.is_empty() {
        notes.push(format!(
            "nested JSON values inferred as string for top-level key(s): {}",
            nested_keys.into_iter().collect::<Vec<_>>().join(", ")
        ));
    }

    Ok((
        Some(GeneratedSourceOptions {
            separator: None,
            json_mode: Some(json_mode.as_str().to_string()),
        }),
        columns,
        notes,
    ))
}
500
501fn update_json_stats(
502 object: &serde_json::Map<String, JsonValue>,
503 stats_by_key: &mut BTreeMap<String, JsonColumnStats>,
504 nested_keys: &mut BTreeSet<String>,
505) {
506 for (key, value) in object {
507 let stats = stats_by_key.entry(key.clone()).or_default();
508 match json_value_kind(value) {
509 None => {
510 stats.nullable = true;
511 }
512 Some(kind) => {
513 stats.non_null_count += 1;
514 if matches!(value, JsonValue::Array(_) | JsonValue::Object(_)) {
515 nested_keys.insert(key.clone());
516 }
517 stats.kind = Some(match (stats.kind, kind) {
518 (None, next) => next,
519 (Some(current), next) => merge_json_kinds(current, next),
520 });
521 }
522 }
523 }
524}
525
526fn json_value_kind(value: &JsonValue) -> Option<InferredScalarType> {
527 match value {
528 JsonValue::Null => None,
529 JsonValue::Bool(_) => Some(InferredScalarType::Boolean),
530 JsonValue::Number(_) => Some(InferredScalarType::Number),
531 JsonValue::String(value) => Some(infer_string_scalar_type(value)),
532 JsonValue::Array(_) | JsonValue::Object(_) => Some(InferredScalarType::String),
533 }
534}
535
536fn infer_string_scalar_type(value: &str) -> InferredScalarType {
537 let trimmed = value.trim();
538 if trimmed.is_empty() {
539 return InferredScalarType::String;
540 }
541 if looks_like_date_time(trimmed) {
542 return InferredScalarType::DateTime;
543 }
544 if looks_like_date(trimmed) {
545 return InferredScalarType::Date;
546 }
547 if looks_like_time(trimmed) {
548 return InferredScalarType::Time;
549 }
550 InferredScalarType::String
551}
552
/// Heuristic check for an ISO-8601 calendar date (`YYYY-MM-DD`).
///
/// Beyond the strict `dddd-dd-dd` shape check, the month must be 01-12 and
/// the day 01-31 so that digit runs like `1234-56-78` are not misclassified
/// as dates during type inference.
fn looks_like_date(value: &str) -> bool {
    let bytes = value.as_bytes();
    let shape_ok = bytes.len() == 10
        && bytes[4] == b'-'
        && bytes[7] == b'-'
        && bytes
            .iter()
            .enumerate()
            .all(|(idx, b)| matches!(idx, 4 | 7) || b.is_ascii_digit());
    if !shape_ok {
        return false;
    }
    // Slicing is safe: the shape check guarantees 10 ASCII bytes.
    let month: u32 = value[5..7].parse().unwrap_or(0);
    let day: u32 = value[8..10].parse().unwrap_or(0);
    (1..=12).contains(&month) && (1..=31).contains(&day)
}
563
/// Heuristic check for a wall-clock time: `HH:MM:SS` or `HH:MM:SS.mmm`.
///
/// Beyond the shape check, hours must be 00-23 and minutes/seconds 00-59 so
/// that digit runs like `99:99:99` are not misclassified as times.
fn looks_like_time(value: &str) -> bool {
    let bytes = value.as_bytes();
    let shape_ok = match bytes.len() {
        // HH:MM:SS
        8 => {
            bytes[2] == b':'
                && bytes[5] == b':'
                && bytes
                    .iter()
                    .enumerate()
                    .all(|(idx, b)| matches!(idx, 2 | 5) || b.is_ascii_digit())
        }
        // HH:MM:SS.mmm
        12 => {
            bytes[2] == b':'
                && bytes[5] == b':'
                && bytes[8] == b'.'
                && bytes
                    .iter()
                    .enumerate()
                    .all(|(idx, b)| matches!(idx, 2 | 5 | 8) || b.is_ascii_digit())
        }
        _ => false,
    };
    if !shape_ok {
        return false;
    }
    // Slicing is safe: the shape check guarantees ASCII digits at these spots.
    let hour: u32 = value[0..2].parse().unwrap_or(99);
    let minute: u32 = value[3..5].parse().unwrap_or(99);
    let second: u32 = value[6..8].parse().unwrap_or(99);
    hour < 24 && minute < 60 && second < 60
}
585
/// Heuristic check for an ISO-8601 timestamp (`YYYY-MM-DDTHH:MM…`).
///
/// The original heuristic (`contains('T')` plus a `Z`/`+`/two-colons test)
/// classified strings like `AT&T+1` or `a:b:c Tx` as datetimes, and missed
/// offset-less `…THH:MM` values. This version requires a date-shaped prefix
/// before the `T` and at least `HH:MM` after it.
fn looks_like_date_time(value: &str) -> bool {
    let (date_part, time_part) = match value.split_once('T') {
        Some(parts) => parts,
        None => return false,
    };
    let date = date_part.as_bytes();
    // Date prefix must be exactly dddd-dd-dd.
    let date_ok = date.len() == 10
        && date[4] == b'-'
        && date[7] == b'-'
        && date
            .iter()
            .enumerate()
            .all(|(idx, b)| matches!(idx, 4 | 7) || b.is_ascii_digit());
    // Time part must at least start with HH:MM; anything after (seconds,
    // fractions, Z or numeric offsets) is accepted.
    let time = time_part.as_bytes();
    date_ok
        && time.len() >= 5
        && time[0].is_ascii_digit()
        && time[1].is_ascii_digit()
        && time[2] == b':'
}
590
591fn merge_json_kinds(current: InferredScalarType, next: InferredScalarType) -> InferredScalarType {
592 use InferredScalarType as T;
593 match (current, next) {
594 (T::String, _) | (_, T::String) => T::String,
595 (T::Number, T::Number) => T::Number,
596 (T::Boolean, T::Boolean) => T::Boolean,
597 (T::Date, T::Date) => T::Date,
598 (T::Time, T::Time) => T::Time,
599 (T::DateTime, T::DateTime) => T::DateTime,
600 (T::Date, T::DateTime) | (T::DateTime, T::Date) => T::DateTime,
601 _ => T::String,
602 }
603}
604
605fn inferred_scalar_type_to_floe_type(kind: InferredScalarType) -> &'static str {
606 match kind {
607 InferredScalarType::String => "string",
608 InferredScalarType::Boolean => "boolean",
609 InferredScalarType::Number => "number",
610 InferredScalarType::Date => "date",
611 InferredScalarType::Time => "time",
612 InferredScalarType::DateTime => "datetime",
613 }
614}
615
616fn detect_json_mode(path: &Path) -> FloeResult<JsonMode> {
617 let file = std::fs::File::open(path).map_err(|err| {
618 Box::new(IoError(format!(
619 "failed to read json at {}: {err}",
620 path.display()
621 ))) as Box<dyn std::error::Error + Send + Sync>
622 })?;
623 let mut reader = BufReader::new(file);
624 let mut buf = String::new();
625 while reader.read_line(&mut buf)? > 0 {
626 let trimmed = buf.trim_start();
627 if trimmed.is_empty() {
628 buf.clear();
629 continue;
630 }
631 return Ok(if trimmed.starts_with('[') {
632 JsonMode::Array
633 } else {
634 JsonMode::Ndjson
635 });
636 }
637 Err(Box::new(ConfigError(format!(
638 "json input {} is empty",
639 path.display()
640 ))))
641}
642
643fn polars_dtype_to_floe_type(dtype: &DataType) -> String {
644 match dtype {
645 DataType::Boolean => "boolean".to_string(),
646 DataType::Date => "date".to_string(),
647 DataType::Time => "time".to_string(),
648 DataType::Datetime(_, _) => "datetime".to_string(),
649 DataType::Int8
650 | DataType::Int16
651 | DataType::Int32
652 | DataType::Int64
653 | DataType::UInt8
654 | DataType::UInt16
655 | DataType::UInt32
656 | DataType::UInt64
657 | DataType::Float32
658 | DataType::Float64 => "number".to_string(),
659 _ => "string".to_string(),
660 }
661}
662
663fn arrow_dtype_to_floe_type(dtype: &ArrowDataType) -> String {
664 match dtype {
665 ArrowDataType::Boolean => "boolean".to_string(),
666 ArrowDataType::Date32 | ArrowDataType::Date64 => "date".to_string(),
667 ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => "time".to_string(),
668 ArrowDataType::Timestamp(_, _) => "datetime".to_string(),
669 ArrowDataType::Int8
670 | ArrowDataType::Int16
671 | ArrowDataType::Int32
672 | ArrowDataType::Int64
673 | ArrowDataType::UInt8
674 | ArrowDataType::UInt16
675 | ArrowDataType::UInt32
676 | ArrowDataType::UInt64
677 | ArrowDataType::Float16
678 | ArrowDataType::Float32
679 | ArrowDataType::Float64
680 | ArrowDataType::Decimal(_, _)
681 | ArrowDataType::Decimal256(_, _) => "number".to_string(),
682 _ => "string".to_string(),
683 }
684}
685
/// Appends the inferred entity to the config's `entities` sequence,
/// preserving every other key in the document.
///
/// # Errors
/// Fails when the root is not a mapping, `entities` is not a sequence, or an
/// entity with the same name already exists.
fn append_entity_yaml(config_text: &str, inferred: &InferredEntity) -> FloeResult<String> {
    let mut root: YamlValue = serde_yaml::from_str(config_text)?;
    let root_map = root.as_mapping_mut().ok_or_else(|| {
        Box::new(ConfigError(
            "config root must be a YAML mapping".to_string(),
        )) as Box<dyn std::error::Error + Send + Sync>
    })?;

    let entities_key = YamlValue::String("entities".to_string());
    let new_entity_value = serde_yaml::to_value(build_generated_entity_yaml(inferred))?;

    match root_map.get_mut(&entities_key) {
        Some(value) => {
            let entities = value.as_sequence_mut().ok_or_else(|| {
                Box::new(ConfigError(
                    "root.entities must be a YAML sequence".to_string(),
                )) as Box<dyn std::error::Error + Send + Sync>
            })?;
            // Duplicate names would make the config ambiguous; reject early.
            if entities
                .iter()
                .any(|entity| entity_name_matches(entity, &inferred.name))
            {
                return Err(Box::new(ConfigError(format!(
                    "entity already exists: {}",
                    inferred.name
                ))));
            }
            entities.push(new_entity_value);
        }
        None => {
            // No entities key yet: create it with the new entity as sole item.
            root_map.insert(entities_key, YamlValue::Sequence(vec![new_entity_value]));
        }
    }

    Ok(serde_yaml::to_string(&root)?)
}
722
723fn entity_name_matches(value: &YamlValue, name: &str) -> bool {
724 value
725 .as_mapping()
726 .and_then(|map| map.get(YamlValue::String("name".to_string())))
727 .and_then(YamlValue::as_str)
728 == Some(name)
729}
730
731fn build_generated_entity_yaml(inferred: &InferredEntity) -> GeneratedEntityYaml {
732 let source_options = match inferred.format.as_str() {
733 "json" => inferred.source_options.clone(),
734 "csv" => inferred.source_options.clone(),
735 _ => None,
736 };
737
738 GeneratedEntityYaml {
739 name: inferred.name.clone(),
740 domain: inferred.domain.clone(),
741 source: GeneratedSourceYaml {
742 format: inferred.format.clone(),
743 path: inferred.input.clone(),
744 options: source_options,
745 },
746 sink: GeneratedSinkYaml {
747 accepted: GeneratedSinkTargetYaml {
748 format: "parquet".to_string(),
749 path: format!("out/accepted/{}/", inferred.name),
750 },
751 rejected: GeneratedSinkTargetYaml {
752 format: "csv".to_string(),
753 path: format!("out/rejected/{}/", inferred.name),
754 },
755 },
756 policy: GeneratedPolicyYaml {
757 severity: "reject".to_string(),
758 },
759 schema: GeneratedSchemaYaml {
760 mismatch: GeneratedMismatchYaml {
761 missing_columns: "reject_file".to_string(),
762 extra_columns: "ignore".to_string(),
763 },
764 columns: inferred.columns.clone(),
765 },
766 }
767}
768
/// Runs the full `crate::validate` pass over the generated YAML via a temp
/// file. The temp file is created next to `target_path` when possible
/// (presumably so any relative paths in the config resolve as they would for
/// the real file — confirm), falling back to the system temp dir.
fn validate_generated_config(yaml: &str, target_path: &Path) -> FloeResult<()> {
    let validation_parent = target_path.parent().unwrap_or_else(|| Path::new("."));
    let mut temp_file =
        NamedTempFile::new_in(validation_parent).or_else(|_| NamedTempFile::new())?;
    use std::io::Write;
    temp_file.write_all(yaml.as_bytes())?;
    crate::validate(temp_file.path(), ValidateOptions::default())
}
777
/// Resolves the input string to an absolute local file path.
///
/// `file://` URIs are converted to paths; any other scheme is rejected
/// (remote inference is unsupported). Relative paths are resolved against
/// the current working directory, and the target must exist and be a file.
fn resolve_local_input_path(input: &str) -> FloeResult<PathBuf> {
    if let Some((scheme, _)) = input.split_once("://") {
        if scheme.eq_ignore_ascii_case("file") {
            let url = Url::parse(input)?;
            return url.to_file_path().map_err(|_| {
                Box::new(ConfigError(format!(
                    "invalid file:// URI for add-entity input: {input}"
                ))) as Box<dyn std::error::Error + Send + Sync>
            });
        }
        return Err(Box::new(ConfigError(format!(
            "remote URI inference is not supported yet for add-entity input: {input}"
        ))));
    }

    let path = PathBuf::from(input);
    let absolute = if path.is_absolute() {
        path
    } else {
        std::env::current_dir()?.join(path)
    };
    // Existence check doubles as early I/O error reporting.
    let metadata = fs::metadata(&absolute).map_err(|err| {
        Box::new(IoError(format!(
            "failed to access input at {}: {err}",
            absolute.display()
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;
    if metadata.is_dir() {
        return Err(Box::new(ConfigError(format!(
            "add-entity expects a file input for schema inference, got directory: {}",
            absolute.display()
        ))));
    }
    Ok(absolute)
}
813
/// Chooses the path string persisted in the generated config: `file://` URIs
/// are flattened to the resolved local path, everything else (plain paths
/// and other schemes) is kept verbatim.
fn normalize_persisted_input_path(input: &str, local_input_path: &Path) -> String {
    match input.split_once("://") {
        Some((scheme, _)) if scheme.eq_ignore_ascii_case("file") => {
            local_input_path.display().to_string()
        }
        _ => input.to_string(),
    }
}
822
/// Derives a slugified entity name from the input path or URI.
///
/// NOTE(review): for non-file remote URIs the last path segment keeps its
/// extension (e.g. `s3://b/data.csv` → `data_csv`), while local paths and
/// file:// URIs use the file stem (`data`). Confirm whether this asymmetry
/// is intentional.
fn derive_entity_name(input: &str) -> FloeResult<String> {
    let raw = if let Some((scheme, _)) = input.split_once("://") {
        if scheme.eq_ignore_ascii_case("file") {
            let url = Url::parse(input)?;
            let path = url.to_file_path().map_err(|_| {
                Box::new(ConfigError(format!(
                    "invalid file:// URI for add-entity input: {input}"
                ))) as Box<dyn std::error::Error + Send + Sync>
            })?;
            file_name_stem_string(&path)
        } else {
            // Other schemes: last path segment, with any query string dropped.
            let trimmed = input.trim_end_matches('/');
            let last = trimmed.rsplit('/').next().unwrap_or(trimmed);
            last.split('?').next().unwrap_or(last).to_string()
        }
    } else {
        file_name_stem_string(Path::new(input))
    };
    let normalized = slugify_entity_name(&raw);
    if normalized.is_empty() {
        return Err(Box::new(ConfigError(format!(
            "failed to derive entity name from input: {input}"
        ))));
    }
    Ok(normalized)
}
849
/// Returns the file stem (falling back to the full file name, then to the
/// literal `"entity"` when the path has neither).
fn file_name_stem_string(path: &Path) -> String {
    let stem_or_name = path.file_stem().or_else(|| path.file_name());
    match stem_or_name {
        Some(value) => value.to_string_lossy().into_owned(),
        None => String::from("entity"),
    }
}
856
/// Lowercases ASCII alphanumerics and collapses every other character run
/// into a single `_`, trimming leading/trailing underscores. May return an
/// empty string when the input has no alphanumerics.
fn slugify_entity_name(value: &str) -> String {
    let mut slug = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.is_empty() && !slug.ends_with('_') {
            // Collapse separator runs and never start with one.
            slug.push('_');
        }
    }
    slug.trim_matches('_').to_string()
}
871
/// Guesses the CSV delimiter by scoring `,` `;` tab and `|` over up to six
/// non-blank lines (scanned from the first twelve). A candidate must appear
/// in the first line; a consistent per-line count earns a large bonus, then
/// minimum count and total count break ties. Returns `None` when the file
/// cannot be read or no candidate matches.
fn sniff_delimiter(path: &Path) -> Option<char> {
    let reader = BufReader::new(std::fs::File::open(path).ok()?);
    let mut samples: Vec<String> = Vec::new();
    for line in reader.lines().take(12) {
        let line = line.ok()?;
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        samples.push(trimmed.to_string());
        if samples.len() >= 6 {
            break;
        }
    }
    if samples.is_empty() {
        return None;
    }

    let mut winner: Option<(i32, char)> = None;
    for delim in [',', ';', '\t', '|'] {
        let counts: Vec<usize> = samples
            .iter()
            .map(|sample| sample.matches(delim).count())
            .collect();
        let first = *counts.first().unwrap_or(&0);
        let min = *counts.iter().min().unwrap_or(&0);
        let max = *counts.iter().max().unwrap_or(&0);
        let total = counts.iter().sum::<usize>();
        // The delimiter must show up in the (header) line at all.
        if first == 0 || total == 0 {
            continue;
        }
        let consistency_bonus = if min > 0 && max == min { 10_000 } else { 0 };
        let score = consistency_bonus + (min as i32 * 100) + total as i32;
        let beats_current = winner
            .map(|(best_score, _)| score > best_score)
            .unwrap_or(true);
        if beats_current {
            winner = Some((score, delim));
        }
    }
    winner.map(|(_, delim)| delim)
}