schema_registry_cli/command/
subject.rs1use std::ffi::OsStr;
2use std::path::Path;
3use std::str::FromStr;
4
5use schema_registry_api::{
6 CompatibilityResult, SchemaId, SchemaRegistry, SchemaVersion, Subject, SubjectName,
7};
8
9use crate::{
10 CheckCompatibility, DeleteSubject, ListSubjects, RegisterSchemaSettings, Result,
11 SchemaRegistrySettings,
12};
13
14use super::build_register_schema;
15
16pub async fn list_subjects(
22 client_settings: SchemaRegistrySettings,
23 all: bool,
24 prefix: Option<&str>,
25) -> Result<Vec<Subject>> {
26 let client = SchemaRegistry::try_from(client_settings)?;
27 let names = client.subject().list(prefix, Some(all)).await?;
28 let mut result = Vec::with_capacity(names.len());
29 for name in &names {
30 let subject = client
31 .subject()
32 .version(name, SchemaVersion::Latest)
33 .await?;
34 if let Some(subject) = subject {
35 result.push(subject);
36 }
37 }
38 Ok(result)
39}
40
41pub(crate) async fn display_list_subjects(list_subjects: ListSubjects) -> Result<()> {
42 let ListSubjects {
43 schema_registry,
44 all,
45 prefix,
46 } = list_subjects;
47 let subjects = self::list_subjects(schema_registry, all, prefix.as_deref()).await?;
48
49 if subjects.is_empty() {
51 println!("No subject founds");
52 } else {
53 println!("Found {} subjects", subjects.len());
54 for subject in &subjects {
55 let Subject {
56 subject,
57 id,
58 version,
59 schema_type,
60 ..
61 } = subject;
62 println!("Subject '{subject}', schema [{schema_type}] #{id} v{version}");
63 }
64 }
65
66 Ok(())
67}
68
69pub async fn check_compatibility(
75 client_settings: SchemaRegistrySettings,
76 subject: Option<SubjectName>,
77 path: &Path,
78 version: Option<SchemaVersion>,
79 verbose: bool,
80) -> Result<CompatibilityResult> {
81 let client = SchemaRegistry::try_from(client_settings)?;
82 let schema = build_register_schema(path)?;
83 let subject = subject_from_path_option(path, subject)?;
84 let result = if let Some(version) = version {
85 client
86 .compatibility()
87 .check_version(&subject, version, &schema, Some(verbose))
88 .await?
89 } else {
90 client
91 .compatibility()
92 .check_versions(&subject, &schema, Some(verbose))
93 .await?
94 };
95 Ok(result)
96}
97
98pub(crate) async fn display_check_compatibility(
99 check_compatibility: CheckCompatibility,
100 verbose: bool,
101) -> Result<()> {
102 let CheckCompatibility {
103 schema_registry,
104 subject,
105 version,
106 path,
107 } = check_compatibility;
108
109 let result =
110 self::check_compatibility(schema_registry, subject, &path, version, verbose).await?;
111
112 if result.is_compatible {
114 println!("✅ the schema is compatible");
115 } else {
116 println!("❌ the schema is NOT compatible");
117 }
118
119 Ok(())
120}
121
122fn subject_from_path_option(path: &Path, subject: Option<SubjectName>) -> Result<SubjectName> {
123 if let Some(s) = subject {
124 Ok(s)
125 } else {
126 let name = path
127 .file_stem()
128 .and_then(OsStr::to_str)
129 .expect("Should have a file name");
130 let result = SubjectName::from_str(name)?;
131 Ok(result)
132 }
133}
134
135pub async fn register_schema(
141 client_settings: SchemaRegistrySettings,
142 subject: Option<SubjectName>,
143 path: &Path,
144 normalize: bool,
145) -> Result<SchemaId> {
146 let client = SchemaRegistry::try_from(client_settings)?;
147 let schema = build_register_schema(path)?;
148 let subject = subject_from_path_option(path, subject)?;
149 let result = client
150 .subject()
151 .new_version(&subject, &schema, Some(normalize))
152 .await?;
153 Ok(result.id)
154}
155
156pub(crate) async fn display_register_schema(register_schema: RegisterSchemaSettings) -> Result<()> {
157 let RegisterSchemaSettings {
158 schema_registry,
159 subject,
160 normalize,
161 path,
162 } = register_schema;
163
164 let result = self::register_schema(schema_registry, subject, &path, normalize).await?;
165
166 println!("Registered schema id: {result}");
168
169 Ok(())
170}
171
172pub async fn delete_subject(
178 client_settings: SchemaRegistrySettings,
179 subject: &SubjectName,
180 version: Option<SchemaVersion>,
181 permanent: bool,
182) -> Result<Vec<SchemaVersion>> {
183 let client = SchemaRegistry::try_from(client_settings)?;
184 let result = if let Some(version) = version {
185 client
186 .subject()
187 .delete_version(subject, version, Some(permanent))
188 .await?
189 .into_iter()
190 .collect()
191 } else {
192 client.subject().delete(subject, Some(permanent)).await?
193 };
194 Ok(result)
195}
196
197pub(crate) async fn display_delete_subject(list_subjects: DeleteSubject) -> Result<()> {
198 let DeleteSubject {
199 schema_registry,
200 subject,
201 version,
202 permanent,
203 } = list_subjects;
204 let versions = self::delete_subject(schema_registry, &subject, version, permanent).await?;
205
206 if versions.is_empty() {
208 println!("No subject version deleted");
209 } else {
210 println!("Delete {} versions", versions.len());
211 for version in &versions {
212 println!("{version}");
213 }
214 }
215
216 Ok(())
217}