use std::collections::HashMap;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result, bail};
use lintel_schema_cache::SchemaCache;
use schema_catalog::SchemaEntry;
use tracing::info;
use crate::download::{ProcessedSchemas, fetch_one};
use crate::refs::{RefRewriteContext, resolve_and_rewrite, resolve_and_rewrite_value};
use lintel_catalog_builder::config::SchemaDefinition;
use super::GenerateContext;
use super::util::{
extract_lintel_meta, first_line, prefetch_versions, process_fetched_versions, resolve_latest_id,
};
/// Shared, per-group inputs for processing each schema entry in a group.
///
/// One value of this type is built per group and borrowed by
/// [`process_group_schema`] for every key in that group.
pub(super) struct GroupSchemaContext<'a> {
    /// Top-level generation context (cache, config dir, etc.).
    pub(super) generate: &'a GenerateContext<'a>,
    /// Output directory for this group; each key gets a subdirectory of it.
    pub(super) group_dir: &'a Path,
    /// Group identifier, used in output URLs/paths and error messages.
    pub(super) group_key: &'a str,
    /// Public base URL with any trailing slash already removed
    /// (used verbatim when formatting `{base}/schemas/...` URLs).
    pub(super) trimmed_base: &'a str,
    /// Results of earlier schema processing, looked up by destination path.
    pub(super) processed: &'a ProcessedSchemas,
    /// Optional base URL identifying where local schema sources live;
    /// when absent, local schemas are identified by relative path alone.
    pub(super) source_base_url: Option<&'a str>,
    /// Map of sibling schema keys to their URLs, passed to the ref rewriter.
    /// NOTE(review): exact key/value semantics are defined by
    /// `RefRewriteContext` — confirm there.
    pub(super) sibling_urls: HashMap<String, String>,
}
/// Builds the source identifier for a local schema file.
///
/// When the context carries a `source_base_url`, the identifier is
/// `{base}/{relative_path}` with any trailing slashes stripped from the
/// base; otherwise the relative path itself is used as the identifier.
fn local_source_id(ctx: &GroupSchemaContext<'_>, relative_path: &str) -> String {
    match ctx.source_base_url {
        Some(base) => format!("{}/{relative_path}", base.trim_end_matches('/')),
        None => relative_path.to_owned(),
    }
}
/// Processes one schema entry (`key`) of a group: fetches or reads the
/// schema, rewrites its refs into the output tree, processes versioned
/// variants, and returns the catalog [`SchemaEntry`] describing it.
///
/// # Errors
///
/// Fails on output-path collisions, missing local schema files, I/O
/// failures, download failures, or ref-resolution/rewrite failures.
#[allow(clippy::too_many_lines)]
pub(super) async fn process_group_schema(
    ctx: &GroupSchemaContext<'_>,
    key: &str,
    schema_def: &SchemaDefinition,
    output_paths: &mut HashSet<PathBuf>,
) -> Result<SchemaEntry> {
    // Each key gets its own directory; the current schema lands in latest.json.
    let entry_dir = ctx.group_dir.join(key);
    tokio::fs::create_dir_all(&entry_dir).await?;
    let dest_path = entry_dir.join("latest.json");
    // Canonicalize so two keys that resolve to the same on-disk directory
    // (e.g. via symlinks or case-insensitive filesystems) are detected as a
    // collision; fall back to the raw path if canonicalization fails.
    let canonical_dest = entry_dir
        .canonicalize()
        .unwrap_or_else(|_| entry_dir.clone());
    if !output_paths.insert(canonical_dest) {
        bail!(
            "output path collision: {} (group={}, key={key})",
            entry_dir.display(),
            ctx.group_key,
        );
    }
    // Public URL under which the generated schema will be served.
    let schema_url = format!(
        "{}/schemas/{}/{key}/latest.json",
        ctx.trimmed_base, ctx.group_key
    );
    // Download the main schema (only when a URL is configured) and prefetch
    // all versioned schemas concurrently.
    let (schema_fetch_result, version_results) = tokio::join!(
        async {
            if let Some(url) = &schema_def.url {
                Some(fetch_one(ctx.generate.cache, url).await.with_context(|| {
                    format!("failed to download schema for {}/{key}", ctx.group_key)
                }))
            } else {
                None
            }
        },
        prefetch_versions(ctx.generate.cache, &schema_def.versions),
    );
    // Destination and public URL for schemas pulled in via $ref rewriting.
    let shared_dir = entry_dir.join("_shared");
    let shared_base_url = format!(
        "{}/schemas/{}/{key}/_shared",
        ctx.trimmed_base, ctx.group_key
    );
    // Dedup map shared across all rewrites for this entry (mutated through
    // the RefRewriteContext below).
    let mut already_downloaded: HashMap<String, String> = HashMap::new();
    // No URL means the schema is a local file at
    // {config_dir}/schemas/{group}/{key}.json; read it now and keep
    // (source id, content hash, text) for later use.
    let lintel_source = if schema_def.url.is_none() {
        let relative_source = format!("schemas/{}/{key}.json", ctx.group_key);
        let source_path = ctx.generate.config_dir.join(&relative_source);
        if !source_path.exists() {
            bail!(
                "local schema not found: {} (expected for group={}, key={key})",
                source_path.display(),
                ctx.group_key,
            );
        }
        let text = tokio::fs::read_to_string(&source_path)
            .await
            .with_context(|| format!("failed to read local schema {}", source_path.display()))?;
        let source_id = local_source_id(ctx, &relative_source);
        let hash = SchemaCache::hash_content(&text);
        Some((source_id, hash, text))
    } else {
        None
    };
    // file_match/parsers come from the config first; if the config left
    // file_match empty, fall back to metadata embedded in the local schema
    // itself (ignoring unparsable JSON).
    let mut file_match = schema_def.file_match.clone();
    let mut parsers = Vec::new();
    if file_match.is_empty()
        && let Some((_, _, text)) = &lintel_source
        && let Ok(val) = serde_json::from_str::<serde_json::Value>(text)
    {
        (file_match, parsers) = extract_lintel_meta(&val);
    }
    // Original location of the schema: the configured URL, or a URL derived
    // from source_base_url for local schemas (None if neither is available).
    let source_url = schema_def.url.clone().or_else(|| {
        ctx.source_base_url.map(|base| {
            format!(
                "{}/schemas/{}/{key}.json",
                base.trim_end_matches('/'),
                ctx.group_key,
            )
        })
    });
    // For local schemas, relative $refs may point at sibling files in the
    // group's source directory.
    let local_dir = if schema_def.url.is_none() {
        Some(ctx.generate.config_dir.join("schemas").join(ctx.group_key))
    } else {
        None
    };
    let mut ref_ctx = RefRewriteContext {
        cache: ctx.generate.cache,
        shared_dir: &shared_dir,
        base_url_for_shared: &shared_base_url,
        already_downloaded: &mut already_downloaded,
        source_url,
        processed: ctx.processed,
        local_source_dir: local_dir.as_deref(),
        sibling_urls: ctx.sibling_urls.clone(),
        lintel_source: lintel_source
            .as_ref()
            .map(|(id, hash, _)| (id.clone(), hash.clone())),
        file_match: file_match.clone(),
        parsers,
    };
    if let Some(url) = &schema_def.url {
        // Remote schema path: the fetch result is guaranteed present because
        // the join! branch above only produces Some when url is Some.
        let (mut value, status) =
            schema_fetch_result.expect("fetch result must exist when URL is present")?;
        info!(url = %url, status = %status, "downloaded group schema");
        // Second fallback for file_match: metadata embedded in the fetched
        // schema (only applied when nothing was configured or found so far).
        if ref_ctx.file_match.is_empty() {
            let (schema_file_match, schema_parsers) = extract_lintel_meta(&value);
            if !schema_file_match.is_empty() {
                file_match = schema_file_match;
                ref_ctx.file_match.clone_from(&file_match);
                ref_ctx.parsers = schema_parsers;
            }
        }
        let schema_base_url = format!("{}/schemas/{}/{key}", ctx.trimmed_base, ctx.group_key,);
        // Resolve which URL should serve as the schema's $id.
        // NOTE(review): presumably this picks the versioned URL when the
        // source URL matches a prefetched version — confirm in util.rs.
        let resolved_url = resolve_latest_id(
            ctx.generate.cache,
            url,
            &version_results,
            &schema_url,
            &schema_base_url,
        );
        resolve_and_rewrite_value(&mut ref_ctx, &mut value, &dest_path, &resolved_url).await?;
    } else {
        // Local schema path: rewrite from the raw text read earlier.
        let (_, _, text) = lintel_source
            .as_ref()
            .expect("computed above for local schemas");
        resolve_and_rewrite(&mut ref_ctx, text, &dest_path, &schema_url).await?;
    }
    // Write out the prefetched versioned schemas and collect their URLs.
    let version_urls = process_fetched_versions(&mut ref_ctx, &entry_dir, version_results).await?;
    // Pull title/description from the processed (post-rewrite) schema value;
    // defaults to an empty value if the path was never recorded.
    let processed_value = ctx.processed.get_by_path(&dest_path).unwrap_or_default();
    let schema_title = processed_value
        .get("title")
        .and_then(|v| v.as_str())
        .map(String::from);
    let schema_desc = processed_value
        .get("description")
        .and_then(|v| v.as_str())
        .map(String::from);
    let catalog_desc = crate::download::parse_lintel_extra(&processed_value)
        .and_then(|extra| extra.catalog_description);
    // Name precedence: config > schema title > key.
    let name = schema_def
        .name
        .clone()
        .or_else(|| schema_title.clone())
        .unwrap_or_else(|| key.to_string());
    // Description precedence: config > lintel catalog description >
    // first line of schema description > schema title > empty.
    let description = schema_def
        .description
        .clone()
        .or(catalog_desc)
        .or_else(|| schema_desc.as_deref().map(first_line))
        .or(schema_title)
        .unwrap_or_default();
    Ok(SchemaEntry {
        name,
        description,
        url: schema_url,
        source_url: schema_def.url.clone(),
        file_match,
        versions: version_urls,
    })
}