schema_registry_api/service/
subject.rs1use crate::{
2 RegisterSchema, RegisteredSchema, SchemaId, SchemaRegistryError, SchemaVersion, Subject,
3 SubjectName,
4};
5
6use super::SchemaRegistry;
7
8#[derive(Debug)]
10pub struct SubjectClient<'sr> {
11 pub(super) sr: &'sr SchemaRegistry,
12}
13
14impl SubjectClient<'_> {
15 #[tracing::instrument(skip(self))]
22 pub async fn list(
23 &self,
24 subject_prefix: Option<&str>,
25 deleted: Option<bool>,
26 ) -> Result<Vec<SubjectName>, SchemaRegistryError> {
27 let mut url = self.sr.base_url.join("subjects")?;
28 if let Some(subject_prefix) = subject_prefix {
29 url.query_pairs_mut()
30 .append_pair("subjectPrefix", subject_prefix);
31 }
32 if let Some(deleted) = deleted {
33 url.query_pairs_mut()
34 .append_pair("deleted", deleted.to_string().as_str());
35 }
36 self.sr.get(url).await
37 }
38
39 #[tracing::instrument(skip(self))]
46 pub async fn versions(
47 &self,
48 name: &SubjectName,
49 ) -> Result<Vec<SchemaVersion>, SchemaRegistryError> {
50 let path = format!("subjects/{name}/versions");
51 let url = self.sr.base_url.join(&path)?;
52 self.sr.get(url).await
53 }
54
55 #[tracing::instrument(skip(self))]
62 pub async fn delete(
63 &self,
64 name: &SubjectName,
65 permanent: Option<bool>,
66 ) -> Result<Vec<SchemaVersion>, SchemaRegistryError> {
67 let path = format!("subjects/{name}");
68 let mut url = self.sr.base_url.join(&path)?;
69 if let Some(permanent) = permanent {
70 let query = format!("permanent={permanent}");
71 url.set_query(Some(&query));
72 }
73
74 self.sr.delete(url).await
75 }
76
77 #[tracing::instrument(skip(self))]
84 pub async fn version(
85 &self,
86 name: &SubjectName,
87 version: SchemaVersion,
88 ) -> Result<Option<Subject>, SchemaRegistryError> {
89 let path = format!("subjects/{name}/versions/{version}");
90 let url = self.sr.base_url.join(&path)?;
91 self.sr.get_optional(url).await
92 }
93
94 #[tracing::instrument(skip(self))]
101 pub async fn schema(
102 &self,
103 name: &SubjectName,
104 version: SchemaVersion,
105 ) -> Result<Option<String>, SchemaRegistryError> {
106 let path = format!("subjects/{name}/versions/{version}/schema");
107 let url = self.sr.base_url.join(&path)?;
108 self.sr.get_optional_string(url).await
109 }
110
111 #[tracing::instrument(skip(self))]
118 pub async fn new_version(
119 &self,
120 name: &SubjectName,
121 schema: &RegisterSchema,
122 normalize: Option<bool>,
123 ) -> Result<RegisteredSchema, SchemaRegistryError> {
124 let path = format!("subjects/{name}/versions");
125 let mut url = self.sr.base_url.join(&path)?;
126 if let Some(normalize) = normalize {
127 let query = format!("normalize={normalize}");
128 url.set_query(Some(&query));
129 }
130 self.sr.post(url, schema).await
131 }
132
133 #[tracing::instrument(skip(self))]
140 pub async fn check_schema(
141 &self,
142 name: &SubjectName,
143 schema: &RegisterSchema,
144 normalize: Option<bool>,
145 ) -> Result<Subject, SchemaRegistryError> {
146 let path = format!("subjects/{name}");
147 let mut url = self.sr.base_url.join(&path)?;
148 if let Some(normalize) = normalize {
149 let query = format!("normalize={normalize}");
150 url.set_query(Some(&query));
151 }
152 self.sr.post(url, schema).await
153 }
154
155 #[tracing::instrument(skip(self))]
162 pub async fn delete_version(
163 &self,
164 name: &SubjectName,
165 version: SchemaVersion,
166 permanent: Option<bool>,
167 ) -> Result<Option<SchemaVersion>, SchemaRegistryError> {
168 let path = format!("subjects/{name}/versions/{version}");
169 let mut url = self.sr.base_url.join(&path)?;
170 if let Some(permanent) = permanent {
171 let query = format!("permanent={permanent}");
172 url.set_query(Some(&query));
173 }
174
175 self.sr.delete_option(url).await
176 }
177
178 #[tracing::instrument(skip(self))]
185 pub async fn referenced_by(
186 &self,
187 name: &SubjectName,
188 version: SchemaVersion,
189 ) -> Result<Vec<SchemaId>, SchemaRegistryError> {
190 let path = format!("subjects/{name}/versions/{version}/referencedby");
191 let url = self.sr.base_url.join(&path)?;
192 self.sr.get(url).await
193 }
194}