use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Instant;
use crate::io::storage::Target;
use crate::{config, report, FloeResult, RunOptions};
/// State shared by a single run: the loaded configuration, the resolvers
/// built from it, the report destination (if any) and run identity/timing.
///
/// Instances are produced by `RunContext::new` (configuration parsed from a
/// file) or `RunContext::from_config` (configuration supplied by the caller).
pub struct RunContext {
/// Root configuration the run operates on.
pub config: config::RootConfig,
/// Location the configuration came from. `new` always stores the result of
/// `normalize_local_path`; `from_config` stores the manifest path verbatim
/// when `config::is_remote_uri` accepts it and normalizes it otherwise.
pub config_path: PathBuf,
/// `storage_resolver.config_dir()` passed through `normalize_local_path`.
pub config_dir: PathBuf,
/// Storage resolver constructed from `config` and the caller's `ConfigBase`.
pub storage_resolver: config::StorageResolver,
/// Catalog resolver constructed from `config`.
pub catalog_resolver: config::CatalogResolver,
/// Report base location as a string: the local path's display form when the
/// report resolves to a local path, otherwise the URI. `None` when no
/// report location is in effect.
pub report_base_path: Option<String>,
/// Storage target for the report. May be `None` even when
/// `report_base_path` is set, because `from_config` discards target
/// construction failures.
pub report_target: Option<Target>,
/// Run identifier: `options.run_id` when supplied, otherwise derived from
/// `started_at` via `report::run_id_from_timestamp`.
pub run_id: String,
/// Start timestamp string returned by `report::now_rfc3339()`.
pub started_at: String,
/// `Instant` captured when the context is constructed.
pub run_timer: Instant,
/// Copied from `options.full_refresh`.
pub full_refresh: bool,
}
impl RunContext {
pub fn new(
config_path: &Path,
config_base: config::ConfigBase,
options: &RunOptions,
profile_vars: HashMap<String, String>,
) -> FloeResult<Self> {
let mut config = config::parse_config_with_vars(config_path, &profile_vars)?;
crate::apply_profile_catalogs(
&mut config,
options
.profile
.as_ref()
.and_then(|profile| profile.catalogs.as_ref()),
);
crate::apply_profile_storages(
&mut config,
options
.profile
.as_ref()
.and_then(|profile| profile.storages.as_ref()),
);
crate::apply_profile_lineage(
&mut config,
options
.profile
.as_ref()
.and_then(|profile| profile.lineage.as_ref()),
);
let storage_resolver = config::StorageResolver::new(&config, config_base)?;
let catalog_resolver = config::CatalogResolver::new(&config)?;
let config_dir =
crate::io::storage::paths::normalize_local_path(storage_resolver.config_dir());
let config_path = crate::io::storage::paths::normalize_local_path(config_path);
let (report_target, report_base_path) = match config.report.as_ref() {
Some(report) => {
let resolved = storage_resolver
.resolve_report_path(report.storage.as_deref(), &report.path)?;
let target = Target::from_resolved(&resolved)?;
let base_path = match resolved.local_path.as_ref() {
Some(path) => path.display().to_string(),
None => resolved.uri.clone(),
};
(Some(target), Some(base_path))
}
None => (None, None),
};
let started_at = report::now_rfc3339();
let run_id = options
.run_id
.clone()
.unwrap_or_else(|| report::run_id_from_timestamp(&started_at));
Ok(Self {
config,
config_path,
config_dir,
storage_resolver,
catalog_resolver,
report_base_path,
report_target,
run_id,
started_at,
run_timer: Instant::now(),
full_refresh: options.full_refresh,
})
}
pub fn from_config(
config: config::RootConfig,
config_base: config::ConfigBase,
manifest_path: &Path,
report_base_uri: &str,
options: &RunOptions,
) -> FloeResult<Self> {
let mut storage_resolver = config::StorageResolver::new(&config, config_base)?;
let catalog_resolver = config::CatalogResolver::new(&config)?;
let config_dir =
crate::io::storage::paths::normalize_local_path(storage_resolver.config_dir());
let manifest_str = manifest_path.to_string_lossy();
let config_path = if config::is_remote_uri(&manifest_str) {
std::path::PathBuf::from(manifest_str.as_ref())
} else {
crate::io::storage::paths::normalize_local_path(manifest_path)
};
let (report_target, report_base_path) =
if !report_base_uri.is_empty() && report_base_uri != "report" {
let local_path = if let Some(stripped) = report_base_uri.strip_prefix("local://") {
Some(std::path::PathBuf::from(stripped))
} else if !report_base_uri.contains("://") {
Some(std::path::PathBuf::from(report_base_uri))
} else {
None
};
let base_path = local_path
.as_ref()
.map(|p| p.display().to_string())
.unwrap_or_else(|| report_base_uri.to_string());
let report_target = if let Some(path) = local_path.as_ref() {
let resolved = config::ResolvedPath {
storage: "local".to_string(),
uri: report_base_uri.to_string(),
local_path: Some(path.clone()),
};
Target::from_resolved(&resolved).ok()
} else if config::is_remote_uri(report_base_uri) {
build_remote_report_target(report_base_uri, &mut storage_resolver)
} else {
None
};
(report_target, Some(base_path))
} else {
(None, None)
};
let started_at = report::now_rfc3339();
let run_id = options
.run_id
.clone()
.unwrap_or_else(|| report::run_id_from_timestamp(&started_at));
Ok(Self {
config,
config_path,
config_dir,
storage_resolver,
catalog_resolver,
report_base_path,
report_target,
run_id,
started_at,
run_timer: Instant::now(),
full_refresh: options.full_refresh,
})
}
}
/// Builds a report [`Target`] for a remote `uri`, best-effort.
///
/// The storage name is taken from the resolver when it already has a
/// definition matching the URI. Otherwise a synthetic definition from
/// `synthetic_storage_for_uri` is registered on the resolver and its name is
/// used.
///
/// Returns `None` when no synthetic definition can be produced for the URI, or
/// when `Target::from_resolved` fails (the error is discarded).
fn build_remote_report_target(uri: &str, resolver: &mut config::StorageResolver) -> Option<Target> {
    let storage = match resolver.find_definition_name_for_uri(uri) {
        Some(existing) => existing,
        None => {
            let (definition, synthetic_name) = synthetic_storage_for_uri(uri)?;
            resolver.register_definition(definition);
            synthetic_name
        }
    };

    Target::from_resolved(&config::ResolvedPath {
        storage,
        uri: uri.to_string(),
        local_path: None,
    })
    .ok()
}
/// Produces a stand-in storage definition for a remote report URI, together
/// with the definition's name.
///
/// Recognized prefixes and the fields populated from the parsed URI:
/// - `s3://`   → name `__report_s3__`, `fs_type` `s3`, `bucket`
/// - `gs://`   → name `__report_gcs__`, `fs_type` `gcs`, `bucket`
/// - `abfs://` → name `__report_adls__`, `fs_type` `adls`, `account` and `container`
///
/// Every other optional field is left as `None`. Returns `None` for any other
/// prefix, or when the scheme-specific parser rejects the URI.
fn synthetic_storage_for_uri(uri: &str) -> Option<(config::StorageDefinition, String)> {
    use crate::io::storage::{adls, gcs, s3};

    /// Template definition with every optional field unset.
    fn blank(name: &str, fs_type: &str) -> config::StorageDefinition {
        config::StorageDefinition {
            name: name.to_string(),
            fs_type: fs_type.to_string(),
            bucket: None,
            region: None,
            endpoint: None,
            path_style_access: None,
            account: None,
            container: None,
            prefix: None,
        }
    }

    // Parse first in every branch so a rejected URI returns `None` before any
    // definition is built.
    let definition = if uri.starts_with("s3://") {
        let loc = s3::parse_s3_uri(uri).ok()?;
        config::StorageDefinition {
            bucket: Some(loc.bucket),
            ..blank("__report_s3__", "s3")
        }
    } else if uri.starts_with("gs://") {
        let loc = gcs::parse_gcs_uri(uri).ok()?;
        config::StorageDefinition {
            bucket: Some(loc.bucket),
            ..blank("__report_gcs__", "gcs")
        }
    } else if uri.starts_with("abfs://") {
        let loc = adls::parse_adls_uri(uri).ok()?;
        config::StorageDefinition {
            account: Some(loc.account),
            container: Some(loc.container),
            ..blank("__report_adls__", "adls")
        }
    } else {
        return None;
    };

    let name = definition.name.clone();
    Some((definition, name))
}
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use crate::io::storage::paths::normalize_local_path;

    /// A remote manifest URI must be wrapped with `PathBuf::from` rather than
    /// normalized: normalization alters the `s3://` form, while a plain
    /// `PathBuf` round-trips it unchanged.
    #[test]
    fn remote_uri_preserved_via_pathbuf_from() {
        let remote = "s3://bucket/manifests/prod.json";

        let mangled = normalize_local_path(Path::new(remote))
            .display()
            .to_string();
        assert_ne!(
            mangled,
            remote,
            "normalize_local_path should mangle s3:// (confirming the bug we guard against)"
        );

        let verbatim = PathBuf::from(remote).display().to_string();
        assert_eq!(verbatim, remote);
    }
}