// schema_registry_api/service/mode.rs

use crate::{Mode, SchemaRegistryError, SubjectName};

use super::SchemaRegistry;

5/// The subject client
6#[derive(Debug)]
7pub struct ModeClient<'sr> {
8    pub(super) sr: &'sr SchemaRegistry,
9}
10
11impl ModeClient<'_> {
12    /// Get the current mode for Schema Registry at a global level.
13    ///
14    /// # Errors
15    ///
16    /// Fail if we cannot send the query
17    /// Fail if the schema registry return an error
18    #[tracing::instrument(skip(self))]
19    pub async fn get(&self) -> Result<Mode, SchemaRegistryError> {
20        let url = self.sr.base_url.join("mode")?;
21        self.sr.get(url).await
22    }
23
24    /// Update the global Schema Registry mode.
25    ///
26    /// # Errors
27    ///
28    /// Fail if we cannot send the query
29    /// Fail if the schema registry return an error
30    #[tracing::instrument(skip(self))]
31    pub async fn set(&self, mode: Mode, force: Option<bool>) -> Result<Mode, SchemaRegistryError> {
32        let mut url = self.sr.base_url.join("mode")?;
33        if let Some(force) = force {
34            let query = format!("force={force}");
35            url.set_query(Some(&query));
36        }
37        self.sr.put(url, &mode).await
38    }
39
40    /// Get the mode for a subject.
41    ///
42    /// # Errors
43    ///
44    /// Fail if we cannot send the query
45    /// Fail if the schema registry return an error
46    #[tracing::instrument(skip(self))]
47    pub async fn get_subject(&self, subject: &SubjectName) -> Result<Mode, SchemaRegistryError> {
48        let path = format!("mode/{subject}");
49        let url = self.sr.base_url.join(&path)?;
50        self.sr.get(url).await
51    }
52
53    /// Update the mode for the specified subject.
54    ///
55    /// # Errors
56    ///
57    /// Fail if we cannot send the query
58    /// Fail if the schema registry return an error
59    #[tracing::instrument(skip(self))]
60    pub async fn set_subject(
61        &self,
62        subject: &SubjectName,
63        mode: Mode,
64        force: Option<bool>,
65    ) -> Result<Mode, SchemaRegistryError> {
66        let path = format!("mode/{subject}");
67        let mut url = self.sr.base_url.join(&path)?;
68        if let Some(force) = force {
69            let query = format!("force={force}");
70            url.set_query(Some(&query));
71        }
72        self.sr.put(url, &mode).await
73    }
74
75    /// Deletes the subject-level mode for the specified subject and reverts to the global default.
76    ///
77    /// # Errors
78    ///
79    /// Fail if we cannot send the query
80    /// Fail if the schema registry return an error
81    #[tracing::instrument(skip(self))]
82    pub async fn delete_subject(&self, subject: &SubjectName) -> Result<Mode, SchemaRegistryError> {
83        let path = format!("mode/{subject}");
84        let url = self.sr.base_url.join(&path)?;
85        self.sr.delete(url).await
86    }
87}