#![warn(unreachable_pub)]
#![forbid(unsafe_code)]
mod tls_crypto;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use magnetar_auth_oauth2::ClientCredentialsFlow;
use magnetar_proto::MessageId;
use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
use reqwest::{Method, RequestBuilder, Response, StatusCode};
use serde::{Deserialize, Serialize};
use url::Url;
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
pub const PACKAGE_REGISTER_TIMEOUT: Duration = Duration::from_secs(300);
#[derive(Clone, Default)]
pub enum AdminAuth {
#[default]
None,
Token(String),
OAuth2(Arc<ClientCredentialsFlow>),
}
impl std::fmt::Debug for AdminAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::None => f.write_str("None"),
Self::Token(_) => f.debug_tuple("Token").field(&"<redacted>").finish(),
Self::OAuth2(flow) => f.debug_tuple("OAuth2").field(flow).finish(),
}
}
}
#[derive(Debug, Clone)]
pub struct AdminClient {
base_url: Url,
base_url_v3: Url,
http: reqwest::Client,
auth: AdminAuth,
}
impl AdminClient {
#[must_use]
pub fn builder() -> AdminClientBuilder {
AdminClientBuilder::default()
}
#[must_use]
pub fn base_url(&self) -> &Url {
&self.base_url
}
#[must_use]
pub fn auth(&self) -> &AdminAuth {
&self.auth
}
pub async fn cluster_list(&self) -> Result<Vec<String>, AdminError> {
let url = self.url(&["clusters"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn cluster_failure_domains_list(
&self,
cluster: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(cluster)?;
let url = self.url(&["clusters", cluster, "failureDomains"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn cluster_failure_domain_get(
&self,
cluster: &str,
domain: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(cluster)?;
validate_segment(domain)?;
let url = self.url(&["clusters", cluster, "failureDomains", domain])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn namespace_isolation_policies_list(
&self,
cluster: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(cluster)?;
let url = self.url(&["clusters", cluster, "namespaceIsolationPolicies"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
match json_ok::<serde_json::Value>(resp).await {
Ok(v) => Ok(v),
Err(AdminError::Status {
code: 404, body, ..
}) if body.contains("NamespaceIsolationPolicies") => {
Ok(serde_json::Value::Object(serde_json::Map::new()))
}
Err(e) => Err(e),
}
}
pub async fn brokers_list(&self, cluster: &str) -> Result<Vec<String>, AdminError> {
validate_segment(cluster)?;
let url = self.url(&["brokers", cluster])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn brokers_leader(&self) -> Result<serde_json::Value, AdminError> {
let url = self.url(&["brokers", "leaderBroker"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn brokers_dynamic_config_keys(&self) -> Result<Vec<String>, AdminError> {
let url = self.url(&["brokers", "configuration"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
let v: serde_json::Value = json_ok(resp).await?;
match v {
serde_json::Value::Array(items) => Ok(items
.into_iter()
.filter_map(|x| x.as_str().map(str::to_owned))
.collect()),
serde_json::Value::Object(map) => Ok(map.into_iter().map(|(k, _)| k).collect()),
other => Err(AdminError::Protocol(format!(
"brokers/configuration returned unexpected shape: {other}"
))),
}
}
pub async fn brokers_dynamic_config_overrides(&self) -> Result<serde_json::Value, AdminError> {
let url = self.url(&["brokers", "configuration", "values"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn brokers_runtime_config(&self) -> Result<serde_json::Value, AdminError> {
let url = self.url(&["brokers", "configuration", "runtime"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn brokers_internal_config(&self) -> Result<serde_json::Value, AdminError> {
let url = self.url(&["brokers", "internal-configuration"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn brokers_health_check(&self) -> Result<String, AdminError> {
let url = self.url(&["brokers", "health"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
let resp = ensure_status(resp).await?;
Ok(resp.resp.text().await?)
}
pub async fn brokers_owned_namespaces(
&self,
cluster: &str,
broker: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(cluster)?;
validate_segment(broker)?;
let url = self.url(&["brokers", cluster, broker, "ownedNamespaces"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn brokers_set_dynamic_config(
&self,
name: &str,
value: &str,
) -> Result<(), AdminError> {
validate_segment(name)?;
validate_segment(value)?;
let url = self.url(&["brokers", "configuration", name, value])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn brokers_delete_dynamic_config(&self, name: &str) -> Result<(), AdminError> {
validate_segment(name)?;
let url = self.url(&["brokers", "configuration", name])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn bookies_list_all(&self) -> Result<serde_json::Value, AdminError> {
let url = self.url(&["bookies", "all"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn bookies_racks_info(&self) -> Result<serde_json::Value, AdminError> {
let url = self.url(&["bookies", "racks-info"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn bookies_set_rack(
&self,
bookie: &str,
group: &str,
info: BookieInfo,
) -> Result<(), AdminError> {
validate_segment(bookie)?;
let mut url = self.url(&["bookies", "racks-info", bookie])?;
url.query_pairs_mut().append_pair("group", group);
let resp = self
.send(self.http.request(Method::POST, url).json(&info))
.await?;
empty_ok(resp).await
}
pub async fn bookies_delete_rack(&self, bookie: &str) -> Result<(), AdminError> {
validate_segment(bookie)?;
let url = self.url(&["bookies", "racks-info", bookie])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn schema_get_latest(&self, topic: &str) -> Result<serde_json::Value, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn schema_get_version(
&self,
topic: &str,
version: i64,
) -> Result<serde_json::Value, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let v = version.to_string();
let url = self.url(&["schemas", tenant, namespace, name, "schema", &v])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn schema_list_versions(
&self,
topic: &str,
) -> Result<Vec<serde_json::Value>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["schemas", tenant, namespace, name, "schemas"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
let v: serde_json::Value = json_ok(resp).await?;
match v {
serde_json::Value::Array(items) => Ok(items),
serde_json::Value::Object(mut envelope) => {
if let Some(serde_json::Value::Array(items)) = envelope.remove("getSchemaResponses")
{
return Ok(items);
}
Err(AdminError::Protocol(format!(
"schemas/.../schemas envelope missing `getSchemaResponses` array: {}",
serde_json::Value::Object(envelope)
)))
}
other => Err(AdminError::Protocol(format!(
"schemas/.../schemas returned unexpected shape: {other}"
))),
}
}
pub async fn schema_post(
&self,
topic: &str,
payload: PostSchemaPayload,
) -> Result<serde_json::Value, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&payload))
.await?;
json_ok(resp).await
}
pub async fn schema_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let mut url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
url.query_pairs_mut()
.append_pair("force", if force { "true" } else { "false" });
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn schema_compatibility_check(
&self,
topic: &str,
payload: PostSchemaPayload,
) -> Result<serde_json::Value, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["schemas", tenant, namespace, name, "compatibility"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&payload))
.await?;
json_ok(resp).await
}
pub async fn functions_list_by_namespace(
&self,
tenant: &str,
namespace: &str,
) -> Result<Vec<String>, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
let url = self.url_v3(&["functions", tenant, namespace])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn function_get(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["functions", tenant, namespace, name])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn function_status(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["functions", tenant, namespace, name, "status"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn function_stats(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["functions", tenant, namespace, name, "stats"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn function_instance_status(
&self,
tenant: &str,
namespace: &str,
name: &str,
instance_id: i32,
) -> Result<serde_json::Value, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let id = instance_id.to_string();
let url = self.url_v3(&["functions", tenant, namespace, name, &id, "status"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn function_instance_stats(
&self,
tenant: &str,
namespace: &str,
name: &str,
instance_id: i32,
) -> Result<serde_json::Value, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let id = instance_id.to_string();
let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stats"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn function_create_with_url(
&self,
tenant: &str,
namespace: &str,
name: &str,
url: &str,
config: FunctionConfig,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
let form = function_pkg_form(url, &config)?;
let resp = self
.send(
self.http
.request(Method::POST, endpoint)
.multipart(form)
.timeout(PACKAGE_REGISTER_TIMEOUT),
)
.await?;
empty_ok(resp).await
}
pub async fn function_update_with_url(
&self,
tenant: &str,
namespace: &str,
name: &str,
url: &str,
config: FunctionConfig,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
let form = function_pkg_form(url, &config)?;
let resp = self
.send(
self.http
.request(Method::PUT, endpoint)
.multipart(form)
.timeout(PACKAGE_REGISTER_TIMEOUT),
)
.await?;
empty_ok(resp).await
}
pub async fn function_delete(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["functions", tenant, namespace, name])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn function_start(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["functions", tenant, namespace, name, "start"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn function_stop(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["functions", tenant, namespace, name, "stop"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn function_restart(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["functions", tenant, namespace, name, "restart"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn function_start_instance(
&self,
tenant: &str,
namespace: &str,
name: &str,
instance_id: i32,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let id = instance_id.to_string();
let url = self.url_v3(&["functions", tenant, namespace, name, &id, "start"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn function_stop_instance(
&self,
tenant: &str,
namespace: &str,
name: &str,
instance_id: i32,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let id = instance_id.to_string();
let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stop"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn tenants_list(&self) -> Result<Vec<String>, AdminError> {
let url = self.url(&["tenants"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn tenant_create(&self, name: &str, info: TenantInfo) -> Result<(), AdminError> {
let url = self.url(&["tenants", name])?;
let resp = self
.send(self.http.request(Method::PUT, url).json(&info))
.await?;
empty_ok(resp).await
}
pub async fn tenant_delete(&self, name: &str) -> Result<(), AdminError> {
let url = self.url(&["tenants", name])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespaces_list(&self, tenant: &str) -> Result<Vec<String>, AdminError> {
let url = self.url(&["namespaces", tenant])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn namespace_create(&self, ns: &str) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace])?;
let resp = self.send(self.http.request(Method::PUT, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_delete(&self, ns: &str) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_retention(&self, ns: &str) -> Result<RetentionPolicies, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_or_default(resp).await
}
pub async fn namespace_set_retention(
&self,
ns: &str,
policy: RetentionPolicies,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&policy))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_retention(&self, ns: &str) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_backlog_quotas(
&self,
ns: &str,
) -> Result<serde_json::Value, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "backlogQuotaMap"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn namespace_set_backlog_quota(
&self,
ns: &str,
backlog_quota_type: BacklogQuotaType,
quota: BacklogQuota,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
url.query_pairs_mut()
.append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
let resp = self
.send(self.http.request(Method::POST, url).json("a))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_backlog_quota(
&self,
ns: &str,
backlog_quota_type: BacklogQuotaType,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
url.query_pairs_mut()
.append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_message_ttl(&self, ns: &str) -> Result<Option<i32>, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn namespace_set_message_ttl(
&self,
ns: &str,
ttl_seconds: i32,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&ttl_seconds))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_message_ttl(&self, ns: &str) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_persistence(
&self,
ns: &str,
) -> Result<PersistencePolicies, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_or_default(resp).await
}
pub async fn namespace_set_persistence(
&self,
ns: &str,
policy: PersistencePolicies,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&policy))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_persistence(&self, ns: &str) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_dispatch_rate(&self, ns: &str) -> Result<DispatchRate, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_or_default(resp).await
}
pub async fn namespace_set_dispatch_rate(
&self,
ns: &str,
rate: DispatchRate,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&rate))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_dispatch_rate(&self, ns: &str) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_subscription_dispatch_rate(
&self,
ns: &str,
) -> Result<DispatchRate, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn namespace_set_subscription_dispatch_rate(
&self,
ns: &str,
rate: DispatchRate,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&rate))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_subscription_dispatch_rate(
&self,
ns: &str,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_replicator_dispatch_rate(
&self,
ns: &str,
) -> Result<DispatchRate, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn namespace_set_replicator_dispatch_rate(
&self,
ns: &str,
rate: DispatchRate,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&rate))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_replicator_dispatch_rate(
&self,
ns: &str,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_publish_rate(&self, ns: &str) -> Result<PublishRate, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_or_default(resp).await
}
pub async fn namespace_set_publish_rate(
&self,
ns: &str,
rate: PublishRate,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&rate))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_publish_rate(&self, ns: &str) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_deduplication(&self, ns: &str) -> Result<Option<bool>, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn namespace_set_deduplication(
&self,
ns: &str,
enabled: bool,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&enabled))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_deduplication(&self, ns: &str) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_deduplication_snapshot_interval(
&self,
ns: &str,
) -> Result<Option<i32>, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&[
"namespaces",
tenant,
namespace,
"deduplicationSnapshotInterval",
])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn namespace_set_deduplication_snapshot_interval(
&self,
ns: &str,
interval_entries: i32,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&[
"namespaces",
tenant,
namespace,
"deduplicationSnapshotInterval",
])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&interval_entries))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_deduplication_snapshot_interval(
&self,
ns: &str,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&[
"namespaces",
tenant,
namespace,
"deduplicationSnapshotInterval",
])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_compaction_threshold(
&self,
ns: &str,
) -> Result<Option<i64>, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn namespace_set_compaction_threshold(
&self,
ns: &str,
threshold_bytes: i64,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
let resp = self
.send(self.http.request(Method::PUT, url).json(&threshold_bytes))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_compaction_threshold(&self, ns: &str) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_delayed_delivery(
&self,
ns: &str,
) -> Result<Option<DelayedDeliveryPolicies>, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn namespace_set_delayed_delivery(
&self,
ns: &str,
policy: DelayedDeliveryPolicies,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&policy))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_delayed_delivery(&self, ns: &str) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_max_producers_per_topic(
&self,
ns: &str,
) -> Result<Option<i32>, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn namespace_set_max_producers_per_topic(
&self,
ns: &str,
max_producers: i32,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&max_producers))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_max_producers_per_topic(
&self,
ns: &str,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_max_consumers_per_topic(
&self,
ns: &str,
) -> Result<Option<i32>, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn namespace_set_max_consumers_per_topic(
&self,
ns: &str,
max_consumers: i32,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&max_consumers))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_max_consumers_per_topic(
&self,
ns: &str,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_max_unacked_messages_per_consumer(
&self,
ns: &str,
) -> Result<Option<i32>, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&[
"namespaces",
tenant,
namespace,
"maxUnackedMessagesPerConsumer",
])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn namespace_set_max_unacked_messages_per_consumer(
&self,
ns: &str,
max_unacked: i32,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&[
"namespaces",
tenant,
namespace,
"maxUnackedMessagesPerConsumer",
])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&max_unacked))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_max_unacked_messages_per_consumer(
&self,
ns: &str,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&[
"namespaces",
tenant,
namespace,
"maxUnackedMessagesPerConsumer",
])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn namespace_get_max_unacked_messages_per_subscription(
&self,
ns: &str,
) -> Result<Option<i32>, AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&[
"namespaces",
tenant,
namespace,
"maxUnackedMessagesPerSubscription",
])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn namespace_set_max_unacked_messages_per_subscription(
&self,
ns: &str,
max_unacked: i32,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&[
"namespaces",
tenant,
namespace,
"maxUnackedMessagesPerSubscription",
])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&max_unacked))
.await?;
empty_ok(resp).await
}
pub async fn namespace_remove_max_unacked_messages_per_subscription(
&self,
ns: &str,
) -> Result<(), AdminError> {
let (tenant, namespace) = split_namespace(ns)?;
let url = self.url(&[
"namespaces",
tenant,
namespace,
"maxUnackedMessagesPerSubscription",
])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topics_list(&self, namespace: &str) -> Result<Vec<String>, AdminError> {
let (tenant, namespace) = split_namespace(namespace)?;
let url = self.url(&["persistent", tenant, namespace])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn topic_create_non_partitioned(&self, topic: &str) -> Result<(), AdminError> {
self.topic_create_non_partitioned_with_properties(topic, &HashMap::new())
.await
}
async fn topic_create_non_partitioned_with_properties(
&self,
topic: &str,
properties: &HashMap<String, String>,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name])?;
let resp = self
.send(self.http.request(Method::PUT, url).json(properties))
.await?;
empty_ok(resp).await
}
pub async fn topic_create_partitioned(
&self,
topic: &str,
partitions: u32,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
let resp = self
.send(self.http.request(Method::PUT, url).json(&partitions))
.await?;
empty_ok(resp).await
}
pub async fn topic_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let partitions = self.topic_partitions_count(topic).await?;
let force_str = if force { "true" } else { "false" };
let mut url = if partitions > 0 {
self.url(&["persistent", tenant, namespace, name, "partitions"])?
} else {
self.url(&["persistent", tenant, namespace, name])?
};
url.query_pairs_mut().append_pair("force", force_str);
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topic_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "stats"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn topic_partitioned_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let mut url = self.url(&["persistent", tenant, namespace, name, "partitioned-stats"])?;
url.query_pairs_mut().append_pair("perPartition", "false");
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn topic_partitions_count(&self, topic: &str) -> Result<u32, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
let meta: PartitionedTopicMetadata = json_ok(resp).await?;
Ok(meta.partitions)
}
pub async fn topic_get_message_id_by_index(
&self,
topic: &str,
index: i64,
) -> Result<MessageId, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let mut url = self.url(&["persistent", tenant, namespace, name, "getMessageIdByIndex"])?;
url.query_pairs_mut()
.append_pair("index", &index.to_string());
let resp = self.send(self.http.request(Method::GET, url)).await?;
let dto: MessageIdResponse = json_ok(resp).await?;
dto.try_into_message_id()
}
pub async fn topic_compact(&self, topic: &str) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
let resp = self.send(self.http.request(Method::PUT, url)).await?;
empty_ok(resp).await
}
pub async fn topic_compaction_status(
&self,
topic: &str,
) -> Result<LongRunningProcessStatus, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn topic_unload(&self, topic: &str) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "unload"])?;
let resp = self.send(self.http.request(Method::PUT, url)).await?;
empty_ok(resp).await
}
pub async fn topic_terminate(&self, topic: &str) -> Result<Option<MessageId>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "terminate"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
let dto: MessageIdResponse = json_ok(resp).await?;
if dto.ledger_id < 0 && dto.entry_id < 0 {
return Ok(None);
}
dto.try_into_message_id().map(Some)
}
pub async fn topic_update_partitions(
&self,
topic: &str,
new_partitions: u32,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&new_partitions))
.await?;
empty_ok(resp).await
}
pub async fn topic_get_retention(&self, topic: &str) -> Result<RetentionPolicies, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_or_default(resp).await
}
pub async fn topic_set_retention(
&self,
topic: &str,
policy: RetentionPolicies,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&policy))
.await?;
empty_ok(resp).await
}
pub async fn topic_remove_retention(&self, topic: &str) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topic_get_backlog_quotas(
&self,
topic: &str,
) -> Result<serde_json::Value, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "backlogQuotaMap"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn topic_set_backlog_quota(
&self,
topic: &str,
backlog_quota_type: BacklogQuotaType,
quota: BacklogQuota,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
url.query_pairs_mut()
.append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
let resp = self
.send(self.http.request(Method::POST, url).json("a))
.await?;
empty_ok(resp).await
}
pub async fn topic_remove_backlog_quota(
&self,
topic: &str,
backlog_quota_type: BacklogQuotaType,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
url.query_pairs_mut()
.append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topic_get_message_ttl(&self, topic: &str) -> Result<Option<i32>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn topic_set_message_ttl(
&self,
topic: &str,
ttl_seconds: i32,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let mut url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
url.query_pairs_mut()
.append_pair("messageTTL", &ttl_seconds.to_string());
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn topic_remove_message_ttl(&self, topic: &str) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topic_get_persistence(
&self,
topic: &str,
) -> Result<Option<PersistencePolicies>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn topic_set_persistence(
&self,
topic: &str,
policy: PersistencePolicies,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&policy))
.await?;
empty_ok(resp).await
}
pub async fn topic_remove_persistence(&self, topic: &str) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topic_get_dispatch_rate(
&self,
topic: &str,
) -> Result<Option<DispatchRate>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn topic_set_dispatch_rate(
&self,
topic: &str,
rate: DispatchRate,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&rate))
.await?;
empty_ok(resp).await
}
pub async fn topic_remove_dispatch_rate(&self, topic: &str) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topic_get_subscription_dispatch_rate(
&self,
topic: &str,
) -> Result<Option<DispatchRate>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"subscriptionDispatchRate",
])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn topic_set_subscription_dispatch_rate(
&self,
topic: &str,
rate: DispatchRate,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"subscriptionDispatchRate",
])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&rate))
.await?;
empty_ok(resp).await
}
pub async fn topic_remove_subscription_dispatch_rate(
&self,
topic: &str,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"subscriptionDispatchRate",
])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topic_get_replicator_dispatch_rate(
&self,
topic: &str,
) -> Result<Option<DispatchRate>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"replicatorDispatchRate",
])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn topic_set_replicator_dispatch_rate(
&self,
topic: &str,
rate: DispatchRate,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"replicatorDispatchRate",
])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&rate))
.await?;
empty_ok(resp).await
}
pub async fn topic_remove_replicator_dispatch_rate(
&self,
topic: &str,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"replicatorDispatchRate",
])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topic_get_publish_rate(
&self,
topic: &str,
) -> Result<Option<PublishRate>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn topic_set_publish_rate(
&self,
topic: &str,
rate: PublishRate,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&rate))
.await?;
empty_ok(resp).await
}
pub async fn topic_remove_publish_rate(&self, topic: &str) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topic_get_max_producers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn topic_set_max_producers(
&self,
topic: &str,
max_producers: i32,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&max_producers))
.await?;
empty_ok(resp).await
}
pub async fn topic_remove_max_producers(&self, topic: &str) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn topic_get_max_consumers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_optional(resp).await
}
pub async fn topic_set_max_consumers(
&self,
topic: &str,
max_consumers: i32,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
let resp = self
.send(self.http.request(Method::POST, url).json(&max_consumers))
.await?;
empty_ok(resp).await
}
pub async fn topic_remove_max_consumers(&self, topic: &str) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn subscriptions_list(&self, topic: &str) -> Result<Vec<String>, AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
let url = self.url(&["persistent", tenant, namespace, name, "subscriptions"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn subscription_reset_cursor_to_position(
&self,
topic: &str,
subscription: &str,
message_id: MessageId,
is_excluded: bool,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
validate_segment(subscription)?;
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"subscription",
subscription,
"resetcursor",
])?;
let body = ResetCursorData {
ledger_id: message_id_field_for_wire(message_id.ledger_id),
entry_id: message_id_field_for_wire(message_id.entry_id),
partition_index: message_id.partition,
batch_index: message_id.batch_index,
is_excluded,
};
let resp = self
.send(self.http.request(Method::POST, url).json(&body))
.await?;
empty_ok(resp).await
}
pub async fn subscription_reset_cursor_to_timestamp(
&self,
topic: &str,
subscription: &str,
timestamp_millis: u64,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
validate_segment(subscription)?;
let timestamp = timestamp_millis.to_string();
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"subscription",
subscription,
"resetcursor",
×tamp,
])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn subscription_skip_messages(
&self,
topic: &str,
subscription: &str,
num_messages: u64,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
validate_segment(subscription)?;
let n = num_messages.to_string();
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"subscription",
subscription,
"skip",
&n,
])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn subscription_skip_all_messages(
&self,
topic: &str,
subscription: &str,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
validate_segment(subscription)?;
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"subscription",
subscription,
"skip_all",
])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn subscription_expire_messages(
&self,
topic: &str,
subscription: &str,
expire_time_seconds: u64,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
validate_segment(subscription)?;
let s = expire_time_seconds.to_string();
let url = self.url(&[
"persistent",
tenant,
namespace,
name,
"subscription",
subscription,
"expireMessages",
&s,
])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn subscription_delete(
&self,
topic: &str,
subscription: &str,
force: bool,
) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(topic)?;
validate_segment(subscription)?;
let mut url = self.url(&[
"persistent",
tenant,
namespace,
name,
"subscription",
subscription,
])?;
if force {
url.query_pairs_mut().append_pair("force", "true");
}
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn create_shadow_topic(&self, source: &str, shadow: &str) -> Result<(), AdminError> {
let (source_tenant, source_namespace, source_name) = split_topic(source)?;
let (shadow_tenant, shadow_namespace, shadow_name) = split_topic(shadow)?;
let source = format!("persistent://{source_tenant}/{source_namespace}/{source_name}");
let shadow = format!("persistent://{shadow_tenant}/{shadow_namespace}/{shadow_name}");
let mut properties = HashMap::new();
properties.insert("PULSAR.SHADOW_SOURCE".to_owned(), source.clone());
self.topic_create_non_partitioned_with_properties(&shadow, &properties)
.await?;
self.set_shadow_topics(&source, &[shadow]).await
}
async fn set_shadow_topics(&self, source: &str, shadows: &[String]) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(source)?;
let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
let resp = self
.send(self.http.request(Method::PUT, url).json(shadows))
.await?;
empty_ok(resp).await
}
pub async fn delete_shadow_topic(&self, shadow: &str, force: bool) -> Result<(), AdminError> {
let (tenant, namespace, name) = split_topic(shadow)?;
let mut url = self.url(&["persistent", tenant, namespace, name])?;
url.query_pairs_mut()
.append_pair("force", if force { "true" } else { "false" });
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn get_shadow_topics(&self, source: &str) -> Result<Vec<String>, AdminError> {
let (tenant, namespace, name) = split_topic(source)?;
let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok_or_default(resp).await
}
pub async fn get_shadow_source(&self, shadow: &str) -> Result<Option<String>, AdminError> {
let (tenant, namespace, name) = split_topic(shadow)?;
let url = self.url(&["persistent", tenant, namespace, name, "properties"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
let mut properties: HashMap<String, String> = json_ok_or_default(resp).await?;
Ok(properties
.remove("PULSAR.SHADOW_SOURCE")
.filter(|source| !source.is_empty()))
}
pub async fn sources_list_by_namespace(
&self,
tenant: &str,
namespace: &str,
) -> Result<Vec<String>, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
let url = self.url_v3(&["sources", tenant, namespace])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn source_get(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sources", tenant, namespace, name])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn source_status(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sources", tenant, namespace, name, "status"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn source_create_with_url(
&self,
tenant: &str,
namespace: &str,
name: &str,
url: &str,
config: SourceConfig,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
let form = build_url_config_multipart(url, "sourceConfig", &config)?;
let resp = self
.send(
self.http
.request(Method::POST, endpoint)
.multipart(form)
.timeout(PACKAGE_REGISTER_TIMEOUT),
)
.await?;
empty_ok(resp).await
}
pub async fn source_update_with_url(
&self,
tenant: &str,
namespace: &str,
name: &str,
url: &str,
config: SourceConfig,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
let form = build_url_config_multipart(url, "sourceConfig", &config)?;
let resp = self
.send(
self.http
.request(Method::PUT, endpoint)
.multipart(form)
.timeout(PACKAGE_REGISTER_TIMEOUT),
)
.await?;
empty_ok(resp).await
}
pub async fn source_delete(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sources", tenant, namespace, name])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn source_start(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sources", tenant, namespace, name, "start"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn source_stop(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sources", tenant, namespace, name, "stop"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn source_restart(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sources", tenant, namespace, name, "restart"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn sinks_list_by_namespace(
&self,
tenant: &str,
namespace: &str,
) -> Result<Vec<String>, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
let url = self.url_v3(&["sinks", tenant, namespace])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn sink_get(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sinks", tenant, namespace, name])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn sink_status(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sinks", tenant, namespace, name, "status"])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn sink_create_with_url(
&self,
tenant: &str,
namespace: &str,
name: &str,
url: &str,
config: SinkConfig,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
let form = build_url_config_multipart(url, "sinkConfig", &config)?;
let resp = self
.send(
self.http
.request(Method::POST, endpoint)
.multipart(form)
.timeout(PACKAGE_REGISTER_TIMEOUT),
)
.await?;
empty_ok(resp).await
}
pub async fn sink_update_with_url(
&self,
tenant: &str,
namespace: &str,
name: &str,
url: &str,
config: SinkConfig,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
let form = build_url_config_multipart(url, "sinkConfig", &config)?;
let resp = self
.send(
self.http
.request(Method::PUT, endpoint)
.multipart(form)
.timeout(PACKAGE_REGISTER_TIMEOUT),
)
.await?;
empty_ok(resp).await
}
pub async fn sink_delete(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sinks", tenant, namespace, name])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
pub async fn sink_start(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sinks", tenant, namespace, name, "start"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn sink_stop(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sinks", tenant, namespace, name, "stop"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn sink_restart(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["sinks", tenant, namespace, name, "restart"])?;
let resp = self.send(self.http.request(Method::POST, url)).await?;
empty_ok(resp).await
}
pub async fn packages_list(
&self,
pkg_type: PackageType,
tenant: &str,
namespace: &str,
) -> Result<Vec<String>, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn package_versions_list(
&self,
pkg_type: PackageType,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<Vec<String>, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace, name])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn package_metadata_get(
&self,
pkg_type: PackageType,
tenant: &str,
namespace: &str,
name: &str,
version: &str,
) -> Result<serde_json::Value, AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
validate_segment(version)?;
let url = self.url_v3(&[
"packages",
pkg_type.as_str(),
tenant,
namespace,
name,
version,
"metadata",
])?;
let resp = self.send(self.http.request(Method::GET, url)).await?;
json_ok(resp).await
}
pub async fn package_metadata_set(
&self,
pkg_type: PackageType,
tenant: &str,
namespace: &str,
name: &str,
version: &str,
metadata: PackageMetadata,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
validate_segment(version)?;
let url = self.url_v3(&[
"packages",
pkg_type.as_str(),
tenant,
namespace,
name,
version,
"metadata",
])?;
let resp = self
.send(self.http.request(Method::PUT, url).json(&metadata))
.await?;
empty_ok(resp).await
}
pub async fn package_delete(
&self,
pkg_type: PackageType,
tenant: &str,
namespace: &str,
name: &str,
version: &str,
) -> Result<(), AdminError> {
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
validate_segment(version)?;
let url = self.url_v3(&[
"packages",
pkg_type.as_str(),
tenant,
namespace,
name,
version,
])?;
let resp = self.send(self.http.request(Method::DELETE, url)).await?;
empty_ok(resp).await
}
fn url(&self, segments: &[&str]) -> Result<Url, AdminError> {
Self::url_for(&self.base_url, segments)
}
fn url_v3(&self, segments: &[&str]) -> Result<Url, AdminError> {
Self::url_for(&self.base_url_v3, segments)
}
fn url_for(base: &Url, segments: &[&str]) -> Result<Url, AdminError> {
let mut url = base.clone();
{
let mut path = url
.path_segments_mut()
.map_err(|()| AdminError::Builder("base url is cannot-be-a-base".into()))?;
path.pop_if_empty();
for segment in segments {
path.push(segment);
}
}
Ok(url)
}
async fn send(&self, req: RequestBuilder) -> Result<ApiResponse, AdminError> {
let req = match &self.auth {
AdminAuth::None => req,
AdminAuth::Token(tok) => bearer(req, tok)?,
AdminAuth::OAuth2(flow) => {
flow.ensure_fresh()
.await
.map_err(|err| AdminError::Auth(format!("oauth2 token refresh: {err}")))?;
let token = flow
.cached_access_token()
.filter(|t| !t.is_empty())
.ok_or_else(|| {
AdminError::Auth("oauth2 returned an empty access token".to_owned())
})?;
let tok = std::str::from_utf8(&token).map_err(|err| {
AdminError::Auth(format!("oauth2 access token is not valid utf-8: {err}"))
})?;
bearer(req, tok)?
}
};
let request = req.build()?;
let method = request.method().clone();
let url = request.url().clone();
let resp = self.http.execute(request).await?;
Ok(ApiResponse { method, url, resp })
}
}
struct ApiResponse {
method: Method,
url: Url,
resp: Response,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct FunctionConfig {
pub tenant: String,
pub namespace: String,
pub name: String,
#[serde(rename = "className")]
pub class_name: String,
pub inputs: Vec<String>,
pub output: String,
pub runtime: String,
pub parallelism: i32,
#[serde(rename = "userConfig", skip_serializing_if = "Option::is_none")]
pub user_config: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TenantInfo {
#[serde(rename = "adminRoles")]
pub admin_roles: Vec<String>,
#[serde(rename = "allowedClusters")]
pub allowed_clusters: Vec<String>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct MessageIdResponse {
ledger_id: i64,
entry_id: i64,
#[serde(default = "default_partition_index")]
partition_index: i32,
}
fn default_partition_index() -> i32 {
-1
}
impl MessageIdResponse {
fn try_into_message_id(self) -> Result<MessageId, AdminError> {
let ledger_id = u64::try_from(self.ledger_id).map_err(|_| {
AdminError::Protocol(format!("negative ledgerId from broker: {}", self.ledger_id))
})?;
let entry_id = u64::try_from(self.entry_id).map_err(|_| {
AdminError::Protocol(format!("negative entryId from broker: {}", self.entry_id))
})?;
Ok(MessageId {
ledger_id,
entry_id,
partition: self.partition_index,
batch_index: -1,
batch_size: -1,
#[cfg(feature = "scalable-topics")]
segment_id: None,
})
}
}
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(default)]
pub struct TopicStats {
#[serde(rename = "msgRateIn")]
pub msg_rate_in: f64,
#[serde(rename = "msgRateOut")]
pub msg_rate_out: f64,
#[serde(rename = "msgThroughputIn")]
pub msg_throughput_in: f64,
#[serde(rename = "msgThroughputOut")]
pub msg_throughput_out: f64,
#[serde(rename = "averageMsgSize")]
pub average_msg_size: f64,
#[serde(rename = "msgInCounter")]
pub msg_in_counter: i64,
#[serde(rename = "bytesInCounter")]
pub bytes_in_counter: i64,
#[serde(rename = "storageSize")]
pub storage_size: i64,
#[serde(rename = "backlogSize")]
pub backlog_size: i64,
pub publishers: Vec<serde_json::Value>,
pub subscriptions: serde_json::Value,
}
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(default)]
struct PartitionedTopicMetadata {
partitions: u32,
}
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct RetentionPolicies {
pub retention_time_in_minutes: i32,
#[serde(rename = "retentionSizeInMB")]
pub retention_size_in_mb: i64,
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PersistencePolicies {
#[serde(default = "default_bookkeeper_quorum")]
pub bookkeeper_ensemble: i32,
#[serde(default = "default_bookkeeper_quorum")]
pub bookkeeper_write_quorum: i32,
#[serde(default = "default_bookkeeper_quorum")]
pub bookkeeper_ack_quorum: i32,
#[serde(default)]
pub managed_ledger_max_mark_delete_rate: f64,
}
impl Default for PersistencePolicies {
fn default() -> Self {
Self {
bookkeeper_ensemble: 2,
bookkeeper_write_quorum: 2,
bookkeeper_ack_quorum: 2,
managed_ledger_max_mark_delete_rate: 0.0,
}
}
}
#[inline]
fn default_bookkeeper_quorum() -> i32 {
2
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DispatchRate {
#[serde(default = "neg_one_i32")]
pub dispatch_throttling_rate_in_msg: i32,
#[serde(default = "neg_one_i64")]
pub dispatch_throttling_rate_in_byte: i64,
#[serde(default = "default_rate_period_seconds")]
pub rate_period_in_second: i32,
#[serde(default)]
pub relative_to_publish_rate: bool,
}
impl Default for DispatchRate {
fn default() -> Self {
Self {
dispatch_throttling_rate_in_msg: -1,
dispatch_throttling_rate_in_byte: -1,
rate_period_in_second: 1,
relative_to_publish_rate: false,
}
}
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PublishRate {
#[serde(default = "neg_one_i32")]
pub publish_throttling_rate_in_msg: i32,
#[serde(default = "neg_one_i64")]
pub publish_throttling_rate_in_byte: i64,
}
impl Default for PublishRate {
fn default() -> Self {
Self {
publish_throttling_rate_in_msg: -1,
publish_throttling_rate_in_byte: -1,
}
}
}
#[inline]
fn neg_one_i32() -> i32 {
-1
}
#[inline]
fn neg_one_i64() -> i64 {
-1
}
#[inline]
fn default_rate_period_seconds() -> i32 {
1
}
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct DelayedDeliveryPolicies {
pub active: bool,
#[serde(rename = "tickTime")]
pub tick_time_millis: i64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BacklogQuota {
#[serde(default = "neg_one_i64")]
pub limit_size: i64,
#[serde(default = "neg_one_i32")]
pub limit_time: i32,
#[serde(default)]
pub policy: String,
}
impl Default for BacklogQuota {
fn default() -> Self {
Self {
limit_size: -1,
limit_time: -1,
policy: String::new(),
}
}
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct BookieInfo {
pub rack: String,
pub hostname: String,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct PostSchemaPayload {
#[serde(rename = "type")]
pub schema_type: String,
pub schema: String,
pub properties: std::collections::HashMap<String, String>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct SourceConfig {
pub tenant: String,
pub namespace: String,
pub name: String,
pub class_name: String,
pub topic_name: String,
pub parallelism: i32,
#[serde(skip_serializing_if = "Option::is_none")]
pub configs: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct SinkConfig {
pub tenant: String,
pub namespace: String,
pub name: String,
pub class_name: String,
pub inputs: Vec<String>,
pub parallelism: i32,
#[serde(skip_serializing_if = "Option::is_none")]
pub configs: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PackageType {
Function,
Source,
Sink,
}
impl PackageType {
#[must_use]
pub fn as_str(self) -> &'static str {
match self {
Self::Function => "function",
Self::Source => "source",
Self::Sink => "sink",
}
}
}
impl std::str::FromStr for PackageType {
type Err = AdminError;
fn from_str(s: &str) -> Result<Self, AdminError> {
match s.to_ascii_lowercase().as_str() {
"function" | "functions" => Ok(Self::Function),
"source" | "sources" => Ok(Self::Source),
"sink" | "sinks" => Ok(Self::Sink),
other => Err(AdminError::InvalidName(format!(
"unknown package type {other:?} (expected: function | source | sink)"
))),
}
}
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct PackageMetadata {
pub description: String,
pub contact: String,
pub modification_time: i64,
pub properties: std::collections::HashMap<String, String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BacklogQuotaType {
DestinationStorage,
MessageAge,
}
impl BacklogQuotaType {
#[must_use]
pub fn as_query_value(self) -> &'static str {
match self {
Self::DestinationStorage => "destination_storage",
Self::MessageAge => "message_age",
}
}
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct LongRunningProcessStatus {
pub status: String,
pub last_error: String,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ResetCursorData {
ledger_id: i64,
entry_id: i64,
partition_index: i32,
batch_index: i32,
#[serde(rename = "isExcluded")]
is_excluded: bool,
}
#[inline]
fn message_id_field_for_wire(value: u64) -> i64 {
if value == u64::MAX {
-1
} else {
value as i64
}
}
#[derive(Debug, Default)]
pub struct AdminClientBuilder {
base_url: Option<Url>,
auth: AdminAuth,
timeout: Option<Duration>,
tls_trust_cert_pem: Option<Vec<u8>>,
tls_allow_insecure: bool,
}
impl AdminClientBuilder {
#[must_use]
pub fn service_url(mut self, url: Url) -> Self {
self.base_url = Some(url);
self
}
#[must_use]
pub fn token(mut self, token: String) -> Self {
self.auth = AdminAuth::Token(token);
self
}
#[must_use]
pub fn oauth2(mut self, flow: Arc<ClientCredentialsFlow>) -> Self {
self.auth = AdminAuth::OAuth2(flow);
self
}
#[must_use]
pub fn tls_trust_cert_pem(mut self, pem: Vec<u8>) -> Self {
self.tls_trust_cert_pem = Some(pem);
self
}
#[must_use]
pub fn tls_allow_insecure(mut self, allow: bool) -> Self {
self.tls_allow_insecure = allow;
self
}
#[must_use]
pub fn timeout(mut self, dur: Duration) -> Self {
self.timeout = Some(dur);
self
}
pub fn build(self) -> Result<AdminClient, AdminError> {
let base_url = self
.base_url
.ok_or_else(|| AdminError::Builder("service_url is required".into()))?;
if base_url.cannot_be_a_base() {
return Err(AdminError::Builder(format!(
"service_url cannot be a base url: {base_url}"
)));
}
let base_url = {
let mut b = base_url.clone();
if !b.path().ends_with('/') {
b.set_path(&format!("{}/", b.path()));
}
b
};
let base_url_v3 = base_url.join("admin/v3/")?;
let base_url = base_url.join("admin/v2/")?;
tls_crypto::install_default_provider();
let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT);
let http_builder = reqwest::Client::builder().timeout(timeout);
let http = apply_tls_options(
http_builder,
self.tls_trust_cert_pem,
self.tls_allow_insecure,
)?
.build()
.map_err(AdminError::Http)?;
Ok(AdminClient {
base_url,
base_url_v3,
http,
auth: self.auth,
})
}
}
#[derive(Debug, thiserror::Error)]
pub enum AdminError {
#[error("http error: {0}")]
Http(#[from] reqwest::Error),
#[error("api error {code} from {method} {url}: {body}")]
Status {
method: String,
url: String,
code: u16,
body: String,
},
#[error(
"unexpected response from {method} {url}: HTTP {status}, content-type {content_type}, body: {snippet}"
)]
Decode {
method: String,
url: String,
status: u16,
content_type: String,
snippet: String,
#[source]
source: serde_json::Error,
},
#[error("json encode: {0}")]
Json(#[from] serde_json::Error),
#[error("invalid url: {0}")]
Url(#[from] url::ParseError),
#[error("invalid builder: {0}")]
Builder(String),
#[error("auth error: {0}")]
Auth(String),
#[error("invalid name: {0}")]
InvalidName(String),
#[error("broker protocol violation: {0}")]
Protocol(String),
}
#[cfg(any(
feature = "crypto-aws-lc-rs",
feature = "crypto-ring",
feature = "crypto-openssl",
feature = "crypto-fips",
))]
fn apply_tls_options(
mut builder: reqwest::ClientBuilder,
trust_cert_pem: Option<Vec<u8>>,
allow_insecure: bool,
) -> Result<reqwest::ClientBuilder, AdminError> {
if let Some(pem) = trust_cert_pem {
let cert = reqwest::Certificate::from_pem(&pem)
.map_err(|err| AdminError::Builder(format!("invalid tls trust cert PEM: {err}")))?;
builder = builder.add_root_certificate(cert);
}
if allow_insecure {
builder = builder.danger_accept_invalid_certs(true);
}
Ok(builder)
}
#[cfg(not(any(
feature = "crypto-aws-lc-rs",
feature = "crypto-ring",
feature = "crypto-openssl",
feature = "crypto-fips",
)))]
fn apply_tls_options(
builder: reqwest::ClientBuilder,
trust_cert_pem: Option<Vec<u8>>,
allow_insecure: bool,
) -> Result<reqwest::ClientBuilder, AdminError> {
if trust_cert_pem.is_some() || allow_insecure {
return Err(AdminError::Builder(
"TLS options (trust cert / allow-insecure) require a crypto-* feature".to_owned(),
));
}
Ok(builder)
}
fn bearer(req: RequestBuilder, tok: &str) -> Result<RequestBuilder, AdminError> {
let value = format!("Bearer {tok}");
let mut headers = HeaderMap::new();
let header_value = HeaderValue::from_str(&value)
.map_err(|err| AdminError::Builder(format!("invalid bearer token: {err}")))?;
headers.insert(AUTHORIZATION, header_value);
Ok(req.headers(headers))
}
const DECODE_SNIPPET_LIMIT: usize = 256;
fn decode_error(
method: &Method,
url: &Url,
status: u16,
headers: &HeaderMap,
body: &[u8],
source: serde_json::Error,
) -> AdminError {
let content_type = headers
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map_or_else(|| "<none>".to_owned(), ToOwned::to_owned);
let snippet = if body.len() > DECODE_SNIPPET_LIMIT {
format!(
"{}… (truncated)",
String::from_utf8_lossy(&body[..DECODE_SNIPPET_LIMIT])
)
} else {
String::from_utf8_lossy(body).into_owned()
};
AdminError::Decode {
method: method.to_string(),
url: url.to_string(),
status,
content_type,
snippet,
source,
}
}
async fn json_ok<T>(api: ApiResponse) -> Result<T, AdminError>
where
T: for<'de> Deserialize<'de>,
{
let api = ensure_status(api).await?;
let ApiResponse { method, url, resp } = api;
let status = resp.status().as_u16();
let headers = resp.headers().clone();
let bytes = resp.bytes().await?;
serde_json::from_slice(&bytes)
.map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
}
async fn json_ok_or_default<T>(api: ApiResponse) -> Result<T, AdminError>
where
T: for<'de> Deserialize<'de> + Default,
{
let api = ensure_status(api).await?;
let ApiResponse { method, url, resp } = api;
if resp.status() == StatusCode::NO_CONTENT {
return Ok(T::default());
}
let status = resp.status().as_u16();
let headers = resp.headers().clone();
let bytes = resp.bytes().await?;
if bytes.is_empty() {
return Ok(T::default());
}
if bytes.as_ref().trim_ascii() == b"null" {
return Ok(T::default());
}
serde_json::from_slice::<T>(&bytes)
.map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
}
async fn json_ok_optional<T>(api: ApiResponse) -> Result<Option<T>, AdminError>
where
T: for<'de> Deserialize<'de>,
{
let api = ensure_status(api).await?;
let ApiResponse { method, url, resp } = api;
if resp.status() == StatusCode::NO_CONTENT {
return Ok(None);
}
let status = resp.status().as_u16();
let headers = resp.headers().clone();
let bytes = resp.bytes().await?;
if bytes.is_empty() {
return Ok(None);
}
serde_json::from_slice::<Option<T>>(&bytes)
.map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
}
async fn empty_ok(api: ApiResponse) -> Result<(), AdminError> {
let _ = ensure_status(api).await?;
Ok(())
}
async fn ensure_status(api: ApiResponse) -> Result<ApiResponse, AdminError> {
let status = api.resp.status();
if status.is_success() || status == StatusCode::NO_CONTENT {
return Ok(api);
}
let code = status.as_u16();
let ApiResponse { method, url, resp } = api;
let body = resp
.text()
.await
.unwrap_or_else(|err| format!("<failed to read body: {err}>"));
Err(AdminError::Status {
method: method.to_string(),
url: url.to_string(),
code,
body,
})
}
fn validate_segment(segment: &str) -> Result<(), AdminError> {
if segment.is_empty() {
return Err(AdminError::InvalidName("empty path segment".into()));
}
if segment == "." || segment == ".." {
return Err(AdminError::InvalidName(format!(
"dot segment is not a valid name: {segment:?}",
)));
}
if segment.contains("%2F") || segment.contains("%2f") {
return Err(AdminError::InvalidName(format!(
"percent-encoded slash in segment: {segment:?}",
)));
}
if segment.bytes().any(|b| b < 0x20 || b == 0x7f) {
return Err(AdminError::InvalidName(format!(
"control byte in segment: {segment:?}",
)));
}
Ok(())
}
pub fn split_namespace(ns: &str) -> Result<(&str, &str), AdminError> {
let (tenant, namespace) = ns.split_once('/').ok_or_else(|| {
AdminError::InvalidName(format!("expected tenant/namespace, got {ns:?} (no '/')"))
})?;
if tenant.is_empty() || namespace.is_empty() || namespace.contains('/') {
return Err(AdminError::InvalidName(format!(
"expected tenant/namespace, got {ns:?}"
)));
}
validate_segment(tenant)?;
validate_segment(namespace)?;
Ok((tenant, namespace))
}
fn split_topic(topic: &str) -> Result<(&str, &str, &str), AdminError> {
let rest = topic.strip_prefix("persistent://").unwrap_or(topic);
let mut parts = rest.splitn(3, '/');
let tenant = parts.next().unwrap_or("");
let namespace = parts.next().unwrap_or("");
let name = parts.next().unwrap_or("");
if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
return Err(AdminError::InvalidName(format!(
"expected [persistent://]tenant/namespace/topic, got {topic:?}"
)));
}
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
Ok((tenant, namespace, name))
}
fn function_pkg_form(
pkg_url: &str,
config: &FunctionConfig,
) -> Result<reqwest::multipart::Form, AdminError> {
build_url_config_multipart(pkg_url, "functionConfig", config)
}
pub fn split_function_id(id: &str) -> Result<(&str, &str, &str), AdminError> {
let mut parts = id.splitn(3, '/');
let tenant = parts.next().unwrap_or("");
let namespace = parts.next().unwrap_or("");
let name = parts.next().unwrap_or("");
if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
return Err(AdminError::InvalidName(format!(
"expected tenant/namespace/name, got {id:?}"
)));
}
validate_segment(tenant)?;
validate_segment(namespace)?;
validate_segment(name)?;
Ok((tenant, namespace, name))
}
fn build_url_config_multipart<T: Serialize>(
pkg_url: &str,
config_field: &str,
config: &T,
) -> Result<reqwest::multipart::Form, AdminError> {
let body = serde_json::to_string(config)?;
let config_part = reqwest::multipart::Part::text(body)
.mime_str("application/json")
.expect("application/json is a well-formed media type");
let form = reqwest::multipart::Form::new()
.text("url", pkg_url.to_owned())
.part(config_field.to_owned(), config_part);
Ok(form)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn builder_requires_service_url() {
let err = AdminClient::builder().build().unwrap_err();
assert!(matches!(err, AdminError::Builder(_)));
}
#[test]
fn builder_appends_admin_v2_prefix() {
let client = AdminClient::builder()
.service_url("http://localhost:8080".parse().unwrap())
.build()
.unwrap();
assert_eq!(
client.base_url().as_str(),
"http://localhost:8080/admin/v2/"
);
}
#[test]
fn builder_carries_token() {
let client = AdminClient::builder()
.service_url("http://localhost:8080".parse().unwrap())
.token("abc".into())
.build()
.unwrap();
assert!(matches!(client.auth(), AdminAuth::Token(t) if t == "abc"));
}
#[test]
fn admin_auth_token_debug_redacts_secret() {
let auth = AdminAuth::Token("super-secret-jwt".to_owned());
let rendered = format!("{auth:?}");
assert!(
!rendered.contains("super-secret-jwt"),
"raw token leaked through Debug: {rendered}",
);
assert!(
rendered.contains("<redacted>"),
"expected redaction sentinel in {rendered}"
);
assert!(
rendered.contains("Token"),
"expected variant name in {rendered}"
);
let none_rendered = format!("{:?}", AdminAuth::None);
assert_eq!(none_rendered, "None");
}
#[test]
fn admin_client_debug_does_not_leak_token() {
let client = AdminClient::builder()
.service_url("http://localhost:8080".parse().unwrap())
.token("leaky-token".into())
.build()
.unwrap();
let rendered = format!("{client:?}");
assert!(
!rendered.contains("leaky-token"),
"raw token leaked through AdminClient Debug: {rendered}",
);
}
#[test]
fn split_namespace_ok() {
assert_eq!(
split_namespace("public/default").unwrap(),
("public", "default")
);
}
#[test]
fn split_namespace_rejects_missing_slash() {
assert!(matches!(
split_namespace("public"),
Err(AdminError::InvalidName(_))
));
}
#[test]
fn split_namespace_rejects_extra_segment() {
assert!(matches!(
split_namespace("public/default/extra"),
Err(AdminError::InvalidName(_))
));
}
#[test]
fn split_topic_with_scheme() {
let (t, n, name) = split_topic("persistent://acme/svc/orders").unwrap();
assert_eq!((t, n, name), ("acme", "svc", "orders"));
}
#[test]
fn split_topic_without_scheme() {
let (t, n, name) = split_topic("acme/svc/orders").unwrap();
assert_eq!((t, n, name), ("acme", "svc", "orders"));
}
#[test]
fn split_topic_rejects_short_name() {
assert!(matches!(
split_topic("acme/svc"),
Err(AdminError::InvalidName(_))
));
}
#[test]
fn split_function_id_ok() {
let (t, n, name) = split_function_id("public/default/my-fn").unwrap();
assert_eq!((t, n, name), ("public", "default", "my-fn"));
}
#[test]
fn split_function_id_rejects_short_name() {
assert!(matches!(
split_function_id("public/default"),
Err(AdminError::InvalidName(_))
));
}
#[test]
fn split_function_id_rejects_persistent_scheme() {
assert!(matches!(
split_function_id("persistent://acme/svc/fn"),
Err(AdminError::InvalidName(_))
));
}
#[test]
fn message_id_response_deserialises_java_camelcase() {
let json = r#"{"ledgerId":12345,"entryId":67890,"partitionIndex":0}"#;
let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
let msg = dto.try_into_message_id().unwrap();
assert_eq!(msg.ledger_id, 12345);
assert_eq!(msg.entry_id, 67890);
assert_eq!(msg.partition, 0);
assert_eq!(msg.batch_index, -1);
assert_eq!(msg.batch_size, -1);
}
#[test]
fn message_id_response_defaults_partition_for_non_partitioned_topic() {
let json = r#"{"ledgerId":1,"entryId":2}"#;
let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
assert_eq!(dto.try_into_message_id().unwrap().partition, -1);
}
#[test]
fn url_helper_emits_single_slash_after_admin_v2() {
let client = AdminClient::builder()
.service_url("http://broker.example:8080".parse().unwrap())
.build()
.unwrap();
let url = client.url(&["clusters"]).unwrap();
assert_eq!(url.as_str(), "http://broker.example:8080/admin/v2/clusters");
let url2 = client
.url(&["persistent", "public", "default", "topic", "stats"])
.unwrap();
assert_eq!(
url2.as_str(),
"http://broker.example:8080/admin/v2/persistent/public/default/topic/stats"
);
}
#[test]
fn url_helper_preserves_path_prefix_without_trailing_slash() {
let client = AdminClient::builder()
.service_url("http://broker.example:8080/pulsar".parse().unwrap())
.build()
.unwrap();
let url = client.url(&["clusters"]).unwrap();
assert_eq!(
url.as_str(),
"http://broker.example:8080/pulsar/admin/v2/clusters"
);
let url_v3 = client.url_v3(&["functions", "a", "b"]).unwrap();
assert_eq!(
url_v3.as_str(),
"http://broker.example:8080/pulsar/admin/v3/functions/a/b"
);
}
#[test]
fn split_topic_rejects_dot_segments() {
assert!(matches!(
split_topic("persistent://../foo/bar"),
Err(AdminError::InvalidName(_))
));
assert!(matches!(
split_topic("./foo/bar"),
Err(AdminError::InvalidName(_))
));
assert!(matches!(
split_topic("tenant/./topic"),
Err(AdminError::InvalidName(_))
));
}
#[test]
fn split_topic_rejects_control_bytes_and_percent_encoded_slash() {
assert!(matches!(
split_topic("tenant/ns/topic%2Fevil"),
Err(AdminError::InvalidName(_))
));
assert!(matches!(
split_topic("tenant/ns/top\0ic"),
Err(AdminError::InvalidName(_))
));
}
}