schema_registry_api/service/
config.rs

1use crate::{Compatibility, GetCompatibility, SchemaRegistryError, SubjectName};
2
3use super::SchemaRegistry;
4
5/// The configuration client
6#[derive(Debug)]
7pub struct ConfigClient<'sr> {
8    pub(super) sr: &'sr SchemaRegistry,
9}
10
11impl ConfigClient<'_> {
12    /// Update global compatibility level.
13    ///
14    /// # Errors
15    ///
16    /// Fail if we cannot send the query
17    /// Fail if the schema registry return an error
18    #[tracing::instrument(skip(self))]
19    pub async fn set(
20        &self,
21        compatibility: Compatibility,
22    ) -> Result<Compatibility, SchemaRegistryError> {
23        let url = self.sr.base_url.join("config")?;
24        self.sr.put(url, &compatibility).await
25    }
26
27    /// Get global compatibility level.
28    ///
29    /// # Errors
30    ///
31    /// Fail if we cannot send the query
32    /// Fail if the schema registry return an error
33    #[tracing::instrument(skip(self))]
34    pub async fn get(&self) -> Result<GetCompatibility, SchemaRegistryError> {
35        let url = self.sr.base_url.join("config")?;
36        self.sr.get(url).await
37    }
38
39    /// Update compatibility level for the specified subject.
40    ///
41    /// # Errors
42    ///
43    /// Fail if we cannot send the query
44    /// Fail if the schema registry return an error
45    #[tracing::instrument(skip(self))]
46    pub async fn set_subject(
47        &self,
48        subject: &SubjectName,
49        compatibility: Compatibility,
50    ) -> Result<Compatibility, SchemaRegistryError> {
51        let path = format!("config/{subject}");
52        let url = self.sr.base_url.join(&path)?;
53        self.sr.put(url, &compatibility).await
54    }
55
56    /// Update compatibility level for the specified subject.
57    ///
58    /// # Errors
59    ///
60    /// Fail if we cannot send the query
61    /// Fail if the schema registry return an error
62    #[tracing::instrument(skip(self))]
63    pub async fn get_subject(
64        &self,
65        subject: &SubjectName,
66        default_to_global: Option<bool>,
67    ) -> Result<GetCompatibility, SchemaRegistryError> {
68        let path = format!("config/{subject}");
69        let mut url = self.sr.base_url.join(&path)?;
70        if let Some(default_to_global) = default_to_global {
71            let query = format!("defaultToGlobal={default_to_global}");
72            url.set_query(Some(&query));
73        }
74        self.sr.get(url).await
75    }
76
77    /// Deletes the specified subject-level compatibility level config and reverts to the global default.
78    ///
79    /// # Errors
80    ///
81    /// Fail if we cannot send the query
82    /// Fail if the schema registry return an error
83    #[tracing::instrument(skip(self))]
84    pub async fn delete_subject(
85        &self,
86        subject: &SubjectName,
87    ) -> Result<GetCompatibility, SchemaRegistryError> {
88        let path = format!("config/{subject}");
89        let url = self.sr.base_url.join(&path)?;
90        self.sr.delete(url).await
91    }
92}