use std::ffi::OsStr;
use std::path::Path;
use std::str::FromStr;
use schema_registry_api::{
CompatibilityResult, SchemaId, SchemaRegistry, SchemaVersion, Subject, SubjectName,
};
use crate::{
CheckCompatibility, DeleteSubject, ListSubjects, RegisterSchemaSettings, Result,
SchemaRegistrySettings,
};
use super::build_register_schema;
/// Fetches the latest version of every subject listed by the registry.
///
/// The subject names are listed first, with `prefix` and `all` forwarded to
/// the listing call. The latest version of each name is then requested one
/// at a time, in listing order. Names whose lookup yields `None` are skipped.
///
/// # Errors
///
/// Returns an error if the client cannot be built from `client_settings`, or
/// as soon as the listing call or any per-subject lookup fails.
pub async fn list_subjects(
    client_settings: SchemaRegistrySettings,
    all: bool,
    prefix: Option<&str>,
) -> Result<Vec<Subject>> {
    let registry = SchemaRegistry::try_from(client_settings)?;
    let subject_names = registry.subject().list(prefix, Some(all)).await?;

    let mut latest: Vec<Subject> = Vec::with_capacity(subject_names.len());
    for subject_name in &subject_names {
        let found = registry
            .subject()
            .version(subject_name, SchemaVersion::Latest)
            .await?;
        // Extending with an `Option` appends the value when present and is a
        // no-op for `None`.
        latest.extend(found);
    }

    Ok(latest)
}
/// Lists the subjects selected by `list_subjects` and prints them to stdout.
///
/// Prints a single "nothing found" line when the result is empty; otherwise
/// prints the number of subjects followed by one line per subject with its
/// name, schema type, id and version.
///
/// # Errors
///
/// Propagates any error returned by [`list_subjects`].
pub(crate) async fn display_list_subjects(list_subjects: ListSubjects) -> Result<()> {
    let ListSubjects {
        schema_registry,
        all,
        prefix,
    } = list_subjects;
    // `self::` is required because the parameter shadows the function name.
    let subjects = self::list_subjects(schema_registry, all, prefix.as_deref()).await?;

    if subjects.is_empty() {
        println!("No subjects found");
        return Ok(());
    }

    println!("Found {} subjects", subjects.len());
    for Subject {
        subject,
        id,
        version,
        schema_type,
        ..
    } in &subjects
    {
        println!("Subject '{subject}', schema [{schema_type}] #{id} v{version}");
    }
    Ok(())
}
/// Checks whether the schema stored at `path` is compatible with a subject.
///
/// The subject is `subject` when provided; otherwise it is derived from the
/// file stem of `path`. When `version` is given the check is made against
/// that version only (`check_version`); otherwise `check_versions` is used.
/// `verbose` is forwarded to the registry call.
///
/// # Errors
///
/// Returns an error if the client cannot be built, the schema cannot be built
/// from `path`, the subject name cannot be parsed, or the registry call fails.
///
/// # Panics
///
/// Panics if `subject` is `None` and `path` has no UTF-8 file stem.
pub async fn check_compatibility(
    client_settings: SchemaRegistrySettings,
    subject: Option<SubjectName>,
    path: &Path,
    version: Option<SchemaVersion>,
    verbose: bool,
) -> Result<CompatibilityResult> {
    let registry = SchemaRegistry::try_from(client_settings)?;
    let candidate = build_register_schema(path)?;
    let subject_name = subject_from_path_option(path, subject)?;

    let outcome = match version {
        Some(target) => {
            registry
                .compatibility()
                .check_version(&subject_name, target, &candidate, Some(verbose))
                .await?
        }
        None => {
            registry
                .compatibility()
                .check_versions(&subject_name, &candidate, Some(verbose))
                .await?
        }
    };

    Ok(outcome)
}
/// Runs a compatibility check and prints a one-line verdict to stdout.
///
/// `verbose` is forwarded unchanged to [`check_compatibility`].
///
/// # Errors
///
/// Propagates any error returned by [`check_compatibility`].
pub(crate) async fn display_check_compatibility(
    check_compatibility: CheckCompatibility,
    verbose: bool,
) -> Result<()> {
    let CheckCompatibility {
        schema_registry,
        subject,
        version,
        path,
    } = check_compatibility;

    // `self::` is required because the parameter shadows the function name.
    let outcome =
        self::check_compatibility(schema_registry, subject, &path, version, verbose).await?;

    let verdict = if result_is_ok(outcome.is_compatible) {
        "✅ the schema is compatible"
    } else {
        "❌ the schema is NOT compatible"
    };
    println!("{verdict}");
    Ok(())
}

