Skip to main content

floe_core/
lib.rs

1use std::path::Path;
2
3mod add_entity;
4pub mod checks;
5pub mod config;
6pub mod errors;
7pub mod io;
8pub mod lineage;
9pub(crate) mod log;
10pub mod manifest;
11pub mod profile;
12pub mod report;
13pub mod run;
14pub mod runner;
15pub mod runtime;
16pub mod state;
17pub mod vars;
18pub mod warnings;
19
20pub use crate::state::{
21    inspect_entity_state, inspect_entity_state_with_base, reset_entity_state,
22    reset_entity_state_with_base,
23};
24pub use add_entity::{add_entity_to_config, AddEntityOptions, AddEntityOutcome};
25pub use checks as check;
26pub use config::{
27    resolve_config_location, upload_to_remote_uri, write_bytes_to_remote_uri, ConfigLocation,
28};
29pub use errors::ConfigError;
30pub use manifest::{
31    build_common_manifest_json, config_from_manifest_json, ManifestOptions, PathMode,
32};
33pub use profile::{
34    detect_malformed_placeholder, detect_unresolved_placeholders, parse_profile,
35    parse_profile_from_str, validate_merged_vars, validate_profile, ProfileConfig,
36};
37pub use run::events::{set_observer, MultiObserver, RunEvent, RunObserver};
38pub use run::{
39    run, run_with_base, run_with_manifest_path, DryRunEntityPreview, EntityOutcome, RunOutcome,
40};
41pub use runner::{parse_run_status_from_logs, ConnectorRunStatus};
42pub use runtime::{DefaultRuntime, Runtime};
43pub use vars::{resolve_vars, VarSources};
44
45pub type FloeResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
46
47#[derive(Debug, Default)]
48pub struct ValidateOptions {
49    pub entities: Vec<String>,
50    pub profile_vars: std::collections::HashMap<String, String>,
51    pub profile_catalogs: Option<config::CatalogsConfig>,
52    pub profile_storages: Option<config::StoragesConfig>,
53    pub profile_lineage: Option<config::LineageConfig>,
54}
55
56#[derive(Debug, Default)]
57pub struct RunOptions {
58    pub run_id: Option<String>,
59    pub entities: Vec<String>,
60    pub dry_run: bool,
61    pub full_refresh: bool,
62    pub profile: Option<ProfileConfig>,
63}
64
65pub fn validate(config_path: &Path, options: ValidateOptions) -> FloeResult<()> {
66    let config_base = config::ConfigBase::local_from_path(config_path);
67    validate_with_base(config_path, config_base, options)
68}
69
70pub fn validate_with_base(
71    config_path: &Path,
72    _config_base: config::ConfigBase,
73    options: ValidateOptions,
74) -> FloeResult<()> {
75    let mut config = config::parse_config_with_vars(config_path, &options.profile_vars)?;
76    apply_profile_catalogs(&mut config, options.profile_catalogs.as_ref());
77    apply_profile_storages(&mut config, options.profile_storages.as_ref());
78    apply_profile_lineage(&mut config, options.profile_lineage.as_ref());
79    config::validate_config(&config)?;
80
81    if !options.entities.is_empty() {
82        run::validate_entities(&config, &options.entities)?;
83    }
84
85    Ok(())
86}
87
88pub fn load_config(config_path: &Path) -> FloeResult<config::RootConfig> {
89    config::parse_config(config_path)
90}
91
92/// Read manifest JSON from any supported URI (local path, `s3://`, `gs://`, `abfs://`).
93/// For remote URIs the file is downloaded to a temp directory that is cleaned up before
94/// this function returns; the caller receives the raw JSON text as a `String`.
95pub fn read_manifest_text(uri: &str) -> FloeResult<String> {
96    let location = config::resolve_config_location(uri)?;
97    let text = std::fs::read_to_string(&location.path)?;
98    Ok(text)
99}
100
101pub fn load_config_with_profile_vars(
102    config_path: &Path,
103    profile_vars: &std::collections::HashMap<String, String>,
104) -> FloeResult<config::RootConfig> {
105    config::parse_config_with_vars(config_path, profile_vars)
106}
107
108pub fn load_config_with_profile_overrides(
109    config_path: &Path,
110    profile_vars: &std::collections::HashMap<String, String>,
111    profile_catalogs: Option<&config::CatalogsConfig>,
112    profile_storages: Option<&config::StoragesConfig>,
113    profile_lineage: Option<&config::LineageConfig>,
114) -> FloeResult<config::RootConfig> {
115    let mut config = config::parse_config_with_vars(config_path, profile_vars)?;
116    apply_profile_catalogs(&mut config, profile_catalogs);
117    apply_profile_storages(&mut config, profile_storages);
118    apply_profile_lineage(&mut config, profile_lineage);
119    Ok(config)
120}
121
122pub fn validate_profile_file(profile_path: &Path) -> FloeResult<ProfileConfig> {
123    let profile = parse_profile(profile_path)?;
124    validate_profile(&profile)?;
125    Ok(profile)
126}
127
128pub(crate) fn apply_profile_catalogs(
129    config: &mut config::RootConfig,
130    profile_catalogs: Option<&config::CatalogsConfig>,
131) {
132    if let Some(catalogs) = profile_catalogs {
133        config.catalogs = Some(catalogs.clone());
134    }
135}
136
137pub(crate) fn apply_profile_storages(
138    config: &mut config::RootConfig,
139    profile_storages: Option<&config::StoragesConfig>,
140) {
141    if let Some(storages) = profile_storages {
142        config.storages = Some(storages.clone());
143    }
144}
145
146pub(crate) fn apply_profile_lineage(
147    config: &mut config::RootConfig,
148    profile_lineage: Option<&config::LineageConfig>,
149) {
150    if let Some(lineage) = profile_lineage {
151        config.lineage = Some(lineage.clone());
152    }
153}
154
155pub fn extract_config_env_vars(
156    config_path: &Path,
157) -> FloeResult<std::collections::HashMap<String, String>> {
158    Ok(config::extract_raw_env_vars(config_path).unwrap_or_default())
159}
160
161pub fn validate_config_for_tests(config: &config::RootConfig) -> FloeResult<()> {
162    config::validate_config(config)
163}