schema_registry_cli/command/
subject.rs

1use std::ffi::OsStr;
2use std::path::Path;
3use std::str::FromStr;
4
5use schema_registry_api::{
6    CompatibilityResult, SchemaId, SchemaRegistry, SchemaVersion, Subject, SubjectName,
7};
8
9use crate::{
10    CheckCompatibility, DeleteSubject, ListSubjects, RegisterSchemaSettings, Result,
11    SchemaRegistrySettings,
12};
13
14use super::build_register_schema;
15
16/// List subjects
17///
18/// # Errors
19///
20/// Fail if the API fail
21pub async fn list_subjects(
22    client_settings: SchemaRegistrySettings,
23    all: bool,
24    prefix: Option<&str>,
25) -> Result<Vec<Subject>> {
26    let client = SchemaRegistry::try_from(client_settings)?;
27    let names = client.subject().list(prefix, Some(all)).await?;
28    let mut result = Vec::with_capacity(names.len());
29    for name in &names {
30        let subject = client
31            .subject()
32            .version(name, SchemaVersion::Latest)
33            .await?;
34        if let Some(subject) = subject {
35            result.push(subject);
36        }
37    }
38    Ok(result)
39}
40
41pub(crate) async fn display_list_subjects(list_subjects: ListSubjects) -> Result<()> {
42    let ListSubjects {
43        schema_registry,
44        all,
45        prefix,
46    } = list_subjects;
47    let subjects = self::list_subjects(schema_registry, all, prefix.as_deref()).await?;
48
49    // Display
50    if subjects.is_empty() {
51        println!("No subject founds");
52    } else {
53        println!("Found {} subjects", subjects.len());
54        for subject in &subjects {
55            let Subject {
56                subject,
57                id,
58                version,
59                schema_type,
60                ..
61            } = subject;
62            println!("Subject '{subject}', schema [{schema_type}] #{id} v{version}");
63        }
64    }
65
66    Ok(())
67}
68
69/// Check schema compatibility for a subject
70///
71/// # Errors
72///
73/// Fail if the API fail
74pub async fn check_compatibility(
75    client_settings: SchemaRegistrySettings,
76    subject: Option<SubjectName>,
77    path: &Path,
78    version: Option<SchemaVersion>,
79    verbose: bool,
80) -> Result<CompatibilityResult> {
81    let client = SchemaRegistry::try_from(client_settings)?;
82    let schema = build_register_schema(path)?;
83    let subject = subject_from_path_option(path, subject)?;
84    let result = if let Some(version) = version {
85        client
86            .compatibility()
87            .check_version(&subject, version, &schema, Some(verbose))
88            .await?
89    } else {
90        client
91            .compatibility()
92            .check_versions(&subject, &schema, Some(verbose))
93            .await?
94    };
95    Ok(result)
96}
97
98pub(crate) async fn display_check_compatibility(
99    check_compatibility: CheckCompatibility,
100    verbose: bool,
101) -> Result<()> {
102    let CheckCompatibility {
103        schema_registry,
104        subject,
105        version,
106        path,
107    } = check_compatibility;
108
109    let result =
110        self::check_compatibility(schema_registry, subject, &path, version, verbose).await?;
111
112    // Display
113    if result.is_compatible {
114        println!("✅ the schema is compatible");
115    } else {
116        println!("❌ the schema is NOT compatible");
117    }
118
119    Ok(())
120}
121
122fn subject_from_path_option(path: &Path, subject: Option<SubjectName>) -> Result<SubjectName> {
123    if let Some(s) = subject {
124        Ok(s)
125    } else {
126        let name = path
127            .file_stem()
128            .and_then(OsStr::to_str)
129            .expect("Should have a file name");
130        let result = SubjectName::from_str(name)?;
131        Ok(result)
132    }
133}
134
135/// Register a schema for a subject
136///
137/// # Errors
138///
139/// Fail if the API fail
140pub async fn register_schema(
141    client_settings: SchemaRegistrySettings,
142    subject: Option<SubjectName>,
143    path: &Path,
144    normalize: bool,
145) -> Result<SchemaId> {
146    let client = SchemaRegistry::try_from(client_settings)?;
147    let schema = build_register_schema(path)?;
148    let subject = subject_from_path_option(path, subject)?;
149    let result = client
150        .subject()
151        .new_version(&subject, &schema, Some(normalize))
152        .await?;
153    Ok(result.id)
154}
155
156pub(crate) async fn display_register_schema(register_schema: RegisterSchemaSettings) -> Result<()> {
157    let RegisterSchemaSettings {
158        schema_registry,
159        subject,
160        normalize,
161        path,
162    } = register_schema;
163
164    let result = self::register_schema(schema_registry, subject, &path, normalize).await?;
165
166    // Display
167    println!("Registered schema id: {result}");
168
169    Ok(())
170}
171
172/// Delete subject
173///
174/// # Errors
175///
176/// Fail if the API fail
177pub async fn delete_subject(
178    client_settings: SchemaRegistrySettings,
179    subject: &SubjectName,
180    version: Option<SchemaVersion>,
181    permanent: bool,
182) -> Result<Vec<SchemaVersion>> {
183    let client = SchemaRegistry::try_from(client_settings)?;
184    let result = if let Some(version) = version {
185        client
186            .subject()
187            .delete_version(subject, version, Some(permanent))
188            .await?
189            .into_iter()
190            .collect()
191    } else {
192        client.subject().delete(subject, Some(permanent)).await?
193    };
194    Ok(result)
195}
196
197pub(crate) async fn display_delete_subject(list_subjects: DeleteSubject) -> Result<()> {
198    let DeleteSubject {
199        schema_registry,
200        subject,
201        version,
202        permanent,
203    } = list_subjects;
204    let versions = self::delete_subject(schema_registry, &subject, version, permanent).await?;
205
206    // Display
207    if versions.is_empty() {
208        println!("No subject version deleted");
209    } else {
210        println!("Delete {} versions", versions.len());
211        for version in &versions {
212            println!("{version}");
213        }
214    }
215
216    Ok(())
217}