use crate::{
RegisterSchema, RegisteredSchema, SchemaId, SchemaRegistryError, SchemaVersion, Subject,
SubjectName,
};
use super::SchemaRegistry;
#[derive(Debug)]
pub struct SubjectClient<'sr> {
pub(super) sr: &'sr SchemaRegistry,
}
impl SubjectClient<'_> {
#[tracing::instrument(skip(self))]
pub async fn list(
&self,
subject_prefix: Option<&str>,
deleted: Option<bool>,
) -> Result<Vec<SubjectName>, SchemaRegistryError> {
let mut url = self.sr.base_url.join("subjects")?;
if let Some(subject_prefix) = subject_prefix {
url.query_pairs_mut()
.append_pair("subjectPrefix", subject_prefix);
}
if let Some(deleted) = deleted {
url.query_pairs_mut()
.append_pair("deleted", deleted.to_string().as_str());
}
self.sr.get(url).await
}
#[tracing::instrument(skip(self))]
pub async fn versions(
&self,
name: &SubjectName,
) -> Result<Vec<SchemaVersion>, SchemaRegistryError> {
let path = format!("subjects/{name}/versions");
let url = self.sr.base_url.join(&path)?;
self.sr.get(url).await
}
#[tracing::instrument(skip(self))]
pub async fn delete(
&self,
name: &SubjectName,
permanent: Option<bool>,
) -> Result<Vec<SchemaVersion>, SchemaRegistryError> {
let path = format!("subjects/{name}");
let mut url = self.sr.base_url.join(&path)?;
if let Some(permanent) = permanent {
let query = format!("permanent={permanent}");
url.set_query(Some(&query));
}
self.sr.delete(url).await
}
#[tracing::instrument(skip(self))]
pub async fn version(
&self,
name: &SubjectName,
version: SchemaVersion,
) -> Result<Option<Subject>, SchemaRegistryError> {
let path = format!("subjects/{name}/versions/{version}");
let url = self.sr.base_url.join(&path)?;
self.sr.get_optional(url).await
}
#[tracing::instrument(skip(self))]
pub async fn schema(
&self,
name: &SubjectName,
version: SchemaVersion,
) -> Result<Option<String>, SchemaRegistryError> {
let path = format!("subjects/{name}/versions/{version}/schema");
let url = self.sr.base_url.join(&path)?;
self.sr.get_optional_string(url).await
}
#[tracing::instrument(skip(self))]
pub async fn new_version(
&self,
name: &SubjectName,
schema: &RegisterSchema,
normalize: Option<bool>,
) -> Result<RegisteredSchema, SchemaRegistryError> {
let path = format!("subjects/{name}/versions");
let mut url = self.sr.base_url.join(&path)?;
if let Some(normalize) = normalize {
let query = format!("normalize={normalize}");
url.set_query(Some(&query));
}
self.sr.post(url, schema).await
}
#[tracing::instrument(skip(self))]
pub async fn check_schema(
&self,
name: &SubjectName,
schema: &RegisterSchema,
normalize: Option<bool>,
) -> Result<Subject, SchemaRegistryError> {
let path = format!("subjects/{name}");
let mut url = self.sr.base_url.join(&path)?;
if let Some(normalize) = normalize {
let query = format!("normalize={normalize}");
url.set_query(Some(&query));
}
self.sr.post(url, schema).await
}
#[tracing::instrument(skip(self))]
pub async fn delete_version(
&self,
name: &SubjectName,
version: SchemaVersion,
permanent: Option<bool>,
) -> Result<Option<SchemaVersion>, SchemaRegistryError> {
let path = format!("subjects/{name}/versions/{version}");
let mut url = self.sr.base_url.join(&path)?;
if let Some(permanent) = permanent {
let query = format!("permanent={permanent}");
url.set_query(Some(&query));
}
self.sr.delete_option(url).await
}
#[tracing::instrument(skip(self))]
pub async fn referenced_by(
&self,
name: &SubjectName,
version: SchemaVersion,
) -> Result<Vec<SchemaId>, SchemaRegistryError> {
let path = format!("subjects/{name}/versions/{version}/referencedby");
let url = self.sr.base_url.join(&path)?;
self.sr.get(url).await
}
}