schema_registry_api/service/
schema.rs

1use crate::{Schema, SchemaId, SchemaRegistryError, SchemaType, SubjectName, SubjectVersion};
2
3use super::SchemaRegistry;
4
/// Client for the schema endpoints (`schemas/...`) of the schema registry.
///
/// It borrows the [`SchemaRegistry`] through which every request is sent.
#[derive(Debug)]
pub struct SchemaClient<'sr> {
    /// The registry used to resolve URLs and send the requests.
    pub(super) sr: &'sr SchemaRegistry,
}
10
11impl SchemaClient<'_> {
12    /// Get the schema string identified by the input ID.
13    ///
14    /// # Errors
15    ///
16    /// Fail if we cannot send the query
17    /// Fail if the schema registry return an error
18    #[tracing::instrument(skip(self))]
19    pub async fn get(
20        &self,
21        id: SchemaId,
22        subject: Option<&SubjectName>,
23    ) -> Result<Option<Schema>, SchemaRegistryError> {
24        let path = format!("schemas/ids/{id}");
25        let mut url = self.sr.base_url.join(&path)?;
26        if let Some(subject) = subject {
27            let query = format!("subject={subject}");
28            url.set_query(Some(&query));
29        }
30        self.sr.get_optional(url).await
31    }
32
33    /// Retrieves only the schema identified by the input ID.
34    ///
35    /// # Errors
36    ///
37    /// Fail if we cannot send the query
38    /// Fail if the schema registry return an error
39    #[tracing::instrument(skip(self))]
40    pub async fn get_schema(
41        &self,
42        id: SchemaId,
43        subject: Option<SubjectName>,
44    ) -> Result<Option<String>, SchemaRegistryError> {
45        let path = format!("schemas/ids/{id}/schema");
46        let mut url = self.sr.base_url.join(&path)?;
47        if let Some(subject) = subject {
48            let query = format!("subject={subject}");
49            url.set_query(Some(&query));
50        }
51        self.sr.get_optional(url).await
52    }
53
54    /// Get the schema types that are registered with Schema Registry.
55    ///
56    /// # Errors
57    ///
58    /// Fail if we cannot send the query
59    /// Fail if the schema registry return an error
60    #[tracing::instrument(skip(self))]
61    pub async fn types(&self) -> Result<Vec<SchemaType>, SchemaRegistryError> {
62        let url = self.sr.base_url.join("schemas/types")?;
63        self.sr.get(url).await
64    }
65
66    /// Get the subject-version pairs identified by the input ID.
67    ///
68    /// # Errors
69    ///
70    /// Fail if we cannot send the query
71    /// Fail if the schema registry return an error
72    #[tracing::instrument(skip(self))]
73    pub async fn versions(&self, id: SchemaId) -> Result<Vec<SubjectVersion>, SchemaRegistryError> {
74        let path = format!("schemas/ids/{id}/versions");
75        let url = self.sr.base_url.join(&path)?;
76        self.sr.get(url).await
77    }
78}