/// Identity helper that names the meaning of the compatibility flag.
fn result_is_ok(is_compatible: bool) -> bool {
    is_compatible
}
/// Resolves the subject name to use for a schema file.
///
/// Returns `subject` unchanged when it is `Some`. Otherwise the file stem of
/// `path` is parsed as a [`SubjectName`].
///
/// # Errors
///
/// Returns an error if the file stem cannot be parsed as a `SubjectName`.
///
/// # Panics
///
/// Panics if `subject` is `None` and `path` has no file stem, or the stem is
/// not valid UTF-8.
fn subject_from_path_option(path: &Path, subject: Option<SubjectName>) -> Result<SubjectName> {
    match subject {
        Some(explicit) => Ok(explicit),
        None => {
            let stem = path
                .file_stem()
                .and_then(OsStr::to_str)
                .expect("Should have a file name");
            Ok(SubjectName::from_str(stem)?)
        }
    }
}
/// Registers the schema stored at `path` as a new version of a subject.
///
/// The subject is `subject` when provided; otherwise it is derived from the
/// file stem of `path`. `normalize` is forwarded to the registry call.
/// Returns the `id` field of the registry response.
///
/// # Errors
///
/// Returns an error if the client cannot be built, the schema cannot be built
/// from `path`, the subject name cannot be parsed, or the registry call fails.
///
/// # Panics
///
/// Panics if `subject` is `None` and `path` has no UTF-8 file stem.
pub async fn register_schema(
    client_settings: SchemaRegistrySettings,
    subject: Option<SubjectName>,
    path: &Path,
    normalize: bool,
) -> Result<SchemaId> {
    let registry = SchemaRegistry::try_from(client_settings)?;
    let definition = build_register_schema(path)?;
    let subject_name = subject_from_path_option(path, subject)?;

    let registered = registry
        .subject()
        .new_version(&subject_name, &definition, Some(normalize))
        .await?;

    Ok(registered.id)
}
/// Registers a schema and prints the resulting schema id to stdout.
///
/// # Errors
///
/// Propagates any error returned by [`register_schema`].
pub(crate) async fn display_register_schema(register_schema: RegisterSchemaSettings) -> Result<()> {
    let RegisterSchemaSettings {
        path,
        subject,
        normalize,
        schema_registry,
    } = register_schema;

    // `self::` is required because the parameter shadows the function name.
    let schema_id = self::register_schema(schema_registry, subject, &path, normalize).await?;
    println!("Registered schema id: {schema_id}");

    Ok(())
}
/// Deletes one version of a subject, or the whole subject.
///
/// When `version` is `Some`, `delete_version` is called and whatever it
/// yields is collected into the returned vector. When `version` is `None`,
/// `delete` is called for the whole subject and its result is returned as is.
/// `permanent` is forwarded to either call.
///
/// # Errors
///
/// Returns an error if the client cannot be built from `client_settings` or
/// if the registry call fails.
pub async fn delete_subject(
    client_settings: SchemaRegistrySettings,
    subject: &SubjectName,
    version: Option<SchemaVersion>,
    permanent: bool,
) -> Result<Vec<SchemaVersion>> {
    let registry = SchemaRegistry::try_from(client_settings)?;

    let deleted: Vec<SchemaVersion> = match version {
        Some(target) => registry
            .subject()
            .delete_version(subject, target, Some(permanent))
            .await?
            .into_iter()
            .collect(),
        None => registry.subject().delete(subject, Some(permanent)).await?,
    };

    Ok(deleted)
}
/// Deletes a subject (or one of its versions) and prints the deleted versions.
///
/// Prints a single line when nothing was deleted; otherwise prints the number
/// of deleted versions followed by one version per line.
///
/// # Errors
///
/// Propagates any error returned by [`delete_subject`].
pub(crate) async fn display_delete_subject(list_subjects: DeleteSubject) -> Result<()> {
    let DeleteSubject {
        schema_registry,
        subject,
        version,
        permanent,
    } = list_subjects;

    let deleted = self::delete_subject(schema_registry, &subject, version, permanent).await?;

    if deleted.is_empty() {
        println!("No subject version deleted");
        return Ok(());
    }

    println!("Delete {} versions", deleted.len());
    deleted.iter().for_each(|entry| println!("{entry}"));
    Ok(())
}