1use std::collections::{BTreeMap, BTreeSet};
2use std::fs;
3use std::io::{BufRead, BufReader};
4use std::path::{Path, PathBuf};
5
6use polars::prelude::{ArrowDataType, DataType, SerReader};
7use serde::Serialize;
8use serde_json::Value as JsonValue;
9use serde_yaml::Value as YamlValue;
10use tempfile::NamedTempFile;
11use url::Url;
12
13use crate::config;
14use crate::errors::IoError;
15use crate::{ConfigError, FloeResult, ValidateOptions};
16
/// Maximum number of rows sampled per input file during schema inference.
const DEFAULT_SAMPLE_ROWS: usize = 200;
18
/// Caller-facing options for the `add-entity` operation.
#[derive(Debug, Clone)]
pub struct AddEntityOptions {
    /// Existing Floe config to extend.
    pub config_path: PathBuf,
    /// Where to write the updated config; defaults to `config_path` when `None`.
    pub output_path: Option<PathBuf>,
    /// Input data location (local path or URI) used for schema inference.
    pub input: String,
    /// Input format: "csv", "json", or "parquet".
    pub format: String,
    /// Explicit entity name; derived from the input location when `None`.
    pub name: Option<String>,
    /// Optional domain tag copied into the generated entity.
    pub domain: Option<String>,
    /// When true, render and validate but do not write any file.
    pub dry_run: bool,
}
29
/// Summary of what `add_entity_to_config` produced.
#[derive(Debug, Clone)]
pub struct AddEntityOutcome {
    /// Name of the entity that was appended.
    pub entity_name: String,
    /// Path the updated config was (or, for dry runs, would be) written to.
    pub output_path: PathBuf,
    /// Number of columns inferred from the input.
    pub column_count: usize,
    /// Source format used for inference.
    pub format: String,
    /// True when no file was written.
    pub dry_run: bool,
    /// Full rendered config YAML; populated only for dry runs.
    pub rendered_yaml: Option<String>,
    /// Human-readable notes gathered during inference.
    pub notes: Vec<String>,
}
40
/// Intermediate result of schema inference, before rendering to YAML.
#[derive(Debug, Clone)]
struct InferredEntity {
    name: String,
    /// Source format: "csv", "json", or "parquet".
    format: String,
    /// Input location as it will be persisted in the config.
    input: String,
    domain: Option<String>,
    /// Format-specific reader options (csv separator / json mode), if any.
    source_options: Option<GeneratedSourceOptions>,
    columns: Vec<GeneratedColumn>,
    /// Human-readable remarks surfaced to the caller.
    notes: Vec<String>,
}
51
/// Serialized form of the generated entity as it appears in the config YAML.
#[derive(Debug, Clone, Serialize)]
struct GeneratedEntityYaml {
    name: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    domain: Option<String>,
    source: GeneratedSourceYaml,
    sink: GeneratedSinkYaml,
    policy: GeneratedPolicyYaml,
    schema: GeneratedSchemaYaml,
}

/// `source:` section — where and how the entity's data is read.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSourceYaml {
    format: String,
    path: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    options: Option<GeneratedSourceOptions>,
}

/// Reader options; only the field relevant to the chosen format is set.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSourceOptions {
    /// CSV delimiter (single character rendered as a string).
    #[serde(skip_serializing_if = "Option::is_none")]
    separator: Option<String>,
    /// JSON layout: "array" or "ndjson".
    #[serde(skip_serializing_if = "Option::is_none")]
    json_mode: Option<String>,
}

/// `sink:` section — accepted and rejected output targets.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSinkYaml {
    accepted: GeneratedSinkTargetYaml,
    rejected: GeneratedSinkTargetYaml,
}

/// A single sink target (output format plus destination path).
#[derive(Debug, Clone, Serialize)]
struct GeneratedSinkTargetYaml {
    format: String,
    path: String,
}

/// `policy:` section; carries only the violation severity.
#[derive(Debug, Clone, Serialize)]
struct GeneratedPolicyYaml {
    severity: String,
}

/// `schema:` section — mismatch handling plus the inferred columns.
#[derive(Debug, Clone, Serialize)]
struct GeneratedSchemaYaml {
    mismatch: GeneratedMismatchYaml,
    columns: Vec<GeneratedColumn>,
}

/// Behavior when the observed file schema differs from the declared one.
#[derive(Debug, Clone, Serialize)]
struct GeneratedMismatchYaml {
    missing_columns: String,
    extra_columns: String,
}

