use alloc::sync::Arc;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use anyhow::Result;
use lintel_schema_cache::{CacheStatus, SchemaCache};
use schema_catalog::FileFormat;
use serde::{Deserialize, Serialize};
/// Thread-safe registry of schema documents that have been processed,
/// keyed by their path relative to `output_dir`. Cloning is cheap: clones
/// share the same underlying map via `Arc`.
#[derive(Clone)]
pub struct ProcessedSchemas {
    // Map from output-dir-relative path (lossy UTF-8 string) to schema JSON.
    inner: Arc<Mutex<HashMap<String, serde_json::Value>>>,
    // Base directory used to relativize paths on insert/lookup.
    output_dir: PathBuf,
}
impl ProcessedSchemas {
    /// Creates an empty registry rooted at `output_dir`.
    pub fn new(output_dir: &Path) -> Self {
        Self {
            inner: Arc::new(Mutex::new(HashMap::new())),
            output_dir: output_dir.to_path_buf(),
        }
    }

    /// Converts `path` to the key form used by the map: stripped of
    /// `output_dir` when it is a prefix (otherwise kept whole), rendered
    /// as a lossy UTF-8 string.
    fn relative_key(&self, path: &Path) -> String {
        path.strip_prefix(&self.output_dir)
            .unwrap_or(path)
            .to_string_lossy()
            .into_owned()
    }

    /// Records `value` under the key derived from `path`, replacing any
    /// previous entry for the same key.
    pub fn insert(&self, path: &Path, value: &serde_json::Value) {
        let key = self.relative_key(path);
        self.inner
            .lock()
            .expect("ProcessedSchemas lock poisoned")
            .insert(key, value.clone());
    }

    /// Returns a clone of the schema stored under `relative_path`, if any.
    pub fn get(&self, relative_path: &str) -> Option<serde_json::Value> {
        self.inner
            .lock()
            .expect("ProcessedSchemas lock poisoned")
            .get(relative_path)
            .cloned()
    }

    /// Like [`Self::get`], but derives the lookup key from a filesystem
    /// path the same way [`Self::insert`] does.
    pub fn get_by_path(&self, path: &Path) -> Option<serde_json::Value> {
        self.get(&self.relative_key(path))
    }

    /// Number of schemas currently recorded.
    pub fn len(&self) -> usize {
        self.inner
            .lock()
            .expect("ProcessedSchemas lock poisoned")
            .len()
    }

    /// `true` when no schemas have been recorded yet.
    /// (Companion to `len`; clippy's `len_without_is_empty`.)
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Snapshot of all recorded keys (order unspecified).
    pub fn keys(&self) -> Vec<String> {
        self.inner
            .lock()
            .expect("ProcessedSchemas lock poisoned")
            .keys()
            .cloned()
            .collect()
    }
}
const MAX_SCHEMA_SIZE: u64 = 10 * 1024 * 1024;
/// Predicate for serde's `skip_serializing_if`: `true` when `b` is `false`.
/// serde requires the `&bool` signature here, hence the clippy allow.
#[allow(clippy::trivially_copy_pass_by_ref)]
fn is_false(b: &bool) -> bool {
    !*b
}
/// Contents of the `x-lintel` extension object embedded in a schema
/// document. All fields are optional on the wire (`#[serde(default)]`),
/// and empty/false/none values are omitted when serializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LintelExtra {
    // Origin URL (or similar identifier) the schema was fetched from —
    // presumably; confirm against the writer of this field.
    #[serde(default)]
    pub source: String,
    // SHA-256 hex digest of the source document — TODO confirm encoding.
    #[serde(default)]
    pub source_sha256: String,
    // Marks a schema that failed validation; omitted when false.
    #[serde(default, skip_serializing_if = "is_false")]
    pub invalid: bool,
    // File-matching patterns associated with this schema.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub file_match: Vec<String>,
    // File formats this schema applies to.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub parsers: Vec<FileFormat>,
    // Human-readable description carried over from the catalog entry.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub catalog_description: Option<String>,
}
pub fn parse_lintel_extra(value: &serde_json::Value) -> Option<LintelExtra> {
value
.get("x-lintel")
.and_then(|v| serde_json::from_value::<LintelExtra>(v.clone()).ok())
}
/// Fetches `url` through the schema cache, returning the parsed schema
/// plus the cache hit/miss status, with the cache's error type folded
/// into `anyhow`.
pub async fn fetch_one(cache: &SchemaCache, url: &str) -> Result<(serde_json::Value, CacheStatus)> {
    cache.fetch(url).await.map_err(|e| anyhow::anyhow!("{e}"))
}
/// Serializes `value` as pretty-printed JSON, writes it to `path`
/// (creating parent directories as needed), and records it in `processed`.
///
/// # Errors
///
/// Fails when serialization fails, the rendered text exceeds
/// `MAX_SCHEMA_SIZE`, or a filesystem operation fails. On failure nothing
/// is registered in `processed`, so the in-memory map never claims a
/// schema that was rejected or not written.
pub async fn write_schema_json(
    value: &serde_json::Value,
    path: &Path,
    processed: &ProcessedSchemas,
) -> Result<()> {
    let text = serde_json::to_string_pretty(value)?;
    // Enforce the size cap before touching the filesystem or the registry.
    if text.len() as u64 > MAX_SCHEMA_SIZE {
        anyhow::bail!(
            "schema too large ({} MiB, limit {} MiB)",
            text.len() / (1024 * 1024),
            MAX_SCHEMA_SIZE / (1024 * 1024),
        );
    }
    if let Some(parent) = path.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }
    tokio::fs::write(path, format!("{text}\n")).await?;
    // Register only after the file is actually on disk.
    processed.insert(path, value);
    Ok(())
}