use std::sync::Arc;
use google_cloud_gax::conn::Channel;
use google_cloud_gax::create_request;
use google_cloud_gax::grpc::Response;
use google_cloud_gax::grpc::Status;
use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
use google_cloud_googleapis::pubsub::v1::schema_service_client::SchemaServiceClient;
use google_cloud_googleapis::pubsub::v1::{
CreateSchemaRequest, DeleteSchemaRequest, GetSchemaRequest, ListSchemasRequest, Schema, ValidateMessageRequest,
ValidateMessageResponse, ValidateSchemaRequest, ValidateSchemaResponse,
};
use crate::apiv1::conn_pool::ConnectionManager;
/// Crate-internal wrapper around the Pub/Sub `SchemaService` gRPC API.
///
/// The connection manager is stored behind an `Arc`, so cloning this client
/// only bumps a reference count.
#[derive(Debug, Clone)]
pub(crate) struct SchemaClient {
    /// Shared connection manager; `conn()` on it supplies the channel used
    /// for each outgoing call.
    cm: Arc<ConnectionManager>,
}
// NOTE(review): presumably kept because not every wrapper below is called from
// inside the crate yet — confirm before removing the allow.
#[allow(dead_code)]
impl SchemaClient {
    /// Creates a schema client that takes ownership of `cm` and shares it
    /// behind an `Arc`.
    pub fn new(cm: ConnectionManager) -> Self {
        Self { cm: Arc::new(cm) }
    }

    /// Builds a `SchemaServiceClient` over the channel returned by the
    /// connection manager. Every RPC wrapper calls this inside its retry
    /// action.
    fn client(&self) -> SchemaServiceClient<Channel> {
        SchemaServiceClient::new(self.cm.conn())
    }

    /// Calls `CreateSchema`.
    ///
    /// The request is cloned for each run of the retry action and sent with
    /// the routing parameter `parent=<req.parent>`. `retry` is forwarded to
    /// `invoke` unchanged.
    ///
    /// # Errors
    ///
    /// Returns the `Status` yielded by `invoke` when the call does not succeed.
    pub async fn create_schema(
        &self,
        req: CreateSchemaRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Response<Schema>, Status> {
        let parent = &req.parent;
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("parent={parent}"), req.clone());
            client.create_schema(request).await.map_transient_err()
        };
        invoke(retry, action).await
    }

    /// Calls `GetSchema`.
    ///
    /// The request is cloned for each run of the retry action and sent with
    /// the routing parameter `name=<req.name>`. `retry` is forwarded to
    /// `invoke` unchanged.
    ///
    /// # Errors
    ///
    /// Returns the `Status` yielded by `invoke` when the call does not succeed.
    pub async fn get_schema(
        &self,
        req: GetSchemaRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Response<Schema>, Status> {
        let name = &req.name;
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("name={name}"), req.clone());
            client.get_schema(request).await.map_transient_err()
        };
        invoke(retry, action).await
    }

    /// Calls `ListSchemas` repeatedly and returns the schemas of every page
    /// collected into a single `Vec`.
    ///
    /// After each page, `req.page_token` is replaced with the response's
    /// `next_page_token`; the loop ends when that token is empty. Each page
    /// request gets its own clone of `retry`.
    ///
    /// # Errors
    ///
    /// Returns the `Status` yielded by `invoke` for the first page that fails;
    /// schemas gathered from earlier pages are discarded in that case.
    pub async fn list_schemas(
        &self,
        mut req: ListSchemasRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Vec<Schema>, Status> {
        // Only the `parent` field is borrowed here, which leaves
        // `req.page_token` free to be reassigned between pages.
        let parent = &req.parent;
        let mut all = vec![];
        loop {
            let action = || async {
                let mut client = self.client();
                // The routing key is the request field name (`parent`), the
                // same key the other parent-scoped calls of this client use.
                let request = create_request(format!("parent={parent}"), req.clone());
                client
                    .list_schemas(request)
                    .await
                    .map(|d| d.into_inner())
                    .map_transient_err()
            };
            let response = invoke(retry.clone(), action).await?;
            all.extend(response.schemas);
            if response.next_page_token.is_empty() {
                return Ok(all);
            }
            req.page_token = response.next_page_token;
        }
    }

    /// Calls `DeleteSchema`.
    ///
    /// The request is cloned for each run of the retry action and sent with
    /// the routing parameter `name=<req.name>`. `retry` is forwarded to
    /// `invoke` unchanged.
    ///
    /// # Errors
    ///
    /// Returns the `Status` yielded by `invoke` when the call does not succeed.
    pub async fn delete_schema(
        &self,
        req: DeleteSchemaRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Response<()>, Status> {
        let name = &req.name;
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("name={name}"), req.clone());
            client.delete_schema(request).await.map_transient_err()
        };
        invoke(retry, action).await
    }

    /// Calls `ValidateSchema`.
    ///
    /// The request is cloned for each run of the retry action and sent with
    /// the routing parameter `parent=<req.parent>`. `retry` is forwarded to
    /// `invoke` unchanged.
    ///
    /// # Errors
    ///
    /// Returns the `Status` yielded by `invoke` when the call does not succeed.
    pub async fn validate_schema(
        &self,
        req: ValidateSchemaRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Response<ValidateSchemaResponse>, Status> {
        let parent = &req.parent;
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("parent={parent}"), req.clone());
            client.validate_schema(request).await.map_transient_err()
        };
        invoke(retry, action).await
    }

    /// Calls `ValidateMessage`.
    ///
    /// The request is cloned for each run of the retry action and sent with
    /// the routing parameter `parent=<req.parent>`. `retry` is forwarded to
    /// `invoke` unchanged.
    ///
    /// # Errors
    ///
    /// Returns the `Status` yielded by `invoke` when the call does not succeed.
    pub async fn validate_message(
        &self,
        req: ValidateMessageRequest,
        retry: Option<RetrySetting>,
    ) -> Result<Response<ValidateMessageResponse>, Status> {
        let parent = &req.parent;
        let action = || async {
            let mut client = self.client();
            let request = create_request(format!("parent={parent}"), req.clone());
            client.validate_message(request).await.map_transient_err()
        };
        invoke(retry, action).await
    }
}