/// One inferred column in the generated schema.
#[derive(Debug, Clone, Serialize)]
struct GeneratedColumn {
    name: String,
    /// Column name as it appears in the source file.
    source: String,
    /// Floe type name; serialized under the YAML key `type`.
    #[serde(rename = "type")]
    column_type: String,
    nullable: bool,
    unique: bool,
}
117
/// Layout of a JSON input: a single top-level array, or newline-delimited
/// objects (NDJSON).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JsonMode {
    Array,
    Ndjson,
}

impl JsonMode {
    /// Spelling used in generated YAML options and user-facing notes.
    fn as_str(self) -> &'static str {
        match self {
            JsonMode::Array => "array",
            JsonMode::Ndjson => "ndjson",
        }
    }
}
132
/// Scalar types distinguishable while sampling JSON values; `String` doubles
/// as the catch-all when observations disagree (see `merge_json_kinds`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferredScalarType {
    String,
    Boolean,
    Number,
    Date,
    Time,
    DateTime,
}
142
/// Per-key accumulator used while sampling JSON rows.
#[derive(Debug, Default, Clone)]
struct JsonColumnStats {
    /// Best scalar type seen so far; `None` until a non-null value appears.
    kind: Option<InferredScalarType>,
    /// Number of sampled rows where the key held a non-null value.
    non_null_count: usize,
    /// True once an explicit JSON `null` has been observed for the key.
    nullable: bool,
}
149
/// Appends a newly inferred entity to an existing Floe config.
///
/// Infers the entity schema from the input file, splices it into the config's
/// `entities` sequence, validates the resulting YAML, and (unless `dry_run`)
/// writes it to `output_path` (defaulting to `config_path`).
///
/// # Errors
///
/// Fails when the config cannot be read, the input cannot be inferred, the
/// entity already exists, the merged YAML does not validate, or the output
/// cannot be written.
pub fn add_entity_to_config(options: AddEntityOptions) -> FloeResult<AddEntityOutcome> {
    let inferred = infer_entity_from_input(
        &options.input,
        options.format.as_str(),
        options.name.as_deref(),
        options.domain.as_deref(),
    )?;

    // In-place update unless the caller redirected the output.
    let output_path = options
        .output_path
        .clone()
        .unwrap_or_else(|| options.config_path.clone());
    let config_text = fs::read_to_string(&options.config_path).map_err(|err| {
        Box::new(IoError(format!(
            "failed to read config at {}: {err}",
            options.config_path.display()
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;

    let updated_yaml = append_entity_yaml(&config_text, &inferred)?;
    // Create the output directory before validation: validation places its
    // temp file beside the target path (see `validate_generated_config`).
    if !options.dry_run {
        if let Some(parent) = output_path.parent() {
            if !parent.as_os_str().is_empty() {
                fs::create_dir_all(parent)?;
            }
        }
    }
    // Validate relative to where the config will actually live; for dry runs
    // that is the original config location.
    validate_generated_config(
        &updated_yaml,
        if options.dry_run {
            &options.config_path
        } else {
            &output_path
        },
    )?;

    // Only overwrite the target after validation succeeded.
    if !options.dry_run {
        fs::write(&output_path, updated_yaml.as_bytes()).map_err(|err| {
            Box::new(IoError(format!(
                "failed to write config at {}: {err}",
                output_path.display()
            ))) as Box<dyn std::error::Error + Send + Sync>
        })?;
    }

    Ok(AddEntityOutcome {
        entity_name: inferred.name,
        output_path,
        column_count: inferred.columns.len(),
        format: inferred.format,
        dry_run: options.dry_run,
        // The full rendered config is only surfaced for dry runs.
        rendered_yaml: options.dry_run.then_some(updated_yaml),
        notes: inferred.notes,
    })
}
205
/// Builds an `InferredEntity` by sampling the input file.
///
/// The entity name comes from `name_override` when given, otherwise it is
/// derived from the input location. Column inference dispatches on `format`
/// ("csv", "json", or "parquet").
///
/// # Errors
///
/// Fails on an empty entity name, an unsupported format, an unreadable or
/// remote input, or when no columns could be inferred.
fn infer_entity_from_input(
    input: &str,
    format: &str,
    name_override: Option<&str>,
    domain: Option<&str>,
) -> FloeResult<InferredEntity> {
    let entity_name = match name_override {
        Some(value) => value.to_string(),
        None => derive_entity_name(input)?,
    };
    if entity_name.trim().is_empty() {
        return Err(Box::new(ConfigError(
            "entity name must not be empty".to_string(),
        )));
    }

    // Inference always reads a local file; `file://` URIs are converted and
    // other schemes are rejected inside `resolve_local_input_path`.
    let local_input_path = resolve_local_input_path(input)?;
    // The persisted path keeps the user's spelling, except `file://` URIs,
    // which are stored as plain local paths.
    let persisted_input_path = normalize_persisted_input_path(input, &local_input_path);
    let (source_options, columns, notes) = match format {
        "csv" => infer_csv_columns(&local_input_path)?,
        "json" => infer_json_columns(&local_input_path)?,
        "parquet" => infer_parquet_columns(&local_input_path)?,
        other => {
            return Err(Box::new(ConfigError(format!(
                "unsupported add-entity format: {other} (allowed: csv, json, parquet)"
            ))))
        }
    };

    if columns.is_empty() {
        return Err(Box::new(ConfigError(format!(
            "no columns inferred from {}",
            local_input_path.display()
        ))));
    }

    Ok(InferredEntity {
        name: entity_name,
        format: format.to_string(),
        input: persisted_input_path,
        domain: domain.map(ToString::to_string),
        source_options,
        columns,
        notes,
    })
}
252
/// Infers columns from a CSV file by reading a bounded sample with Polars.
///
/// Returns generated source options carrying the sniffed separator, one
/// column per CSV header, and no notes.
fn infer_csv_columns(
    input_path: &Path,
) -> FloeResult<(
    Option<GeneratedSourceOptions>,
    Vec<GeneratedColumn>,
    Vec<String>,
)> {
    // Fall back to a comma when the delimiter cannot be sniffed.
    let separator = sniff_delimiter(input_path).unwrap_or(',');
    let separator_str = separator.to_string();

    let mut source_options = config::SourceOptions::defaults_for_format("csv");
    source_options.separator = Some(separator.to_string());
    source_options.header = Some(true);

    // Cap both rows read and the schema-inference window at the sample size.
    let reader = source_options
        .to_csv_read_options(input_path)?
        .with_n_rows(Some(DEFAULT_SAMPLE_ROWS))
        .with_infer_schema_length(Some(DEFAULT_SAMPLE_ROWS))
        .try_into_reader_with_file_path(None)
        .map_err(|err| {
            Box::new(IoError(format!(
                "failed to open csv at {}: {err}",
                input_path.display()
            ))) as Box<dyn std::error::Error + Send + Sync>
        })?;
    let df = reader.finish().map_err(|err| {
        Box::new(IoError(format!("csv schema inference failed: {err}")))
            as Box<dyn std::error::Error + Send + Sync>
    })?;

    let mut columns = Vec::with_capacity(df.width());
    for column in df.get_columns() {
        let name = column.name().as_str().to_string();
        columns.push(GeneratedColumn {
            name: name.clone(),
            source: name,
            column_type: polars_dtype_to_floe_type(column.dtype()),
            // With zero data rows there is no evidence of non-nullability, so
            // default to nullable; otherwise rely on nulls seen in the sample.
            nullable: df.height() == 0 || column.null_count() > 0,
            unique: false,
        });
    }

    Ok((
        Some(GeneratedSourceOptions {
            separator: Some(separator_str),
            json_mode: None,
        }),
        columns,
        Vec::new(),
    ))
}
304
/// Infers columns from a parquet file using only its embedded schema.
///
/// No rows are read, so nullability is not observed; every column is emitted
/// as nullable. Parquet needs no source options and produces no notes.
fn infer_parquet_columns(
    input_path: &Path,
) -> FloeResult<(
    Option<GeneratedSourceOptions>,
    Vec<GeneratedColumn>,
    Vec<String>,
)> {
    let file = std::fs::File::open(input_path).map_err(|err| {
        Box::new(IoError(format!(
            "failed to open parquet at {}: {err}",
            input_path.display()
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;
    let mut reader = polars::prelude::ParquetReader::new(file);
    let schema = reader.schema().map_err(|err| {
        Box::new(IoError(format!(
            "failed to read parquet schema at {}: {err}",
            input_path.display()
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;

    let columns = schema
        .iter()
        .map(|(name, field)| GeneratedColumn {
            name: name.to_string(),
            source: name.to_string(),
            column_type: arrow_dtype_to_floe_type(field.dtype()),
            // Conservative default: the schema is not consulted here to
            // prove a column non-nullable.
            nullable: true,
            unique: false,
        })
        .collect();

    Ok((None, columns, Vec::new()))
}
339
/// Infers columns from a JSON file by sampling up to `DEFAULT_SAMPLE_ROWS`
/// top-level objects, in either array or NDJSON layout.
///
/// Returns source options recording the detected json mode, one column per
/// distinct top-level key (sorted by key via `BTreeMap` ordering), and notes
/// describing the sample size and any nested values.
fn infer_json_columns(
    input_path: &Path,
) -> FloeResult<(
    Option<GeneratedSourceOptions>,
    Vec<GeneratedColumn>,
    Vec<String>,
)> {
    let json_mode = detect_json_mode(input_path)?;
    let mut stats_by_key: BTreeMap<String, JsonColumnStats> = BTreeMap::new();
    let mut sampled_rows = 0usize;
    // Top-level keys whose values were arrays/objects (typed as string).
    let mut nested_keys = BTreeSet::new();

    match json_mode {
        JsonMode::Array => {
            // Array mode parses the entire document, then samples its rows.
            let content = fs::read_to_string(input_path).map_err(|err| {
                Box::new(IoError(format!(
                    "failed to read json at {}: {err}",
                    input_path.display()
                ))) as Box<dyn std::error::Error + Send + Sync>
            })?;
            let value: JsonValue = serde_json::from_str(&content).map_err(|err| {
                Box::new(IoError(format!("json parse error: {err}")))
                    as Box<dyn std::error::Error + Send + Sync>
            })?;
            let rows = value.as_array().ok_or_else(|| {
                Box::new(ConfigError("expected json array at root".to_string()))
                    as Box<dyn std::error::Error + Send + Sync>
            })?;
            for row in rows.iter().take(DEFAULT_SAMPLE_ROWS) {
                let object = row.as_object().ok_or_else(|| {
                    Box::new(ConfigError(
                        "expected top-level json objects inside array".to_string(),
                    )) as Box<dyn std::error::Error + Send + Sync>
                })?;
                sampled_rows += 1;
                update_json_stats(object, &mut stats_by_key, &mut nested_keys);
            }
        }
        JsonMode::Ndjson => {
            // NDJSON mode streams line by line; blank lines are skipped and
            // do not count toward the sample.
            let file = std::fs::File::open(input_path).map_err(|err| {
                Box::new(IoError(format!(
                    "failed to read json at {}: {err}",
                    input_path.display()
                ))) as Box<dyn std::error::Error + Send + Sync>
            })?;
            let reader = BufReader::new(file);
            for (idx, line) in reader.lines().enumerate() {
                if sampled_rows >= DEFAULT_SAMPLE_ROWS {
                    break;
                }
                let line = line.map_err(|err| {
                    Box::new(IoError(format!(
                        "failed to read json at {}: {err}",
                        input_path.display()
                    ))) as Box<dyn std::error::Error + Send + Sync>
                })?;
                let line = line.trim();
                if line.is_empty() {
                    continue;
                }
                let value: JsonValue = serde_json::from_str(line).map_err(|err| {
                    Box::new(IoError(format!(
                        "json parse error at line {}: {err}",
                        idx + 1
                    ))) as Box<dyn std::error::Error + Send + Sync>
                })?;
                let object = value.as_object().ok_or_else(|| {
                    Box::new(ConfigError(format!(
                        "expected top-level json object at line {}",
                        idx + 1
                    ))) as Box<dyn std::error::Error + Send + Sync>
                })?;
                sampled_rows += 1;
                update_json_stats(object, &mut stats_by_key, &mut nested_keys);
            }
        }
    }

    if sampled_rows == 0 {
        return Err(Box::new(ConfigError(format!(
            "no json objects found in {}",
            input_path.display()
        ))));
    }

    let mut columns = Vec::with_capacity(stats_by_key.len());
    for (key, stats) in &stats_by_key {
        // A key is nullable when an explicit null was seen, or when it was
        // absent from at least one sampled row.
        let nullable = stats.nullable || stats.non_null_count < sampled_rows;
        columns.push(GeneratedColumn {
            name: key.clone(),
            source: key.clone(),
            // Keys that were only ever null default to "string".
            column_type: stats
                .kind
                .map(inferred_scalar_type_to_floe_type)
                .unwrap_or("string")
                .to_string(),
            nullable,
            unique: false,
        });
    }

    let mut notes = Vec::new();
    notes.push(format!(
        "json inference sampled {} row(s) in {} mode",
        sampled_rows,
        json_mode.as_str()
    ));
    if !nested_keys.is_empty() {
        notes.push(format!(
            "nested JSON values inferred as string for top-level key(s): {}",
            nested_keys.into_iter().collect::<Vec<_>>().join(", ")
        ));
    }

    Ok((
        Some(GeneratedSourceOptions {
            separator: None,
            json_mode: Some(json_mode.as_str().to_string()),
        }),
        columns,
        notes,
    ))
}
463
464fn update_json_stats(
465 object: &serde_json::Map<String, JsonValue>,
466 stats_by_key: &mut BTreeMap<String, JsonColumnStats>,
467 nested_keys: &mut BTreeSet<String>,
468) {
469 for (key, value) in object {
470 let stats = stats_by_key.entry(key.clone()).or_default();
471 match json_value_kind(value) {
472 None => {
473 stats.nullable = true;
474 }
475 Some(kind) => {
476 stats.non_null_count += 1;
477 if matches!(value, JsonValue::Array(_) | JsonValue::Object(_)) {
478 nested_keys.insert(key.clone());
479 }
480 stats.kind = Some(match (stats.kind, kind) {
481 (None, next) => next,
482 (Some(current), next) => merge_json_kinds(current, next),
483 });
484 }
485 }
486 }
487}
488
/// Maps a JSON value to the scalar type used for column inference.
///
/// Returns `None` for `null` (tracked as nullability, not a type). Nested
/// arrays/objects map to `String`; callers record those keys separately.
fn json_value_kind(value: &JsonValue) -> Option<InferredScalarType> {
    match value {
        JsonValue::Null => None,
        JsonValue::Bool(_) => Some(InferredScalarType::Boolean),
        JsonValue::Number(_) => Some(InferredScalarType::Number),
        // Strings may still encode dates/times; inspect the content.
        JsonValue::String(value) => Some(infer_string_scalar_type(value)),
        JsonValue::Array(_) | JsonValue::Object(_) => Some(InferredScalarType::String),
    }
}
498
499fn infer_string_scalar_type(value: &str) -> InferredScalarType {
500 let trimmed = value.trim();
501 if trimmed.is_empty() {
502 return InferredScalarType::String;
503 }
504 if looks_like_date_time(trimmed) {
505 return InferredScalarType::DateTime;
506 }
507 if looks_like_date(trimmed) {
508 return InferredScalarType::Date;
509 }
510 if looks_like_time(trimmed) {
511 return InferredScalarType::Time;
512 }
513 InferredScalarType::String
514}
515
/// True for strings shaped exactly like `YYYY-MM-DD` (digits plus dashes at
/// positions 4 and 7). Purely structural: month/day ranges are not validated.
fn looks_like_date(value: &str) -> bool {
    let bytes = value.as_bytes();
    if bytes.len() != 10 || bytes[4] != b'-' || bytes[7] != b'-' {
        return false;
    }
    bytes
        .iter()
        .enumerate()
        .all(|(idx, b)| idx == 4 || idx == 7 || b.is_ascii_digit())
}
526
/// True for strings shaped like `HH:MM:SS` (8 bytes) or `HH:MM:SS.mmm`
/// (12 bytes). Purely structural: digit positions and separators only.
fn looks_like_time(value: &str) -> bool {
    let bytes = value.as_bytes();
    let digits_except = |separators: &[usize]| {
        bytes
            .iter()
            .enumerate()
            .all(|(idx, b)| separators.contains(&idx) || b.is_ascii_digit())
    };
    match bytes.len() {
        8 => bytes[2] == b':' && bytes[5] == b':' && digits_except(&[2, 5]),
        12 => {
            bytes[2] == b':'
                && bytes[5] == b':'
                && bytes[8] == b'.'
                && digits_except(&[2, 5, 8])
        }
        _ => false,
    }
}
548
/// Loose check for ISO-8601-ish datetimes: a 'T' separator plus either a
/// trailing 'Z', an explicit '+' offset, or at least two ':' (full HH:MM:SS).
fn looks_like_date_time(value: &str) -> bool {
    if !value.contains('T') {
        return false;
    }
    value.ends_with('Z') || value.contains('+') || value.matches(':').count() >= 2
}
553
554fn merge_json_kinds(current: InferredScalarType, next: InferredScalarType) -> InferredScalarType {
555 use InferredScalarType as T;
556 match (current, next) {
557 (T::String, _) | (_, T::String) => T::String,
558 (T::Number, T::Number) => T::Number,
559 (T::Boolean, T::Boolean) => T::Boolean,
560 (T::Date, T::Date) => T::Date,
561 (T::Time, T::Time) => T::Time,
562 (T::DateTime, T::DateTime) => T::DateTime,
563 (T::Date, T::DateTime) | (T::DateTime, T::Date) => T::DateTime,
564 _ => T::String,
565 }
566}
567
/// Floe schema type name for an inferred scalar type.
fn inferred_scalar_type_to_floe_type(kind: InferredScalarType) -> &'static str {
    match kind {
        InferredScalarType::String => "string",
        InferredScalarType::Boolean => "boolean",
        InferredScalarType::Number => "number",
        InferredScalarType::Date => "date",
        InferredScalarType::Time => "time",
        InferredScalarType::DateTime => "datetime",
    }
}
578
579fn detect_json_mode(path: &Path) -> FloeResult<JsonMode> {
580 let file = std::fs::File::open(path).map_err(|err| {
581 Box::new(IoError(format!(
582 "failed to read json at {}: {err}",
583 path.display()
584 ))) as Box<dyn std::error::Error + Send + Sync>
585 })?;
586 let mut reader = BufReader::new(file);
587 let mut buf = String::new();
588 while reader.read_line(&mut buf)? > 0 {
589 let trimmed = buf.trim_start();
590 if trimmed.is_empty() {
591 buf.clear();
592 continue;
593 }
594 return Ok(if trimmed.starts_with('[') {
595 JsonMode::Array
596 } else {
597 JsonMode::Ndjson
598 });
599 }
600 Err(Box::new(ConfigError(format!(
601 "json input {} is empty",
602 path.display()
603 ))))
604}
605
/// Maps a Polars dtype to the Floe schema type name.
///
/// All integer and float widths collapse to "number"; anything unrecognized
/// falls back to "string".
fn polars_dtype_to_floe_type(dtype: &DataType) -> String {
    match dtype {
        DataType::Boolean => "boolean".to_string(),
        DataType::Date => "date".to_string(),
        DataType::Time => "time".to_string(),
        DataType::Datetime(_, _) => "datetime".to_string(),
        DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::Float32
        | DataType::Float64 => "number".to_string(),
        _ => "string".to_string(),
    }
}
625
/// Maps an Arrow dtype (from a parquet schema) to the Floe schema type name.
///
/// All integer, float, and decimal widths collapse to "number"; anything
/// unrecognized falls back to "string".
fn arrow_dtype_to_floe_type(dtype: &ArrowDataType) -> String {
    match dtype {
        ArrowDataType::Boolean => "boolean".to_string(),
        ArrowDataType::Date32 | ArrowDataType::Date64 => "date".to_string(),
        ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => "time".to_string(),
        ArrowDataType::Timestamp(_, _) => "datetime".to_string(),
        ArrowDataType::Int8
        | ArrowDataType::Int16
        | ArrowDataType::Int32
        | ArrowDataType::Int64
        | ArrowDataType::UInt8
        | ArrowDataType::UInt16
        | ArrowDataType::UInt32
        | ArrowDataType::UInt64
        | ArrowDataType::Float16
        | ArrowDataType::Float32
        | ArrowDataType::Float64
        | ArrowDataType::Decimal(_, _)
        | ArrowDataType::Decimal256(_, _) => "number".to_string(),
        _ => "string".to_string(),
    }
}
648
/// Parses the existing config, appends the generated entity to the
/// `entities` sequence (creating the sequence when absent), and re-renders
/// the whole document.
///
/// NOTE(review): round-tripping through serde_yaml re-renders the file, so
/// comments and custom formatting in the original config are presumably not
/// preserved — confirm this is acceptable.
///
/// # Errors
///
/// Fails when the root is not a mapping, `entities` is not a sequence, or an
/// entity with the same name already exists.
fn append_entity_yaml(config_text: &str, inferred: &InferredEntity) -> FloeResult<String> {
    let mut root: YamlValue = serde_yaml::from_str(config_text)?;
    let root_map = root.as_mapping_mut().ok_or_else(|| {
        Box::new(ConfigError(
            "config root must be a YAML mapping".to_string(),
        )) as Box<dyn std::error::Error + Send + Sync>
    })?;

    let entities_key = YamlValue::String("entities".to_string());
    let new_entity_value = serde_yaml::to_value(build_generated_entity_yaml(inferred))?;

    match root_map.get_mut(&entities_key) {
        Some(value) => {
            let entities = value.as_sequence_mut().ok_or_else(|| {
                Box::new(ConfigError(
                    "root.entities must be a YAML sequence".to_string(),
                )) as Box<dyn std::error::Error + Send + Sync>
            })?;
            // Refuse duplicates before mutating the sequence.
            if entities
                .iter()
                .any(|entity| entity_name_matches(entity, &inferred.name))
            {
                return Err(Box::new(ConfigError(format!(
                    "entity already exists: {}",
                    inferred.name
                ))));
            }
            entities.push(new_entity_value);
        }
        None => {
            // No entities yet: create the sequence with the new entry.
            root_map.insert(entities_key, YamlValue::Sequence(vec![new_entity_value]));
        }
    }

    Ok(serde_yaml::to_string(&root)?)
}
685
686fn entity_name_matches(value: &YamlValue, name: &str) -> bool {
687 value
688 .as_mapping()
689 .and_then(|map| map.get(YamlValue::String("name".to_string())))
690 .and_then(YamlValue::as_str)
691 == Some(name)
692}
693
694fn build_generated_entity_yaml(inferred: &InferredEntity) -> GeneratedEntityYaml {
695 let source_options = match inferred.format.as_str() {
696 "json" => inferred.source_options.clone(),
697 "csv" => inferred.source_options.clone(),
698 _ => None,
699 };
700
701 GeneratedEntityYaml {
702 name: inferred.name.clone(),
703 domain: inferred.domain.clone(),
704 source: GeneratedSourceYaml {
705 format: inferred.format.clone(),
706 path: inferred.input.clone(),
707 options: source_options,
708 },
709 sink: GeneratedSinkYaml {
710 accepted: GeneratedSinkTargetYaml {
711 format: "parquet".to_string(),
712 path: format!("out/accepted/{}/", inferred.name),
713 },
714 rejected: GeneratedSinkTargetYaml {
715 format: "csv".to_string(),
716 path: format!("out/rejected/{}/", inferred.name),
717 },
718 },
719 policy: GeneratedPolicyYaml {
720 severity: "reject".to_string(),
721 },
722 schema: GeneratedSchemaYaml {
723 mismatch: GeneratedMismatchYaml {
724 missing_columns: "reject_file".to_string(),
725 extra_columns: "ignore".to_string(),
726 },
727 columns: inferred.columns.clone(),
728 },
729 }
730}
731
/// Runs the standard `validate` pass on the rendered YAML before it is
/// persisted, so a broken merge never replaces a working config.
fn validate_generated_config(yaml: &str, target_path: &Path) -> FloeResult<()> {
    // Write the candidate YAML to a temp file beside the target (falling back
    // to the system temp dir when that directory is unusable) and validate it.
    // NOTE(review): placing the temp file next to the target presumably keeps
    // relative paths inside the config resolving consistently — confirm.
    let validation_parent = target_path.parent().unwrap_or_else(|| Path::new("."));
    let mut temp_file =
        NamedTempFile::new_in(validation_parent).or_else(|_| NamedTempFile::new())?;
    use std::io::Write;
    temp_file.write_all(yaml.as_bytes())?;
    crate::validate(temp_file.path(), ValidateOptions::default())
}
740
/// Resolves the add-entity input to an absolute local file path.
///
/// `file://` URIs are converted to paths; any other URI scheme is rejected
/// because remote inference is unsupported. Relative paths are resolved
/// against the current working directory.
///
/// # Errors
///
/// Fails for non-`file` schemes, unconvertible `file://` URIs, inaccessible
/// paths, and directories (a single file is required for inference).
fn resolve_local_input_path(input: &str) -> FloeResult<PathBuf> {
    if let Some((scheme, _)) = input.split_once("://") {
        if scheme.eq_ignore_ascii_case("file") {
            let url = Url::parse(input)?;
            return url.to_file_path().map_err(|_| {
                Box::new(ConfigError(format!(
                    "invalid file:// URI for add-entity input: {input}"
                ))) as Box<dyn std::error::Error + Send + Sync>
            });
        }
        return Err(Box::new(ConfigError(format!(
            "remote URI inference is not supported yet for add-entity input: {input}"
        ))));
    }

    let path = PathBuf::from(input);
    let absolute = if path.is_absolute() {
        path
    } else {
        std::env::current_dir()?.join(path)
    };
    // Surface a clear error for unreadable paths before format inference runs.
    let metadata = fs::metadata(&absolute).map_err(|err| {
        Box::new(IoError(format!(
            "failed to access input at {}: {err}",
            absolute.display()
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;
    if metadata.is_dir() {
        return Err(Box::new(ConfigError(format!(
            "add-entity expects a file input for schema inference, got directory: {}",
            absolute.display()
        ))));
    }
    Ok(absolute)
}
776
/// Returns the input string to persist in the config: `file://` URIs are
/// replaced by their resolved local path, everything else is kept verbatim.
fn normalize_persisted_input_path(input: &str, local_input_path: &Path) -> String {
    match input.split_once("://") {
        Some((scheme, _)) if scheme.eq_ignore_ascii_case("file") => {
            local_input_path.display().to_string()
        }
        _ => input.to_string(),
    }
}
785
786fn derive_entity_name(input: &str) -> FloeResult<String> {
787 let raw = if let Some((scheme, _)) = input.split_once("://") {
788 if scheme.eq_ignore_ascii_case("file") {
789 let url = Url::parse(input)?;
790 let path = url.to_file_path().map_err(|_| {
791 Box::new(ConfigError(format!(
792 "invalid file:// URI for add-entity input: {input}"
793 ))) as Box<dyn std::error::Error + Send + Sync>
794 })?;
795 file_name_stem_string(&path)
796 } else {
797 let trimmed = input.trim_end_matches('/');
798 let last = trimmed.rsplit('/').next().unwrap_or(trimmed);
799 last.split('?').next().unwrap_or(last).to_string()
800 }
801 } else {
802 file_name_stem_string(Path::new(input))
803 };
804 let normalized = slugify_entity_name(&raw);
805 if normalized.is_empty() {
806 return Err(Box::new(ConfigError(format!(
807 "failed to derive entity name from input: {input}"
808 ))));
809 }
810 Ok(normalized)
811}
812
/// Best-effort file stem for a path: the file name without its final
/// extension, falling back to the full file name, then to "entity" when
/// neither exists (e.g. for "/" or "").
fn file_name_stem_string(path: &Path) -> String {
    match path.file_stem().or_else(|| path.file_name()) {
        Some(stem) => stem.to_string_lossy().into_owned(),
        None => "entity".to_string(),
    }
}
819
/// Normalizes a raw name into a lowercase `snake_case` slug: ASCII
/// alphanumerics are kept (lowercased), every other run of characters
/// collapses to a single underscore, and edge underscores are trimmed.
fn slugify_entity_name(value: &str) -> String {
    let mut slug = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.ends_with('_') {
            // Collapse consecutive separators into a single underscore.
            slug.push('_');
        }
    }
    slug.trim_matches('_').to_string()
}
834
/// Guesses the CSV delimiter by scoring candidate characters over a small
/// sample of non-empty lines (up to 6, drawn from the first 12 lines).
///
/// A candidate must appear on the first sampled line. Candidates whose count
/// is identical on every line get a large consistency bonus; remaining ties
/// break on the per-line minimum count, then total occurrences. Returns
/// `None` when the file cannot be read or has no usable lines.
fn sniff_delimiter(path: &Path) -> Option<char> {
    let file = std::fs::File::open(path).ok()?;

    // Collect up to 6 non-empty (trimmed) lines from the first 12.
    let mut sample: Vec<String> = Vec::new();
    for line in BufReader::new(file).lines().take(12) {
        let content = line.ok()?;
        let content = content.trim();
        if content.is_empty() {
            continue;
        }
        sample.push(content.to_string());
        if sample.len() >= 6 {
            break;
        }
    }
    if sample.is_empty() {
        return None;
    }

    let mut winner: Option<(i32, char)> = None;
    for candidate in [',', ';', '\t', '|'] {
        let counts: Vec<usize> = sample
            .iter()
            .map(|line| line.matches(candidate).count())
            .collect();
        let first = counts.first().copied().unwrap_or(0);
        let min = counts.iter().copied().min().unwrap_or(0);
        let max = counts.iter().copied().max().unwrap_or(0);
        let total: usize = counts.iter().sum();
        // The delimiter must show up on the header line at all.
        if first == 0 || total == 0 {
            continue;
        }
        // Identical counts on every line strongly suggest a real delimiter.
        let bonus = if min > 0 && min == max { 10_000 } else { 0 };
        let score = bonus + (min as i32 * 100) + total as i32;
        let improves = match winner {
            Some((best_score, _)) => score > best_score,
            None => true,
        };
        if improves {
            winner = Some((score, candidate));
        }
    }
    winner.map(|(_, delimiter)| delimiter)
}