schema_registry_api/service/
subject.rs

1use crate::{
2    RegisterSchema, RegisteredSchema, SchemaId, SchemaRegistryError, SchemaVersion, Subject,
3    SubjectName,
4};
5
6use super::SchemaRegistry;
7
8/// The subject client
9#[derive(Debug)]
10pub struct SubjectClient<'sr> {
11    pub(super) sr: &'sr SchemaRegistry,
12}
13
14impl SubjectClient<'_> {
15    /// A list of all subjects.
16    ///
17    /// # Errors
18    ///
19    /// Fail if we cannot send the query
20    /// Fail if the schema registry return an error
21    #[tracing::instrument(skip(self))]
22    pub async fn list(
23        &self,
24        subject_prefix: Option<&str>,
25        deleted: Option<bool>,
26    ) -> Result<Vec<SubjectName>, SchemaRegistryError> {
27        let mut url = self.sr.base_url.join("subjects")?;
28        if let Some(subject_prefix) = subject_prefix {
29            url.query_pairs_mut()
30                .append_pair("subjectPrefix", subject_prefix);
31        }
32        if let Some(deleted) = deleted {
33            url.query_pairs_mut()
34                .append_pair("deleted", deleted.to_string().as_str());
35        }
36        self.sr.get(url).await
37    }
38
39    /// Get a list of versions registered under the specified subject.
40    ///
41    /// # Errors
42    ///
43    /// Fail if we cannot send the query
44    /// Fail if the schema registry return an error
45    #[tracing::instrument(skip(self))]
46    pub async fn versions(
47        &self,
48        name: &SubjectName,
49    ) -> Result<Vec<SchemaVersion>, SchemaRegistryError> {
50        let path = format!("subjects/{name}/versions");
51        let url = self.sr.base_url.join(&path)?;
52        self.sr.get(url).await
53    }
54
55    /// Deletes the specified subject and its associated compatibility level if registered.
56    ///
57    /// # Errors
58    ///
59    /// Fail if we cannot send the query
60    /// Fail if the schema registry return an error
61    #[tracing::instrument(skip(self))]
62    pub async fn delete(
63        &self,
64        name: &SubjectName,
65        permanent: Option<bool>,
66    ) -> Result<Vec<SchemaVersion>, SchemaRegistryError> {
67        let path = format!("subjects/{name}");
68        let mut url = self.sr.base_url.join(&path)?;
69        if let Some(permanent) = permanent {
70            let query = format!("permanent={permanent}");
71            url.set_query(Some(&query));
72        }
73
74        self.sr.delete(url).await
75    }
76
77    /// Get a specific version of the schema registered under this subject.
78    ///
79    /// # Errors
80    ///
81    /// Fail if we cannot send the query
82    /// Fail if the schema registry return an error
83    #[tracing::instrument(skip(self))]
84    pub async fn version(
85        &self,
86        name: &SubjectName,
87        version: SchemaVersion,
88    ) -> Result<Option<Subject>, SchemaRegistryError> {
89        let path = format!("subjects/{name}/versions/{version}");
90        let url = self.sr.base_url.join(&path)?;
91        self.sr.get_optional(url).await
92    }
93
94    /// Get the schema for the specified version of this subject.
95    ///
96    /// # Errors
97    ///
98    /// Fail if we cannot send the query
99    /// Fail if the schema registry return an error
100    #[tracing::instrument(skip(self))]
101    pub async fn schema(
102        &self,
103        name: &SubjectName,
104        version: SchemaVersion,
105    ) -> Result<Option<String>, SchemaRegistryError> {
106        let path = format!("subjects/{name}/versions/{version}/schema");
107        let url = self.sr.base_url.join(&path)?;
108        self.sr.get_optional_string(url).await
109    }
110
111    /// Register a new schema under the specified subject.
112    ///
113    /// # Errors
114    ///
115    /// Fail if we cannot send the query
116    /// Fail if the schema registry return an error
117    #[tracing::instrument(skip(self))]
118    pub async fn new_version(
119        &self,
120        name: &SubjectName,
121        schema: &RegisterSchema,
122        normalize: Option<bool>,
123    ) -> Result<RegisteredSchema, SchemaRegistryError> {
124        let path = format!("subjects/{name}/versions");
125        let mut url = self.sr.base_url.join(&path)?;
126        if let Some(normalize) = normalize {
127            let query = format!("normalize={normalize}");
128            url.set_query(Some(&query));
129        }
130        self.sr.post(url, schema).await
131    }
132
133    /// Check if a schema has already been registered under the specified subject.
134    ///
135    /// # Errors
136    ///
137    /// Fail if we cannot send the query
138    /// Fail if the schema registry return an error
139    #[tracing::instrument(skip(self))]
140    pub async fn check_schema(
141        &self,
142        name: &SubjectName,
143        schema: &RegisterSchema,
144        normalize: Option<bool>,
145    ) -> Result<Subject, SchemaRegistryError> {
146        let path = format!("subjects/{name}");
147        let mut url = self.sr.base_url.join(&path)?;
148        if let Some(normalize) = normalize {
149            let query = format!("normalize={normalize}");
150            url.set_query(Some(&query));
151        }
152        self.sr.post(url, schema).await
153    }
154
155    /// Deletes a specific version of the schema registered under this subject.
156    ///
157    /// # Errors
158    ///
159    /// Fail if we cannot send the query
160    /// Fail if the schema registry return an error
161    #[tracing::instrument(skip(self))]
162    pub async fn delete_version(
163        &self,
164        name: &SubjectName,
165        version: SchemaVersion,
166        permanent: Option<bool>,
167    ) -> Result<Option<SchemaVersion>, SchemaRegistryError> {
168        let path = format!("subjects/{name}/versions/{version}");
169        let mut url = self.sr.base_url.join(&path)?;
170        if let Some(permanent) = permanent {
171            let query = format!("permanent={permanent}");
172            url.set_query(Some(&query));
173        }
174
175        self.sr.delete_option(url).await
176    }
177
178    /// Get a list of IDs of schemas that reference the schema with the given subject and version.
179    ///
180    /// # Errors
181    ///
182    /// Fail if we cannot send the query
183    /// Fail if the schema registry return an error
184    #[tracing::instrument(skip(self))]
185    pub async fn referenced_by(
186        &self,
187        name: &SubjectName,
188        version: SchemaVersion,
189    ) -> Result<Vec<SchemaId>, SchemaRegistryError> {
190        let path = format!("subjects/{name}/versions/{version}/referencedby");
191        let url = self.sr.base_url.join(&path)?;
192        self.sr.get(url).await
193    }
194}