pub struct AdminClient { /* private fields */ }
Apache Pulsar admin REST client.
Holds two pre-computed base URLs: base_url anchored at
/admin/v2/ (clusters / tenants / namespaces / topics /
subscriptions / brokers / bookies / schemas) and base_url_v3
anchored at /admin/v3/ (Pulsar Functions / IO Sources / IO Sinks
/ Packages). The split mirrors Pulsar’s own routing — Java’s
PulsarAdmin keeps them separate too. Both URLs are derived from
the same broker root at builder time, so a caller never has to
know which version family an endpoint belongs to.
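As a rough illustration of the two-base-URL split, the sketch below derives both prefixes from one broker root using plain strings. The helper name `admin_bases` is hypothetical; the real builder works on `Url` values and its internals are not shown on this page.

```rust
/// Hypothetical helper: derive the v2 and v3 admin prefixes from one broker root.
/// Assumption: the root may or may not carry trailing slashes.
fn admin_bases(broker_root: &str) -> (String, String) {
    // Strip trailing slashes so each prefix gets exactly one separator.
    let root = broker_root.trim_end_matches('/');
    (
        format!("{}/admin/v2/", root),
        format!("{}/admin/v3/", root),
    )
}
```

Both prefixes end in a slash so that relative endpoint paths such as `clusters` or `functions/{tenant}/{namespace}` can be appended directly.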
Implementations
impl AdminClient
pub fn builder() -> AdminClientBuilder
Start building an admin client.
pub fn base_url(&self) -> &Url
Return the base URL the client targets (with the trailing /admin/v2/
component already appended). Exposed for tests and diagnostics.
pub fn auth(&self) -> &AdminAuth
Return the configured auth strategy. Exposed for tests and diagnostics.
pub async fn cluster_list(&self) -> Result<Vec<String>, AdminError>
List clusters.
GET /admin/v2/clusters.
Java: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Clusters.java
(@Path("/clusters")) + admin/impl/ClustersBase.java#getClusters.
pub async fn cluster_failure_domains_list( &self, cluster: &str, ) -> Result<Value, AdminError>
List failure-domains configured on a cluster.
GET /admin/v2/clusters/{cluster}/failureDomains. The broker returns
a Map<String, FailureDomain> keyed by domain name; each value
carries a brokers: Set<String> member. The map is exposed as a
raw serde_json::Value for forward-compat — broker minor versions
add fields.
Java: ClustersBase#getFailureDomains.
pub async fn cluster_failure_domain_get( &self, cluster: &str, domain: &str, ) -> Result<Value, AdminError>
Get one failure-domain by name.
GET /admin/v2/clusters/{cluster}/failureDomains/{domain}.
Java: ClustersBase#getDomain.
pub async fn namespace_isolation_policies_list( &self, cluster: &str, ) -> Result<Value, AdminError>
List namespace-isolation policies configured on a cluster.
GET /admin/v2/clusters/{cluster}/namespaceIsolationPolicies. The
broker returns a Map<String, NamespaceIsolationData> carrying
the namespace regex, primary/secondary broker lists, and the
auto-failover policy. Exposed as raw JSON for forward-compat.
A cluster with no isolation policies configured surfaces as
404 NamespaceIsolationPolicies for cluster ... does not exist
(Pulsar 4.0.x) rather than an empty map; we mirror the Java
client’s Map<String, _> surface by mapping that specific 404 to
an empty {} object. Other 404s (auth, wrong cluster) still
surface as AdminError::Status.
Java: ClustersBase#getNamespaceIsolationPolicies.
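The 404 handling described above can be sketched as a small classifier. This is an assumption-laden illustration, not the crate's code: the `Outcome` enum and `map_isolation_response` are hypothetical names, and matching on message substrings is only one plausible way to recognise "that specific 404".

```rust
/// Hypothetical result of classifying an HTTP response.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// JSON text to decode as the policy map.
    Body(String),
    /// Status and message passed through as an error.
    Status(u16, String),
}

/// Map the broker's "no isolation policies" 404 to an empty JSON object.
fn map_isolation_response(status: u16, body: &str) -> Outcome {
    match status {
        200 => Outcome::Body(body.to_string()),
        // Only the specific "policies do not exist" message is folded to `{}`.
        404 if body.contains("NamespaceIsolationPolicies for cluster")
            && body.contains("does not exist") =>
        {
            Outcome::Body("{}".to_string())
        }
        // Every other non-200 response stays an error.
        other => Outcome::Status(other, body.to_string()),
    }
}
```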
pub async fn brokers_list( &self, cluster: &str, ) -> Result<Vec<String>, AdminError>
List active brokers in a cluster.
GET /admin/v2/brokers/{cluster}. Returns a list of host:port
strings — one entry per broker that’s currently registered with
the cluster’s metadata store. Java: BrokersBase#getActiveBrokers.
pub async fn brokers_leader(&self) -> Result<Value, AdminError>
Get the current leader broker for the cluster.
GET /admin/v2/brokers/leaderBroker. Returns { serviceUrl, brokerId }. Exposed as raw JSON for forward-compat — newer
brokers add clusterName and similar fields.
Java: BrokersBase#getLeaderBroker.
pub async fn brokers_dynamic_config_keys( &self, ) -> Result<Vec<String>, AdminError>
List the names of all dynamic-config keys the broker exposes.
GET /admin/v2/brokers/configuration. Returns the bare list of
ServiceConfiguration fields tagged @FieldContext(dynamic = true)
— the set of keys that brokers_set_dynamic_config accepts. Use
Self::brokers_dynamic_config_overrides for the current values.
Pulsar 4 normally returns a JSON array (List<String> from
BrokerService#getDynamicConfiguration), but some packaging /
proxy paths surface the underlying Map<String, ConfigField>
shape instead. We accept both — array → values, object → keys —
to stay version-tolerant. Java: BrokersBase#getDynamicConfigurationName.
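The "array → values, object → keys" rule can be sketched with a tiny stand-in for the two JSON shapes. The `Shape` enum and `dynamic_config_keys` function are hypothetical; the actual client presumably inspects a `serde_json::Value`, which is left out here to keep the sketch dependency-free.

```rust
/// Hypothetical stand-in for the two response shapes the broker may send.
enum Shape {
    /// `["keyA", "keyB"]`
    Array(Vec<String>),
    /// `{"keyA": {...}, "keyB": {...}}` as (key, serialised field) pairs.
    Object(Vec<(String, String)>),
}

/// Normalise either shape to a flat list of key names.
fn dynamic_config_keys(shape: Shape) -> Vec<String> {
    match shape {
        // Array form: the elements already are the key names.
        Shape::Array(values) => values,
        // Object form: keep the keys, drop the ConfigField payloads.
        Shape::Object(entries) => entries.into_iter().map(|(key, _)| key).collect(),
    }
}
```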
pub async fn brokers_dynamic_config_overrides( &self, ) -> Result<Value, AdminError>
Get the currently-overridden dynamic configuration values.
GET /admin/v2/brokers/configuration/values. Returns a
Map<String, String> of every dynamic key the operator has set
(the broker omits keys still on their static / default value).
Exposed as raw JSON because broker minor versions add keys.
Java: BrokersBase#getAllDynamicConfigurations.
pub async fn brokers_runtime_config(&self) -> Result<Value, AdminError>
Get the broker’s runtime (merged static + dynamic) configuration.
GET /admin/v2/brokers/configuration/runtime. Returns the full
Map<String, String> of ServiceConfiguration values as they
currently apply on the broker process — static defaults
overlaid with any brokers_set_dynamic_config overrides. Raw
JSON for forward-compat. Java: BrokersBase#getRuntimeConfiguration.
pub async fn brokers_internal_config(&self) -> Result<Value, AdminError>
Get the broker’s internal-stack endpoints.
GET /admin/v2/brokers/internal-configuration. Returns the
InternalConfigurationData envelope — metadata-store URLs
(zookeeperServers, configurationMetadataStoreUrl),
BookKeeper metadata service URI, ledger root paths. Raw JSON
for forward-compat; the shape changes between releases as the
metadata layer evolves.
Java: BrokersBase#getInternalConfigurationData.
pub async fn brokers_health_check(&self) -> Result<String, AdminError>
Probe broker health — produces and consumes one heartbeat message on an internal topic.
GET /admin/v2/brokers/health. The broker returns the plain-text
string "ok" on success; non-200 surfaces as AdminError::Status.
Java: BrokersBase#healthCheck.
pub async fn brokers_owned_namespaces( &self, cluster: &str, broker: &str, ) -> Result<Value, AdminError>
List the namespaces a specific broker currently owns.
GET /admin/v2/brokers/{cluster}/{broker}/ownedNamespaces. The
broker argument must be the broker’s host:port (matching the
strings Self::brokers_list returns). Returns a
Map<String, NamespaceOwnershipStatus> keyed by namespace name —
raw JSON for forward-compat.
Java: BrokersBase#getOwnedNamespaces.
pub async fn brokers_set_dynamic_config( &self, name: &str, value: &str, ) -> Result<(), AdminError>
Override a dynamic broker configuration value.
POST /admin/v2/brokers/configuration/{name}/{value}. Both the
key and the value travel in the URL path — there is no request
body — matching the broker’s updateDynamicConfiguration(@PathParam String configName, @PathParam String configValue) signature.
The key must be one of those returned by
Self::brokers_dynamic_config_keys; unknown keys yield 412.
Java: BrokersBase#updateDynamicConfiguration.
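Because both the key and the value travel as path segments, a value containing `/` or a space would corrupt the route unless it is escaped. The sketch below shows one way to build such a path; `encode_segment` and `dynamic_config_path` are hypothetical helpers, and whether the crate escapes segments this way is an assumption.

```rust
/// Percent-encode one path segment, keeping only RFC 3986 unreserved bytes.
fn encode_segment(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for byte in raw.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            // Everything else becomes %XX with uppercase hex digits.
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

/// Build the relative path for POST brokers/configuration/{name}/{value}.
fn dynamic_config_path(name: &str, value: &str) -> String {
    format!(
        "brokers/configuration/{}/{}",
        encode_segment(name),
        encode_segment(value)
    )
}
```

The request itself carries no body, so the path is the entire payload.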
pub async fn brokers_delete_dynamic_config( &self, name: &str, ) -> Result<(), AdminError>
Drop a dynamic configuration override, reverting to the static value.
DELETE /admin/v2/brokers/configuration/{name}. After the call
the key disappears from Self::brokers_dynamic_config_overrides
and Self::brokers_runtime_config reflects the underlying
static / default value again.
Java: BrokersBase#deleteDynamicConfiguration.
pub async fn bookies_list_all(&self) -> Result<Value, AdminError>
List every bookie the broker knows about — both writable and
read-only — as registered in BookKeeper metadata.
GET /admin/v2/bookies/all. Returns the broker’s
BookiesClusterInfo envelope — a bookies: [{ address: "host:port" }]
array. Raw JSON for forward-compat.
Java: BookiesBase#getAllAvailableBookies.
pub async fn bookies_racks_info(&self) -> Result<Value, AdminError>
Get every bookie’s group + rack assignment, as configured for the rack-aware placement policy.
GET /admin/v2/bookies/racks-info. Returns the nested
Map<group, Map<bookieAddress, BookieInfo>> shape Pulsar
persists in metadata. Raw JSON because the wire shape exposes
nested maps that change between releases (the default group
is implicit on older brokers).
Java: BookiesBase#getBookieRackInfo.
pub async fn bookies_set_rack( &self, bookie: &str, group: &str, info: BookieInfo, ) -> Result<(), AdminError>
Set (or update) a bookie’s rack assignment.
POST /admin/v2/bookies/racks-info/{bookie}?group={group} with
a JSON BookieInfo body carrying only {rack, hostname}.
bookie is the host:port registered in BookKeeper metadata.
The placement policy picks up the new rack on its next
reconciliation tick.
group is a query parameter, not a body field. Pulsar’s
BookiesBase#updateBookieRackInfo(@PathParam("bookie") String, @QueryParam("group") String, BookieInfo) Jackson-binds the body
to {rack, hostname} only, so a group sent as a body field would be
silently ignored, the query param would default to null, and the
operator’s group choice would be dropped.
Java: BookiesBase#updateBookieRackInfo.
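To make the query-versus-body split concrete, here is a minimal sketch of how the request could be assembled. `bookie_rack_request` is a hypothetical helper, and the hand-written JSON assumes the inputs contain no characters that need escaping; the real client serialises a `BookieInfo` value.

```rust
/// Hypothetical helper: build the relative URL and JSON body for a rack update.
/// Assumption: inputs need neither URL nor JSON escaping.
fn bookie_rack_request(bookie: &str, group: &str, rack: &str, hostname: &str) -> (String, String) {
    // `group` goes into the query string only.
    let path = format!("bookies/racks-info/{}?group={}", bookie, group);
    // The body carries `rack` and `hostname` and nothing else.
    let body = format!("{{\"rack\":\"{}\",\"hostname\":\"{}\"}}", rack, hostname);
    (path, body)
}
```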
pub async fn bookies_delete_rack(&self, bookie: &str) -> Result<(), AdminError>
Remove a bookie’s rack assignment.
DELETE /admin/v2/bookies/racks-info/{bookie}. The bookie falls
back to the placement policy’s default group / rack until
Self::bookies_set_rack is called again.
Java: BookiesBase#deleteBookieRackInfo.
pub async fn schema_get_latest(&self, topic: &str) -> Result<Value, AdminError>
Get the latest schema attached to a topic.
GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema. Returns
{ version, type, schema, properties, timestamp }; raw JSON
because the type axis (AVRO / JSON / PROTOBUF /
PROTOBUF_NATIVE / KEY_VALUE / STRING / BYTES / …) is
open-ended and broker minor versions add keys (deletion
tombstones surface as type: "DELETE" on the GET, for
instance). Java: SchemasResourceBase#getSchema.
pub async fn schema_get_version( &self, topic: &str, version: i64, ) -> Result<Value, AdminError>
Get a specific schema version attached to a topic.
GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema/{version}.
version is the monotonically-increasing integer the broker
assigns at registration. Same wire shape as
Self::schema_get_latest.
Java: SchemasResourceBase#getSchema (with version path param).
pub async fn schema_list_versions( &self, topic: &str, ) -> Result<Vec<Value>, AdminError>
List every schema version registered for a topic.
GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schemas. Pulsar 4
wraps the per-version entries in a
GetAllVersionsSchemaResponse { getSchemaResponses: [...] }
envelope (verified against apache/pulsar@v4.0.4
SchemasResourceBase#convertToAllVersionsSchemaResponse). We
unwrap that envelope at the boundary so callers see the flat
Vec<Value> they expect; a bare-array shape (older or
alternative serialisations) is still accepted. Raw JSON
per-entry for forward-compat. Java:
SchemasResourceBase#getAllSchemas.
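The envelope unwrapping can be pictured with a dependency-free stand-in. `SchemasBody` and `flatten_versions` are hypothetical names, and each entry is modelled as an opaque JSON string rather than a `serde_json::Value`.

```rust
/// Hypothetical model of the two accepted response shapes.
enum SchemasBody {
    /// `{ "getSchemaResponses": [...] }` as sent by Pulsar 4.
    Envelope { get_schema_responses: Vec<String> },
    /// A bare `[...]` array.
    BareArray(Vec<String>),
}

/// Return the flat list of per-version entries regardless of the wrapper.
fn flatten_versions(body: SchemasBody) -> Vec<String> {
    match body {
        SchemasBody::Envelope { get_schema_responses } => get_schema_responses,
        SchemasBody::BareArray(entries) => entries,
    }
}
```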
pub async fn schema_post( &self, topic: &str, payload: PostSchemaPayload, ) -> Result<Value, AdminError>
Register a new schema version on a topic.
POST /admin/v2/schemas/{tenant}/{ns}/{topic}/schema with a JSON
PostSchemaPayload body. The broker returns { version: N };
raw JSON because the upstream response envelope wraps the
version under data on some 4.x point releases. Compatibility
is enforced server-side per the namespace’s
schemaCompatibilityStrategy — incompatible posts fail with
409. Java: SchemasResourceBase#postSchema.
pub async fn schema_delete( &self, topic: &str, force: bool, ) -> Result<(), AdminError>
Delete a topic’s schema.
DELETE /admin/v2/schemas/{tenant}/{ns}/{topic}/schema?force={force}.
force = true skips the broker’s “is the schema in use”
guard — equivalent to pulsar-admin schemas delete --force.
Java: SchemasResourceBase#deleteSchema.
pub async fn schema_compatibility_check( &self, topic: &str, payload: PostSchemaPayload, ) -> Result<Value, AdminError>
Check whether a candidate schema would be compatible with the topic’s current schema.
POST /admin/v2/schemas/{tenant}/{ns}/{topic}/compatibility with
a JSON PostSchemaPayload body — the same shape
Self::schema_post sends, but the broker only evaluates
compatibility and never persists. Returns { isCompatible: bool, schemaCompatibilityStrategy: "..." }; raw JSON for
forward-compat.
Java: SchemasResourceBase#testCompatibility.
pub async fn functions_list_by_namespace( &self, tenant: &str, namespace: &str, ) -> Result<Vec<String>, AdminError>
List every function registered under a namespace.
GET /admin/v3/functions/{tenant}/{namespace}. Returns a JSON
array of bare function names (no tenant / namespace prefix) —
matches Java’s FunctionsBase#listFunctions body shape.
pub async fn function_get( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<Value, AdminError>
Get a function’s registered FunctionConfig.
GET /admin/v3/functions/{tenant}/{namespace}/{name}. The
FunctionConfig Java type has ~30 fields and grows on every
minor release, so the method returns raw JSON for forward-compat. Java:
FunctionsBase#getFunctionInfo.
pub async fn function_status( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<Value, AdminError>
Get a function’s aggregate status (all instances).
GET /admin/v3/functions/{tenant}/{namespace}/{name}/status.
Returns Java’s FunctionStatus envelope:
{numInstances, numRunning, instances: [...]}. Raw JSON because
the per-instance shape carries broker-version-dependent fields.
Java: FunctionsBase#getFunctionStatus.
pub async fn function_stats( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<Value, AdminError>
Get a function’s aggregate runtime statistics (all instances).
GET /admin/v3/functions/{tenant}/{namespace}/{name}/stats.
Returns Java’s FunctionStats envelope — message rates,
processed counts, average latency, per-instance breakdown. Raw
JSON for forward-compat. Java: FunctionsBase#getFunctionStats.
pub async fn function_instance_status( &self, tenant: &str, namespace: &str, name: &str, instance_id: i32, ) -> Result<Value, AdminError>
Get one instance’s status.
GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/status.
instance_id is the integer index the broker assigns at
schedule time (0..parallelism). Java:
FunctionsBase#getFunctionInstanceStatus.
pub async fn function_instance_stats( &self, tenant: &str, namespace: &str, name: &str, instance_id: i32, ) -> Result<Value, AdminError>
Get one instance’s runtime statistics.
GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stats.
Java: FunctionsBase#getFunctionInstanceStats.
pub async fn function_create_with_url( &self, tenant: &str, namespace: &str, name: &str, url: &str, config: FunctionConfig, ) -> Result<(), AdminError>
Register a function from a remote package URL.
POST /admin/v3/functions/{tenant}/{namespace}/{name} with a
multipart/form-data body carrying two parts: url=<pkg-url>
(the broker-resolvable HTTP / function:// / file:// URL of
the compiled package) and functionConfig=<json> (the
serialised FunctionConfig). The local-file upload path
(Java’s FunctionsBase#registerFunction with a
FormDataMultiPart data part) is intentionally out of scope
for this method — operators with a pre-built JAR served over
HTTP / S3 / GCS use the URL path.
The two-part envelope matches Java’s
FunctionsBase#registerFunction(@PathParam tenant, ..., @FormDataParam("url") String functionPkgUrl, @FormDataParam("functionConfig") FunctionConfig functionConfig)
— when data is null and url is non-null the broker takes the
URL fast-path and skips the upload step.
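The two-part envelope described above can be sketched by assembling the multipart body by hand. `multipart_body` is a hypothetical helper; the per-part `Content-Type` headers and the caller-supplied boundary are assumptions, and a real client would normally delegate this to its HTTP library.

```rust
/// Hypothetical helper: build a multipart/form-data body with `url` and `functionConfig` parts.
/// Assumption: `boundary` does not occur inside either value.
fn multipart_body(boundary: &str, pkg_url: &str, function_config_json: &str) -> String {
    let parts = [
        ("url", "text/plain", pkg_url),
        ("functionConfig", "application/json", function_config_json),
    ];
    let mut body = String::new();
    for &(name, content_type, value) in parts.iter() {
        // Each part: boundary line, headers, blank line, value.
        body.push_str(&format!("--{}\r\n", boundary));
        body.push_str(&format!("Content-Disposition: form-data; name=\"{}\"\r\n", name));
        body.push_str(&format!("Content-Type: {}\r\n\r\n", content_type));
        body.push_str(value);
        body.push_str("\r\n");
    }
    // Closing delimiter; no `data` part is ever emitted.
    body.push_str(&format!("--{}--\r\n", boundary));
    body
}
```

The request would also need a `Content-Type: multipart/form-data; boundary=...` header carrying the same boundary string.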
pub async fn function_update_with_url( &self, tenant: &str, namespace: &str, name: &str, url: &str, config: FunctionConfig, ) -> Result<(), AdminError>
Update an existing function from a remote package URL.
PUT /admin/v3/functions/{tenant}/{namespace}/{name} with the
same two-part multipart/form-data shape as
Self::function_create_with_url. Java:
FunctionsBase#updateFunction with non-null pkgUrl.
pub async fn function_delete( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Deregister (delete) a function.
DELETE /admin/v3/functions/{tenant}/{namespace}/{name}. The
broker stops every running instance and drops the
FunctionConfig from metadata. Java:
FunctionsBase#deregisterFunction.
pub async fn function_start( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Start every instance of a function (idempotent).
POST /admin/v3/functions/{tenant}/{namespace}/{name}/start. No
body. Java: FunctionsBase#startFunction.
pub async fn function_stop( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Stop every instance of a function.
POST /admin/v3/functions/{tenant}/{namespace}/{name}/stop. The
broker leaves the FunctionConfig in metadata; a subsequent
function_start brings it back. Java:
FunctionsBase#stopFunction.
pub async fn function_restart( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Restart every instance of a function.
POST /admin/v3/functions/{tenant}/{namespace}/{name}/restart.
Java: FunctionsBase#restartFunction.
pub async fn function_start_instance( &self, tenant: &str, namespace: &str, name: &str, instance_id: i32, ) -> Result<(), AdminError>
Start one specific instance.
POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/start.
Java: FunctionsBase#startFunctionInstance.
pub async fn function_stop_instance( &self, tenant: &str, namespace: &str, name: &str, instance_id: i32, ) -> Result<(), AdminError>
Stop one specific instance.
POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stop.
Java: FunctionsBase#stopFunctionInstance.
pub async fn tenants_list(&self) -> Result<Vec<String>, AdminError>
List tenants.
GET /admin/v2/tenants.
Java: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Tenants.java
(@Path("/tenants")) + admin/impl/TenantsBase.java#getTenants.
pub async fn tenant_create( &self, name: &str, info: TenantInfo, ) -> Result<(), AdminError>
Create a tenant.
PUT /admin/v2/tenants/{tenant} with a JSON TenantInfo body.
Java: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#createTenant.
pub async fn tenant_delete(&self, name: &str) -> Result<(), AdminError>
Delete a tenant.
DELETE /admin/v2/tenants/{tenant}.
Java: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#deleteTenant.
pub async fn namespaces_list( &self, tenant: &str, ) -> Result<Vec<String>, AdminError>
List namespaces under a tenant.
GET /admin/v2/namespaces/{tenant}.
Java: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
(@Path("/namespaces") + @Path("/{tenant}")).
pub async fn namespace_create(&self, ns: &str) -> Result<(), AdminError>
Create a namespace.
PUT /admin/v2/namespaces/{tenant}/{namespace}. The ns argument
is the fully qualified tenant/namespace string, matching how Pulsar
expresses namespace names on the wire and in the CLI.
Java: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
(@PUT @Path("/{tenant}/{namespace}")).
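Since the namespace methods take one `tenant/namespace` string, a caller-side check of that shape might look like the sketch below. `split_namespace` and `namespace_path` are hypothetical helpers; how the crate itself validates or splits the argument is not documented here.

```rust
/// Split a fully qualified `tenant/namespace` name into its two parts.
/// Returns None unless there is exactly one `/` with non-empty sides.
fn split_namespace(ns: &str) -> Option<(&str, &str)> {
    let (tenant, namespace) = ns.split_once('/')?;
    if tenant.is_empty() || namespace.is_empty() || namespace.contains('/') {
        return None;
    }
    Some((tenant, namespace))
}

/// Build the relative path used by the namespace create and delete endpoints.
fn namespace_path(ns: &str) -> Option<String> {
    let (tenant, namespace) = split_namespace(ns)?;
    Some(format!("namespaces/{}/{}", tenant, namespace))
}
```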
pub async fn namespace_delete(&self, ns: &str) -> Result<(), AdminError>
Delete a namespace.
DELETE /admin/v2/namespaces/{tenant}/{namespace}.
Java: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
(@DELETE @Path("/{tenant}/{namespace}")).
pub async fn namespace_get_retention( &self, ns: &str, ) -> Result<RetentionPolicies, AdminError>
Get a namespace’s retention policy.
GET /admin/v2/namespaces/{tenant}/{ns}/retention.
Returns RetentionPolicies { retentionTimeInMinutes, retentionSizeInMB }.
A fresh namespace, or a namespace whose retention was just
removed, surfaces as 204 / empty body / null; we fold those
to RetentionPolicies::default(), matching the broker’s semantics for an unset policy.
Java: NamespacesBase#getRetention.
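The "204 / empty body / null" folding can be expressed as a single predicate. `is_unset_policy` is a hypothetical name; when it returns true, the client would hand back `RetentionPolicies::default()` instead of trying to decode the body.

```rust
/// Hypothetical check: does this response mean "no policy configured"?
fn is_unset_policy(status: u16, body: &str) -> bool {
    let text = body.trim();
    // 204 No Content, an empty body, or a literal JSON null all count as unset.
    status == 204 || text.is_empty() || text == "null"
}
```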
pub async fn namespace_set_retention( &self, ns: &str, policy: RetentionPolicies, ) -> Result<(), AdminError>
Set a namespace’s retention policy.
POST /admin/v2/namespaces/{tenant}/{ns}/retention with a JSON
RetentionPolicies body. -1 means infinite (size or time).
Java: NamespacesBase#setRetention.
pub async fn namespace_remove_retention( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s retention policy (fall back to broker default).
DELETE /admin/v2/namespaces/{tenant}/{ns}/retention.
Java: NamespacesBase#removeRetention.
pub async fn namespace_get_backlog_quotas( &self, ns: &str, ) -> Result<Value, AdminError>
Get all backlog-quota policies on a namespace.
GET /admin/v2/namespaces/{tenant}/{ns}/backlogQuotaMap. Returns
Map<BacklogQuotaType, BacklogQuota> — kept as raw JSON because
broker versions add quota types (message_age since 2.10).
Java: NamespacesBase#getBacklogQuotaMap.
pub async fn namespace_set_backlog_quota( &self, ns: &str, backlog_quota_type: BacklogQuotaType, quota: BacklogQuota, ) -> Result<(), AdminError>
Set a backlog-quota policy on a namespace.
POST /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}
with a JSON BacklogQuota body. backlog_quota_type selects which
dimension to limit (destination_storage for byte size, message_age
for wall-clock TTL).
Java: NamespacesBase#setBacklogQuota.
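The quota type travels as the `backlogQuotaType` query parameter rather than in the body. A minimal sketch of that mapping follows; `QuotaType` is a hypothetical stand-in for the crate's `BacklogQuotaType`, whose actual variant names are not shown on this page.

```rust
/// Hypothetical stand-in for the crate's BacklogQuotaType.
#[derive(Clone, Copy, Debug)]
enum QuotaType {
    DestinationStorage,
    MessageAge,
}

impl QuotaType {
    /// Wire name as documented for the query parameter.
    fn wire_name(self) -> &'static str {
        match self {
            QuotaType::DestinationStorage => "destination_storage",
            QuotaType::MessageAge => "message_age",
        }
    }
}

/// Build the relative path and query for the set/remove backlog-quota endpoints.
fn backlog_quota_path(tenant: &str, ns: &str, quota_type: QuotaType) -> String {
    format!(
        "namespaces/{}/{}/backlogQuota?backlogQuotaType={}",
        tenant,
        ns,
        quota_type.wire_name()
    )
}
```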
pub async fn namespace_remove_backlog_quota( &self, ns: &str, backlog_quota_type: BacklogQuotaType, ) -> Result<(), AdminError>
Remove a backlog-quota policy from a namespace.
DELETE /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}.
Java: NamespacesBase#removeBacklogQuota.
pub async fn namespace_get_message_ttl( &self, ns: &str, ) -> Result<Option<i32>, AdminError>
Get a namespace’s message-TTL (seconds).
GET /admin/v2/namespaces/{tenant}/{ns}/messageTTL. Returns a
bare integer (or null if no TTL is set — which decodes as
Option::None).
Java: NamespacesBase#getNamespaceMessageTTL.
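Decoding a bare integer or `null` into `Option<i32>` can be sketched without a JSON library. `decode_message_ttl` is a hypothetical helper, and treating an empty body like `null` is an assumption made for this sketch.

```rust
use std::num::ParseIntError;

/// Decode the messageTTL response body: bare integer → Some, null → None.
/// Assumption: an empty body is treated the same as `null`.
fn decode_message_ttl(body: &str) -> Result<Option<i32>, ParseIntError> {
    let text = body.trim();
    if text.is_empty() || text == "null" {
        return Ok(None);
    }
    // Anything else must parse as a 32-bit integer number of seconds.
    text.parse::<i32>().map(Some)
}
```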
pub async fn namespace_set_message_ttl( &self, ns: &str, ttl_seconds: i32, ) -> Result<(), AdminError>
Set a namespace’s message-TTL (seconds).
POST /admin/v2/namespaces/{tenant}/{ns}/messageTTL with a bare
integer body. 0 disables (broker treats as no TTL).
Java: NamespacesBase#setNamespaceMessageTTL.
pub async fn namespace_remove_message_ttl( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s message-TTL (fall back to broker default).
DELETE /admin/v2/namespaces/{tenant}/{ns}/messageTTL.
Java: NamespacesBase#removeNamespaceMessageTTL.
pub async fn namespace_get_persistence( &self, ns: &str, ) -> Result<PersistencePolicies, AdminError>
Get a namespace’s persistence policy.
GET /admin/v2/namespaces/{tenant}/{ns}/persistence. Returns the
BookKeeper ensemble / write-quorum / ack-quorum triple plus the
managed-ledger mark-delete rate cap. null body decodes to
PersistencePolicies::default() via #[serde(default)].
Java: NamespacesBase#getPersistence.
pub async fn namespace_set_persistence( &self, ns: &str, policy: PersistencePolicies, ) -> Result<(), AdminError>
Set a namespace’s persistence policy.
POST /admin/v2/namespaces/{tenant}/{ns}/persistence with a JSON
PersistencePolicies body.
Java: NamespacesBase#setPersistence.
pub async fn namespace_remove_persistence( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s persistence policy (fall back to broker default).
DELETE /admin/v2/namespaces/{tenant}/{ns}/persistence.
Java: NamespacesBase#deletePersistence.
pub async fn namespace_get_dispatch_rate( &self, ns: &str, ) -> Result<DispatchRate, AdminError>
Get a namespace’s consumer dispatch-rate policy.
GET /admin/v2/namespaces/{tenant}/{ns}/dispatchRate. Returns
the per-namespace consumer-dispatch throttle (msg/sec, byte/sec,
window in seconds). -1 on either dimension means unlimited.
Java: NamespacesBase#getDispatchRate.
pub async fn namespace_set_dispatch_rate( &self, ns: &str, rate: DispatchRate, ) -> Result<(), AdminError>
Set a namespace’s consumer dispatch-rate policy.
POST /admin/v2/namespaces/{tenant}/{ns}/dispatchRate with a
JSON DispatchRate body.
Java: NamespacesBase#setDispatchRate.
pub async fn namespace_remove_dispatch_rate( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s consumer dispatch-rate policy.
DELETE /admin/v2/namespaces/{tenant}/{ns}/dispatchRate.
Java: NamespacesBase#deleteDispatchRate.
pub async fn namespace_get_subscription_dispatch_rate( &self, ns: &str, ) -> Result<DispatchRate, AdminError>
Get a namespace’s per-subscription dispatch-rate policy.
GET /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate.
Reuses the DispatchRate body shape — the policy applies per
subscription rather than aggregated across all consumers.
Java: NamespacesBase#getSubscriptionDispatchRate.
pub async fn namespace_set_subscription_dispatch_rate( &self, ns: &str, rate: DispatchRate, ) -> Result<(), AdminError>
Set a namespace’s per-subscription dispatch-rate policy.
POST /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate
with a JSON DispatchRate body.
Java: NamespacesBase#setSubscriptionDispatchRate.
pub async fn namespace_remove_subscription_dispatch_rate( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s per-subscription dispatch-rate policy.
DELETE /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate.
Java: NamespacesBase#deleteSubscriptionDispatchRate.
pub async fn namespace_get_replicator_dispatch_rate( &self, ns: &str, ) -> Result<DispatchRate, AdminError>
Get a namespace’s cross-cluster replicator dispatch-rate policy.
GET /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate.
Reuses the DispatchRate body shape — the policy throttles
outbound geo-replication traffic from this cluster.
Java: NamespacesBase#getReplicatorDispatchRate.
Sourcepub async fn namespace_set_replicator_dispatch_rate(
&self,
ns: &str,
rate: DispatchRate,
) -> Result<(), AdminError>
pub async fn namespace_set_replicator_dispatch_rate( &self, ns: &str, rate: DispatchRate, ) -> Result<(), AdminError>
Set a namespace’s cross-cluster replicator dispatch-rate policy.
POST /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate
with a JSON DispatchRate body.
Java: NamespacesBase#setReplicatorDispatchRate.
Sourcepub async fn namespace_remove_replicator_dispatch_rate(
&self,
ns: &str,
) -> Result<(), AdminError>
pub async fn namespace_remove_replicator_dispatch_rate( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s cross-cluster replicator dispatch-rate policy.
DELETE /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate.
Java: NamespacesBase#removeReplicatorDispatchRate.
Sourcepub async fn namespace_get_publish_rate(
&self,
ns: &str,
) -> Result<PublishRate, AdminError>
pub async fn namespace_get_publish_rate( &self, ns: &str, ) -> Result<PublishRate, AdminError>
Get a namespace’s publish-rate policy.
GET /admin/v2/namespaces/{tenant}/{ns}/publishRate. Returns
the producer-side throttle (msg/sec + byte/sec). -1 on either
dimension means unlimited.
Java: NamespacesBase#getPublishRate.
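As a minimal sketch of the -1 convention described above, the helper below maps one throttle dimension to an Option. The function name and the use of i64 are assumptions for illustration; they are not taken from the crate's PublishRate type.

```rust
/// Interpret one publish-rate dimension (msg/sec or byte/sec).
/// Hypothetical helper: -1 is the "unlimited" sentinel, so it maps to `None`;
/// every other value is passed through unchanged.
pub fn effective_limit(raw: i64) -> Option<i64> {
    match raw {
        -1 => None,
        n => Some(n),
    }
}
```

A caller could apply this to each dimension separately, since either one may be unlimited on its own.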
Sourcepub async fn namespace_set_publish_rate(
&self,
ns: &str,
rate: PublishRate,
) -> Result<(), AdminError>
pub async fn namespace_set_publish_rate( &self, ns: &str, rate: PublishRate, ) -> Result<(), AdminError>
Set a namespace’s publish-rate policy.
POST /admin/v2/namespaces/{tenant}/{ns}/publishRate with a JSON
PublishRate body.
Java: NamespacesBase#setPublishRate.
Sourcepub async fn namespace_remove_publish_rate(
&self,
ns: &str,
) -> Result<(), AdminError>
pub async fn namespace_remove_publish_rate( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s publish-rate policy.
DELETE /admin/v2/namespaces/{tenant}/{ns}/publishRate.
Java: NamespacesBase#removePublishRate.
Sourcepub async fn namespace_get_deduplication(
&self,
ns: &str,
) -> Result<Option<bool>, AdminError>
pub async fn namespace_get_deduplication( &self, ns: &str, ) -> Result<Option<bool>, AdminError>
Get a namespace’s broker-side message deduplication flag.
GET /admin/v2/namespaces/{tenant}/{ns}/deduplication. Returns a
bare JSON boolean, or null (decoded as None) when the policy
is unset and the broker default applies.
Java: NamespacesBase#getDeduplication.
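The null-versus-boolean body described above can be illustrated with a small std-only decoder. This is a sketch under the assumption that the body is exactly one of true, false, or null; the function name is hypothetical, and the crate presumably relies on a JSON library rather than string matching.

```rust
/// Decode a bare JSON body that is `true`, `false`, or `null`.
/// Hypothetical helper: `null` means the policy is unset, so it becomes `None`.
pub fn decode_optional_bool(body: &str) -> Result<Option<bool>, String> {
    match body.trim() {
        "null" => Ok(None),
        "true" => Ok(Some(true)),
        "false" => Ok(Some(false)),
        other => Err(format!("unexpected body: {other}")),
    }
}
```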
Sourcepub async fn namespace_set_deduplication(
&self,
ns: &str,
enabled: bool,
) -> Result<(), AdminError>
pub async fn namespace_set_deduplication( &self, ns: &str, enabled: bool, ) -> Result<(), AdminError>
Set a namespace’s broker-side message deduplication flag.
POST /admin/v2/namespaces/{tenant}/{ns}/deduplication with a
bare JSON boolean body.
Java: NamespacesBase#modifyDeduplication.
Sourcepub async fn namespace_remove_deduplication(
&self,
ns: &str,
) -> Result<(), AdminError>
pub async fn namespace_remove_deduplication( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s deduplication flag (fall back to broker default).
DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplication.
Java: NamespacesBase#removeDeduplication.
Sourcepub async fn namespace_get_deduplication_snapshot_interval(
&self,
ns: &str,
) -> Result<Option<i32>, AdminError>
pub async fn namespace_get_deduplication_snapshot_interval( &self, ns: &str, ) -> Result<Option<i32>, AdminError>
Get a namespace’s deduplication-snapshot interval (entries).
GET /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval.
Returns a bare integer (the entry count between dedup cursor
snapshots), or null (decoded as None) when the broker default
applies.
Java: NamespacesBase#getDeduplicationSnapshotInterval.
Sourcepub async fn namespace_set_deduplication_snapshot_interval(
&self,
ns: &str,
interval_entries: i32,
) -> Result<(), AdminError>
pub async fn namespace_set_deduplication_snapshot_interval( &self, ns: &str, interval_entries: i32, ) -> Result<(), AdminError>
Set a namespace’s deduplication-snapshot interval (entries).
POST /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval
with a bare JSON integer body.
Java: NamespacesBase#setDeduplicationSnapshotInterval.
Sourcepub async fn namespace_remove_deduplication_snapshot_interval(
&self,
ns: &str,
) -> Result<(), AdminError>
pub async fn namespace_remove_deduplication_snapshot_interval( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s deduplication-snapshot interval override.
DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval.
Java: NamespacesBase#deleteDeduplicationSnapshotInterval.
Sourcepub async fn namespace_get_compaction_threshold(
&self,
ns: &str,
) -> Result<Option<i64>, AdminError>
pub async fn namespace_get_compaction_threshold( &self, ns: &str, ) -> Result<Option<i64>, AdminError>
Get a namespace’s compaction threshold (bytes).
GET /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold. Returns
a bare integer (bytes of accumulated topic backlog above which the
broker triggers automatic compaction), or null (decoded as None)
when the broker default applies.
Java: NamespacesBase#getCompactionThreshold.
Sourcepub async fn namespace_set_compaction_threshold(
&self,
ns: &str,
threshold_bytes: i64,
) -> Result<(), AdminError>
pub async fn namespace_set_compaction_threshold( &self, ns: &str, threshold_bytes: i64, ) -> Result<(), AdminError>
Set a namespace’s compaction threshold (bytes).
PUT /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold with
a bare JSON long body. 0 disables automatic compaction.
Note: this endpoint is @PUT in Pulsar 4 (Namespaces.java
declares @PUT @Path("/{tenant}/{namespace}/compactionThreshold")),
not POST — using POST yields a 405 Method Not Allowed.
Java: NamespacesBase#setCompactionThreshold.
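To make the PUT-versus-POST distinction concrete, the sketch below assembles the three parts of the request described above. RequestSketch and the function name are hypothetical, and taking tenant and namespace as separate arguments is an assumption; the real method takes a single ns string.

```rust
/// Hypothetical description of an outgoing request; not a type from the crate.
#[derive(Debug, PartialEq, Eq)]
pub struct RequestSketch {
    pub method: &'static str,
    pub path: String,
    pub body: String,
}

/// Build the compaction-threshold setter request.
/// The method is PUT (not POST) and the body is a bare JSON long.
pub fn set_compaction_threshold_request(
    tenant: &str,
    ns: &str,
    threshold_bytes: i64,
) -> RequestSketch {
    RequestSketch {
        method: "PUT",
        path: format!("/admin/v2/namespaces/{tenant}/{ns}/compactionThreshold"),
        body: threshold_bytes.to_string(),
    }
}
```

Passing 0 as threshold_bytes produces the body 0, which the text above describes as disabling automatic compaction.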
Sourcepub async fn namespace_remove_compaction_threshold(
&self,
ns: &str,
) -> Result<(), AdminError>
pub async fn namespace_remove_compaction_threshold( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s compaction threshold override.
DELETE /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold.
Java: NamespacesBase#deleteCompactionThreshold.
Sourcepub async fn namespace_get_delayed_delivery(
&self,
ns: &str,
) -> Result<Option<DelayedDeliveryPolicies>, AdminError>
pub async fn namespace_get_delayed_delivery( &self, ns: &str, ) -> Result<Option<DelayedDeliveryPolicies>, AdminError>
Get a namespace’s delayed-delivery policy.
GET /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery. Returns
the active flag + tick time (the broker’s index-tick granularity
for delivering delayed messages). null decodes as None.
Java: NamespacesBase#getDelayedDeliveryPolicies.
Sourcepub async fn namespace_set_delayed_delivery(
&self,
ns: &str,
policy: DelayedDeliveryPolicies,
) -> Result<(), AdminError>
pub async fn namespace_set_delayed_delivery( &self, ns: &str, policy: DelayedDeliveryPolicies, ) -> Result<(), AdminError>
Set a namespace’s delayed-delivery policy.
POST /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery with a
JSON DelayedDeliveryPolicies body.
Java: NamespacesBase#setDelayedDeliveryPolicies.
Sourcepub async fn namespace_remove_delayed_delivery(
&self,
ns: &str,
) -> Result<(), AdminError>
pub async fn namespace_remove_delayed_delivery( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s delayed-delivery policy override.
DELETE /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery.
Java: NamespacesBase#removeDelayedDeliveryPolicies.
Sourcepub async fn namespace_get_max_producers_per_topic(
&self,
ns: &str,
) -> Result<Option<i32>, AdminError>
pub async fn namespace_get_max_producers_per_topic( &self, ns: &str, ) -> Result<Option<i32>, AdminError>
Get a namespace’s max-producers-per-topic limit.
GET /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic. Returns
a bare integer (the per-topic ceiling on concurrent producer
connections), or null (decoded as None) when the broker default
applies.
Java: NamespacesBase#getMaxProducersPerTopic.
Sourcepub async fn namespace_set_max_producers_per_topic(
&self,
ns: &str,
max_producers: i32,
) -> Result<(), AdminError>
pub async fn namespace_set_max_producers_per_topic( &self, ns: &str, max_producers: i32, ) -> Result<(), AdminError>
Set a namespace’s max-producers-per-topic limit.
POST /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic with
a bare JSON integer body. 0 disables the limit.
Java: NamespacesBase#setMaxProducersPerTopic.
Sourcepub async fn namespace_remove_max_producers_per_topic(
&self,
ns: &str,
) -> Result<(), AdminError>
pub async fn namespace_remove_max_producers_per_topic( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s max-producers-per-topic limit override.
DELETE /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic.
Java: NamespacesBase#removeMaxProducersPerTopic.
Sourcepub async fn namespace_get_max_consumers_per_topic(
&self,
ns: &str,
) -> Result<Option<i32>, AdminError>
pub async fn namespace_get_max_consumers_per_topic( &self, ns: &str, ) -> Result<Option<i32>, AdminError>
Get a namespace’s max-consumers-per-topic limit.
GET /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic. Returns
a bare integer (the per-topic ceiling on concurrent consumer
connections across all subscriptions), or null (decoded as None)
when the broker default applies.
Java: NamespacesBase#getMaxConsumersPerTopic.
Sourcepub async fn namespace_set_max_consumers_per_topic(
&self,
ns: &str,
max_consumers: i32,
) -> Result<(), AdminError>
pub async fn namespace_set_max_consumers_per_topic( &self, ns: &str, max_consumers: i32, ) -> Result<(), AdminError>
Set a namespace’s max-consumers-per-topic limit.
POST /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic with
a bare JSON integer body. 0 disables the limit.
Java: NamespacesBase#setMaxConsumersPerTopic.
Sourcepub async fn namespace_remove_max_consumers_per_topic(
&self,
ns: &str,
) -> Result<(), AdminError>
pub async fn namespace_remove_max_consumers_per_topic( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s max-consumers-per-topic limit override.
DELETE /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic.
Java: NamespacesBase#removeMaxConsumersPerTopic.
Sourcepub async fn namespace_get_max_unacked_messages_per_consumer(
&self,
ns: &str,
) -> Result<Option<i32>, AdminError>
pub async fn namespace_get_max_unacked_messages_per_consumer( &self, ns: &str, ) -> Result<Option<i32>, AdminError>
Get a namespace’s max-unacked-messages-per-consumer limit.
GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer.
Returns a bare integer (the broker’s per-consumer permit-pool cap
before it stops dispatching), or null (decoded as None) when
the broker default applies.
Java: NamespacesBase#getMaxUnackedMessagesPerConsumer.
Sourcepub async fn namespace_set_max_unacked_messages_per_consumer(
&self,
ns: &str,
max_unacked: i32,
) -> Result<(), AdminError>
pub async fn namespace_set_max_unacked_messages_per_consumer( &self, ns: &str, max_unacked: i32, ) -> Result<(), AdminError>
Set a namespace’s max-unacked-messages-per-consumer limit.
POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer
with a bare JSON integer body. 0 disables the limit.
Java: NamespacesBase#setMaxUnackedMessagesPerConsumer.
Sourcepub async fn namespace_remove_max_unacked_messages_per_consumer(
&self,
ns: &str,
) -> Result<(), AdminError>
pub async fn namespace_remove_max_unacked_messages_per_consumer( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s max-unacked-messages-per-consumer override.
DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer.
Java: NamespacesBase#removeMaxUnackedMessagesPerConsumer.
Sourcepub async fn namespace_get_max_unacked_messages_per_subscription(
&self,
ns: &str,
) -> Result<Option<i32>, AdminError>
pub async fn namespace_get_max_unacked_messages_per_subscription( &self, ns: &str, ) -> Result<Option<i32>, AdminError>
Get a namespace’s max-unacked-messages-per-subscription limit.
GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription.
Returns a bare integer (the broker’s per-subscription unacked
ceiling — once exceeded the broker stops dispatching to every
consumer on that subscription), or null (decoded as None)
when the broker default applies.
Java: NamespacesBase#getMaxUnackedMessagesPerSubscription.
Sourcepub async fn namespace_set_max_unacked_messages_per_subscription(
&self,
ns: &str,
max_unacked: i32,
) -> Result<(), AdminError>
pub async fn namespace_set_max_unacked_messages_per_subscription( &self, ns: &str, max_unacked: i32, ) -> Result<(), AdminError>
Set a namespace’s max-unacked-messages-per-subscription limit.
POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription
with a bare JSON integer body. 0 disables the limit.
Java: NamespacesBase#setMaxUnackedMessagesPerSubscription.
Sourcepub async fn namespace_remove_max_unacked_messages_per_subscription(
&self,
ns: &str,
) -> Result<(), AdminError>
pub async fn namespace_remove_max_unacked_messages_per_subscription( &self, ns: &str, ) -> Result<(), AdminError>
Remove a namespace’s max-unacked-messages-per-subscription override.
DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription.
Java: NamespacesBase#removeMaxUnackedMessagesPerSubscription.
Sourcepub async fn topics_list(
&self,
namespace: &str,
) -> Result<Vec<String>, AdminError>
pub async fn topics_list( &self, namespace: &str, ) -> Result<Vec<String>, AdminError>
List persistent topics in a namespace.
GET /admin/v2/persistent/{tenant}/{namespace}.
Java: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
(@Path("/persistent") + @GET @Path("/{tenant}/{namespace}")).
Sourcepub async fn topic_create_non_partitioned(
&self,
topic: &str,
) -> Result<(), AdminError>
pub async fn topic_create_non_partitioned( &self, topic: &str, ) -> Result<(), AdminError>
Create a non-partitioned persistent topic.
PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}.
Java: PersistentTopics.java#createNonPartitionedTopic
(@PUT @Path("/{tenant}/{namespace}/{topic}")).
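The documentation for topic_get_message_id_by_index states that topic-scoped methods accept either persistent://tenant/ns/topic or tenant/ns/topic. The sketch below shows one way such an argument could be mapped onto the REST path used here. The function name is hypothetical, and it performs no URL-encoding, which a real client would likely need.

```rust
/// Map a topic argument onto the persistent-topic REST path.
/// Hypothetical helper: accepts `persistent://tenant/ns/topic` or
/// `tenant/ns/topic` and rejects anything without exactly three segments.
pub fn persistent_topic_path(topic: &str) -> Result<String, String> {
    // Drop the scheme if present; otherwise use the input as-is.
    let rest = topic.strip_prefix("persistent://").unwrap_or(topic);
    let parts: Vec<&str> = rest.split('/').collect();
    match parts.as_slice() {
        [tenant, ns, name]
            if !tenant.is_empty() && !ns.is_empty() && !name.is_empty() =>
        {
            Ok(format!("/admin/v2/persistent/{tenant}/{ns}/{name}"))
        }
        _ => Err(format!("expected tenant/ns/topic, got {topic:?}")),
    }
}
```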
Sourcepub async fn topic_create_partitioned(
&self,
topic: &str,
partitions: u32,
) -> Result<(), AdminError>
pub async fn topic_create_partitioned( &self, topic: &str, partitions: u32, ) -> Result<(), AdminError>
Create a partitioned topic with the given number of partitions.
PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
with the partition count as a JSON integer body.
Java: PersistentTopics.java#createPartitionedTopic
(@PUT @Path("/{tenant}/{namespace}/{topic}/partitions")).
Sourcepub async fn topic_delete(
&self,
topic: &str,
force: bool,
) -> Result<(), AdminError>
pub async fn topic_delete( &self, topic: &str, force: bool, ) -> Result<(), AdminError>
Delete a topic, auto-detecting partitioned vs non-partitioned.
Pulsar exposes two distinct delete endpoints — the partitioned
parent uses DELETE .../{topic}/partitions?force=… and the
non-partitioned topic uses DELETE .../{topic}?force=…. Hitting
the partitioned endpoint on a non-partitioned topic returns 404
(“Topic is not partitioned”), which used to surface as “the
topic doesn’t exist” to operators using magnetar admin topics delete.
This method therefore probes topic_partitions_count first (GET .../partitions
returns partitions: 0 for non-partitioned topics and a value > 0 for a
partitioned parent) and routes to the matching endpoint. This is the same
behaviour as pulsarctl’s topics delete.
Java: PersistentTopics.java#deletePartitionedTopic /
PersistentTopics.java#deleteTopic.
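The routing rule above can be sketched as a pure function of the probed partition count. The function name and the topic_path parameter (the already-resolved .../{topic} prefix) are assumptions for illustration, not the crate's internals.

```rust
/// Choose the delete endpoint from the probed partition count.
/// Hypothetical helper: `topic_path` is the resolved `.../{topic}` prefix.
pub fn delete_route(topic_path: &str, partitions: u32, force: bool) -> String {
    if partitions > 0 {
        // Partitioned parent: delete through the /partitions endpoint.
        format!("{topic_path}/partitions?force={force}")
    } else {
        // Non-partitioned topic: delete the topic resource directly.
        format!("{topic_path}?force={force}")
    }
}
```

Keeping the decision separate from the HTTP call makes the 0-versus-positive branch easy to test without a broker.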
Sourcepub async fn topic_stats(&self, topic: &str) -> Result<TopicStats, AdminError>
pub async fn topic_stats(&self, topic: &str) -> Result<TopicStats, AdminError>
Get topic stats.
GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/stats.
Java: PersistentTopics.java#getStats
(@GET @Path("/{tenant}/{namespace}/{topic}/stats"),
response shape PersistentTopicStats).
For a partitioned topic, the broker returns 404 on this endpoint
because there is no ledger backing the parent name. Call
Self::topic_partitioned_stats instead, or look up the count via
Self::topic_partitions_count first.
Sourcepub async fn topic_partitioned_stats(
&self,
topic: &str,
) -> Result<TopicStats, AdminError>
pub async fn topic_partitioned_stats( &self, topic: &str, ) -> Result<TopicStats, AdminError>
Get aggregated stats for a partitioned topic.
GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitioned-stats?perPartition=false.
Java: PersistentTopics.java#getPartitionedStats
(@GET @Path("/{tenant}/{namespace}/{topic}/partitioned-stats"),
response shape PartitionedTopicStats which extends
PersistentTopicStats with partitions: Map<String, TopicStats>
and metadata: PartitionedTopicMetadata).
magnetar exposes only the aggregated top-level counters through the
same TopicStats shape — the broker populates msgInCounter,
bytesInCounter, publishers, subscriptions at the response root
summed across partitions. The partitions and metadata fields are
dropped on deserialisation; for per-partition detail call
Self::topic_stats on each <topic>-partition-N instead. We pass
perPartition=false to keep the wire response small.
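For the per-partition route mentioned above, the partition topic names follow the <topic>-partition-N pattern. A minimal sketch, with a hypothetical function name, that enumerates them from a known partition count:

```rust
/// List the per-partition topic names for a partitioned parent.
/// Hypothetical helper: returns an empty list when `partitions` is 0.
pub fn partition_topic_names(topic: &str, partitions: u32) -> Vec<String> {
    (0..partitions)
        .map(|i| format!("{topic}-partition-{i}"))
        .collect()
}
```

Each returned name could then be passed to topic_stats, with the count coming from topic_partitions_count.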
Sourcepub async fn topic_partitions_count(
&self,
topic: &str,
) -> Result<u32, AdminError>
pub async fn topic_partitions_count( &self, topic: &str, ) -> Result<u32, AdminError>
Resolve the partition count of a topic.
GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions.
Java: PersistentTopics.java#getPartitionedMetadata
(@GET @Path("/{tenant}/{namespace}/{topic}/partitions"),
response shape PartitionedTopicMetadata{ partitions: int }).
Returns 0 for non-partitioned topics, which lets a caller choose
between Self::topic_stats and Self::topic_partitioned_stats
when the topology is not known in advance.
Sourcepub async fn topic_get_message_id_by_index(
&self,
topic: &str,
index: i64,
) -> Result<MessageId, AdminError>
pub async fn topic_get_message_id_by_index( &self, topic: &str, index: i64, ) -> Result<MessageId, AdminError>
Resolve a broker-entry-metadata index to a MessageId (PIP-415).
GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/getMessageIdByIndex?index={index}.
Per PIP-415 this is REST-only: the spec’s “Binary protocol” section is
intentionally empty and the canonical implementation PR apache/pulsar#24222
(merged 2025-06-23) touches only admin / broker / CLI Java code.
Java:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
(@GET @Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex"),
@QueryParam("index") long); admin-client side is
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java#getMessageIdByIndexAsync, which deserialises the
response into MessageIdImpl (i.e. {ledgerId, entryId, partitionIndex}).
topic follows the same rule as every other topic-scoped method:
either persistent://tenant/ns/topic or tenant/ns/topic. For a
partitioned topic, pass the specific partition (my-topic-partition-0).
The response carries only (ledgerId, entryId, partitionIndex). The
returned MessageId sets batch_index = -1 and batch_size = -1
because the broker resolves at entry granularity — see PIP-415 §“Why
Precise Index Matching Isn’t Implemented on the Broker Side”.
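The field handling described above can be sketched as follows. MessageIdSketch is a hypothetical stand-in, and its field names and integer widths are assumptions; the crate's real MessageId may be laid out differently.

```rust
/// Hypothetical stand-in for the crate's `MessageId`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MessageIdSketch {
    pub ledger_id: i64,
    pub entry_id: i64,
    pub partition_index: i32,
    pub batch_index: i32,
    pub batch_size: i32,
}

/// Build an id from the three fields the response carries.
/// The batch fields are set to -1 because the broker resolves the
/// index at entry granularity, not at the level of a message in a batch.
pub fn from_index_response(
    ledger_id: i64,
    entry_id: i64,
    partition_index: i32,
) -> MessageIdSketch {
    MessageIdSketch {
        ledger_id,
        entry_id,
        partition_index,
        batch_index: -1,
        batch_size: -1,
    }
}
```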
Sourcepub async fn topic_compact(&self, topic: &str) -> Result<(), AdminError>
pub async fn topic_compact(&self, topic: &str) -> Result<(), AdminError>
Trigger ledger compaction for a topic.
PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction.
Returns 204 on success; the broker queues the work asynchronously —
poll Self::topic_compaction_status to observe progress.
Java: PersistentTopics#triggerCompaction.
Sourcepub async fn topic_compaction_status(
&self,
topic: &str,
) -> Result<LongRunningProcessStatus, AdminError>
pub async fn topic_compaction_status( &self, topic: &str, ) -> Result<LongRunningProcessStatus, AdminError>
Get the current compaction status for a topic.
GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction.
Returns Java’s LongRunningProcessStatus: status ∈ {NOT_RUN,
RUNNING, SUCCESS, ERROR} plus an optional lastError string.
Java: PersistentTopics#compactionStatus.
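A polling loop needs to know when to stop. The sketch below models the four status values listed above and marks SUCCESS and ERROR as terminal. The enum and method names are hypothetical, and treating NOT_RUN as non-terminal is an assumption rather than documented behaviour.

```rust
/// Hypothetical mirror of the `status` field of LongRunningProcessStatus.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessStatus {
    NotRun,
    Running,
    Success,
    Error,
}

impl ProcessStatus {
    /// Parse the wire string; unknown values yield `None`.
    pub fn parse(raw: &str) -> Option<Self> {
        match raw {
            "NOT_RUN" => Some(Self::NotRun),
            "RUNNING" => Some(Self::Running),
            "SUCCESS" => Some(Self::Success),
            "ERROR" => Some(Self::Error),
            _ => None,
        }
    }

    /// True once the run has ended, successfully or not.
    pub fn is_terminal(self) -> bool {
        matches!(self, Self::Success | Self::Error)
    }
}
```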
Sourcepub async fn topic_unload(&self, topic: &str) -> Result<(), AdminError>
pub async fn topic_unload(&self, topic: &str) -> Result<(), AdminError>
Unload a topic from its current broker (forces rebalancing).
PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/unload.
Operators use this to drain a hot broker or to re-elect ownership
after a configuration change. Java: PersistentTopics#unloadTopic.
Sourcepub async fn topic_terminate(
&self,
topic: &str,
) -> Result<Option<MessageId>, AdminError>
pub async fn topic_terminate( &self, topic: &str, ) -> Result<Option<MessageId>, AdminError>
Terminate (seal) a topic so that no further publishes succeed.
POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/terminate.
Returns the MessageId of the last message that landed before
the seal, or None when the broker reports the
MessageIdImpl(-1, -1) sentinel — meaning the topic was sealed
before any confirmed entry was written (a freshly-created topic,
or a topic whose owner was just re-elected). The Java client
surfaces that case as MessageId.earliest; we use Option so
callers don’t have to special-case a magic value.
Java: PersistentTopics#terminate.
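The sentinel handling above amounts to a single mapping. A minimal sketch, assuming the response has already been reduced to a (ledger_id, entry_id) pair; the function name and tuple representation are hypothetical:

```rust
/// Map the broker's (-1, -1) sentinel to `None`.
/// Hypothetical helper: any other pair is returned as the last message id.
pub fn last_message_id(ledger_id: i64, entry_id: i64) -> Option<(i64, i64)> {
    if ledger_id == -1 && entry_id == -1 {
        None
    } else {
        Some((ledger_id, entry_id))
    }
}
```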
Sourcepub async fn topic_update_partitions(
&self,
topic: &str,
new_partitions: u32,
) -> Result<(), AdminError>
pub async fn topic_update_partitions( &self, topic: &str, new_partitions: u32, ) -> Result<(), AdminError>
Grow a partitioned topic’s partition count.
POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
with a bare JSON integer body. Only forward (grow) is supported by
the broker — shrinking returns 409. Java:
PersistentTopics#updatePartitionedTopic.
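Because only growth is supported, a caller may want to classify a requested change before sending it. The sketch below is a hypothetical client-side pre-check, not something the crate is known to perform. It leaves the equal-count case as its own variant, since the text above does not say how the broker treats it.

```rust
use std::cmp::Ordering;

/// Hypothetical classification of a requested partition-count change.
#[derive(Debug, PartialEq, Eq)]
pub enum PartitionChange {
    /// The request adds this many partitions.
    Grow(u32),
    /// The request matches the current count.
    Unchanged,
    /// The request is below the current count; the broker rejects shrinking.
    Shrink,
}

/// Compare the requested count against the current one.
pub fn classify_partition_change(current: u32, requested: u32) -> PartitionChange {
    match requested.cmp(&current) {
        Ordering::Greater => PartitionChange::Grow(requested - current),
        Ordering::Equal => PartitionChange::Unchanged,
        Ordering::Less => PartitionChange::Shrink,
    }
}
```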
Sourcepub async fn topic_get_retention(
&self,
topic: &str,
) -> Result<RetentionPolicies, AdminError>
pub async fn topic_get_retention( &self, topic: &str, ) -> Result<RetentionPolicies, AdminError>
Get a topic’s retention policy.
GET /admin/v2/persistent/{tenant}/{ns}/{topic}/retention.
Returns the per-topic RetentionPolicies override; the broker
emits a RetentionPolicies JSON when the policy is set and a bare
null (decoded as RetentionPolicies::default() via #[serde(default)])
when no override is in place — callers fall back to the namespace
policy in that case. Java: PersistentTopicsBase#getRetention.
Sourcepub async fn topic_set_retention(
&self,
topic: &str,
policy: RetentionPolicies,
) -> Result<(), AdminError>
pub async fn topic_set_retention( &self, topic: &str, policy: RetentionPolicies, ) -> Result<(), AdminError>
Set a topic’s retention policy (overrides the namespace default).
POST /admin/v2/persistent/{tenant}/{ns}/{topic}/retention with a
JSON RetentionPolicies body. -1 means infinite (size or time).
Java: PersistentTopicsBase#setRetention.
Sourcepub async fn topic_remove_retention(
&self,
topic: &str,
) -> Result<(), AdminError>
pub async fn topic_remove_retention( &self, topic: &str, ) -> Result<(), AdminError>
Remove a topic’s retention policy (fall back to namespace default).
DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/retention.
Java: PersistentTopicsBase#removeRetention.
Sourcepub async fn topic_get_backlog_quotas(
&self,
topic: &str,
) -> Result<Value, AdminError>
pub async fn topic_get_backlog_quotas( &self, topic: &str, ) -> Result<Value, AdminError>
Get all backlog-quota policies on a topic.
GET /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuotaMap.
Returns Map<BacklogQuotaType, BacklogQuota> — kept as raw JSON
for the same reason as Self::namespace_get_backlog_quotas:
broker minor versions add quota types.
Java: PersistentTopicsBase#getBacklogQuotaMap.
Sourcepub async fn topic_set_backlog_quota(
&self,
topic: &str,
backlog_quota_type: BacklogQuotaType,
quota: BacklogQuota,
) -> Result<(), AdminError>
pub async fn topic_set_backlog_quota( &self, topic: &str, backlog_quota_type: BacklogQuotaType, quota: BacklogQuota, ) -> Result<(), AdminError>
Set a backlog-quota policy on a topic (overrides the namespace
default for the matching backlogQuotaType).
POST /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota?backlogQuotaType={type} with a JSON BacklogQuota body.
Java: PersistentTopicsBase#setBacklogQuota.
Sourcepub async fn topic_remove_backlog_quota(
&self,
topic: &str,
backlog_quota_type: BacklogQuotaType,
) -> Result<(), AdminError>
pub async fn topic_remove_backlog_quota( &self, topic: &str, backlog_quota_type: BacklogQuotaType, ) -> Result<(), AdminError>
Remove a backlog-quota policy from a topic.
DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota?backlogQuotaType={type}.
Java: PersistentTopicsBase#removeBacklogQuota.
Sourcepub async fn topic_get_message_ttl(
&self,
topic: &str,
) -> Result<Option<i32>, AdminError>
pub async fn topic_get_message_ttl( &self, topic: &str, ) -> Result<Option<i32>, AdminError>
Get a topic’s message-TTL (seconds, or null if unset).
GET /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL. Returns
a bare integer when the override is set, null (decoded as
Option::None) when no topic-level override is in place.
Java: PersistentTopicsBase#getMessageTTL.
Sourcepub async fn topic_set_message_ttl(
&self,
topic: &str,
ttl_seconds: i32,
) -> Result<(), AdminError>
pub async fn topic_set_message_ttl( &self, topic: &str, ttl_seconds: i32, ) -> Result<(), AdminError>
Set a topic’s message-TTL (seconds).
POST /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL?messageTTL={n}.
0 disables (broker treats as no TTL).
Note: unlike the namespace-level setNamespaceMessageTTL (which
takes the TTL as a JSON int body), the topic-level setter binds
@QueryParam("messageTTL") Integer messageTTL on both Pulsar 4.0 and
4.2 (pulsar-broker/.../v2/PersistentTopics.java#setMessageTTL).
Sending the value as a JSON body returns 204 No Content but the
broker reads the query param as null and silently treats the call
as “no override” — the topic policy never persists, and a subsequent
topic_get_message_ttl surfaces Ok(None). Older Pulsar releases
tolerated both encodings; 4.2 enforces the query-param shape.
Java: PersistentTopicsBase#setMessageTTL.
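Since the TTL must travel in the query string rather than the body, the request target can be sketched as below. The function name and the topic_path parameter (the already-resolved .../{topic} prefix) are assumptions for illustration.

```rust
/// Build the request target for the topic-level TTL setter.
/// Hypothetical helper: the TTL is carried by the `messageTTL` query
/// parameter, and no JSON body is needed.
pub fn set_message_ttl_target(topic_path: &str, ttl_seconds: i32) -> String {
    format!("{topic_path}/messageTTL?messageTTL={ttl_seconds}")
}
```

A quick check against topic_get_message_ttl after the call would catch the silent no-override case described above.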
Sourcepub async fn topic_remove_message_ttl(
&self,
topic: &str,
) -> Result<(), AdminError>
pub async fn topic_remove_message_ttl( &self, topic: &str, ) -> Result<(), AdminError>
Remove a topic’s message-TTL (fall back to namespace default).
DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL.
Java: PersistentTopicsBase#removeMessageTTL.
Sourcepub async fn topic_get_persistence(
&self,
topic: &str,
) -> Result<Option<PersistencePolicies>, AdminError>
pub async fn topic_get_persistence( &self, topic: &str, ) -> Result<Option<PersistencePolicies>, AdminError>
Get a topic’s persistence policy.
GET /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence. The
broker emits a PersistencePolicies JSON when the topic override
is set and null (decoded as Option::None) when no override is
in place — callers fall back to the namespace policy in that case.
Java: PersistentTopicsBase#getPersistence.
Sourcepub async fn topic_set_persistence(
&self,
topic: &str,
policy: PersistencePolicies,
) -> Result<(), AdminError>
pub async fn topic_set_persistence( &self, topic: &str, policy: PersistencePolicies, ) -> Result<(), AdminError>
Set a topic’s persistence policy (overrides the namespace default).
POST /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence with a
JSON PersistencePolicies body.
Java: PersistentTopicsBase#setPersistence.
Sourcepub async fn topic_remove_persistence(
&self,
topic: &str,
) -> Result<(), AdminError>
pub async fn topic_remove_persistence( &self, topic: &str, ) -> Result<(), AdminError>
Remove a topic’s persistence policy (fall back to namespace default).
DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence.
Java: PersistentTopicsBase#removePersistence.
Sourcepub async fn topic_get_dispatch_rate(
&self,
topic: &str,
) -> Result<Option<DispatchRate>, AdminError>
pub async fn topic_get_dispatch_rate( &self, topic: &str, ) -> Result<Option<DispatchRate>, AdminError>
Get a topic’s consumer dispatch-rate policy (or null if no override).
GET /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate. The
broker emits the per-topic DispatchRate override or null when
no override is set; callers fall back to the namespace policy in the
None case. Java: PersistentTopicsBase#getDispatchRate.
Sourcepub async fn topic_set_dispatch_rate(
&self,
topic: &str,
rate: DispatchRate,
) -> Result<(), AdminError>
pub async fn topic_set_dispatch_rate( &self, topic: &str, rate: DispatchRate, ) -> Result<(), AdminError>
Set a topic’s consumer dispatch-rate policy (overrides namespace default).
POST /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate with a
JSON DispatchRate body. Java: PersistentTopicsBase#setDispatchRate.
Sourcepub async fn topic_remove_dispatch_rate(
&self,
topic: &str,
) -> Result<(), AdminError>
pub async fn topic_remove_dispatch_rate( &self, topic: &str, ) -> Result<(), AdminError>
Remove a topic’s consumer dispatch-rate policy.
DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate.
Java: PersistentTopicsBase#removeDispatchRate.
Sourcepub async fn topic_get_subscription_dispatch_rate(
&self,
topic: &str,
) -> Result<Option<DispatchRate>, AdminError>
pub async fn topic_get_subscription_dispatch_rate( &self, topic: &str, ) -> Result<Option<DispatchRate>, AdminError>
Get a topic’s per-subscription dispatch-rate policy (or null).
GET /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate.
Reuses the DispatchRate body shape — the policy applies per
subscription rather than aggregated across all consumers.
Java: PersistentTopicsBase#getSubscriptionDispatchRate.
Sourcepub async fn topic_set_subscription_dispatch_rate(
&self,
topic: &str,
rate: DispatchRate,
) -> Result<(), AdminError>
pub async fn topic_set_subscription_dispatch_rate( &self, topic: &str, rate: DispatchRate, ) -> Result<(), AdminError>
Set a topic’s per-subscription dispatch-rate policy (overrides namespace default).
POST /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate
with a JSON DispatchRate body.
Java: PersistentTopicsBase#setSubscriptionDispatchRate.
Sourcepub async fn topic_remove_subscription_dispatch_rate(
&self,
topic: &str,
) -> Result<(), AdminError>
pub async fn topic_remove_subscription_dispatch_rate( &self, topic: &str, ) -> Result<(), AdminError>
Remove a topic’s per-subscription dispatch-rate policy.
DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate.
Java: PersistentTopicsBase#removeSubscriptionDispatchRate.
Sourcepub async fn topic_get_replicator_dispatch_rate(
&self,
topic: &str,
) -> Result<Option<DispatchRate>, AdminError>
pub async fn topic_get_replicator_dispatch_rate( &self, topic: &str, ) -> Result<Option<DispatchRate>, AdminError>
Get a topic’s cross-cluster replicator dispatch-rate policy (or null).
GET /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate.
Reuses the DispatchRate body shape — the policy throttles
outbound geo-replication traffic from this cluster.
Java: PersistentTopicsBase#getReplicatorDispatchRate.
Sourcepub async fn topic_set_replicator_dispatch_rate(
&self,
topic: &str,
rate: DispatchRate,
) -> Result<(), AdminError>
pub async fn topic_set_replicator_dispatch_rate( &self, topic: &str, rate: DispatchRate, ) -> Result<(), AdminError>
Set a topic’s cross-cluster replicator dispatch-rate policy.
POST /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate
with a JSON DispatchRate body.
Java: PersistentTopicsBase#setReplicatorDispatchRate.
Sourcepub async fn topic_remove_replicator_dispatch_rate(
&self,
topic: &str,
) -> Result<(), AdminError>
pub async fn topic_remove_replicator_dispatch_rate( &self, topic: &str, ) -> Result<(), AdminError>
Remove a topic’s cross-cluster replicator dispatch-rate policy.
DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate.
Java: PersistentTopicsBase#removeReplicatorDispatchRate.
Sourcepub async fn topic_get_publish_rate(
&self,
topic: &str,
) -> Result<Option<PublishRate>, AdminError>
pub async fn topic_get_publish_rate( &self, topic: &str, ) -> Result<Option<PublishRate>, AdminError>
Get a topic’s publish-rate policy (or null if no override).
GET /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate. Returns
the per-topic PublishRate producer-side throttle (msg/sec +
byte/sec). -1 on either dimension means unlimited.
Java: PersistentTopicsBase#getPublishRate.
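The -1 sentinel described above can be made explicit on the caller's side. The following is a minimal sketch, not part of this crate: the `Limit` type and both helpers are hypothetical, and the real field names of PublishRate are not shown in this section, so the sketch works on a single raw integer per dimension.

```rust
/// Hypothetical caller-side view of one rate dimension (msg/sec or byte/sec).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Limit {
    /// Wire value -1: no throttle on this dimension.
    Unlimited,
    /// Any non-negative wire value: a cap per second.
    PerSecond(u64),
}

/// Interpret a raw wire value. Negative values other than -1 are not
/// described in the docs above, so this sketch rejects them (an assumption).
pub fn limit_from_wire(raw: i64) -> Option<Limit> {
    match raw {
        -1 => Some(Limit::Unlimited),
        n if n >= 0 => Some(Limit::PerSecond(n as u64)),
        _ => None,
    }
}

/// Convert back to the wire form, saturating if the cap exceeds i64::MAX.
pub fn limit_to_wire(limit: Limit) -> i64 {
    match limit {
        Limit::Unlimited => -1,
        Limit::PerSecond(n) => i64::try_from(n).unwrap_or(i64::MAX),
    }
}
```

Keeping the sentinel behind an enum means calling code can match on `Limit::Unlimited` instead of comparing against a magic number.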
Sourcepub async fn topic_set_publish_rate(
&self,
topic: &str,
rate: PublishRate,
) -> Result<(), AdminError>
pub async fn topic_set_publish_rate( &self, topic: &str, rate: PublishRate, ) -> Result<(), AdminError>
Set a topic’s publish-rate policy (overrides namespace default).
POST /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate with a
JSON PublishRate body. Java: PersistentTopicsBase#setPublishRate.
Sourcepub async fn topic_remove_publish_rate(
&self,
topic: &str,
) -> Result<(), AdminError>
pub async fn topic_remove_publish_rate( &self, topic: &str, ) -> Result<(), AdminError>
Remove a topic’s publish-rate policy.
DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate.
Java: PersistentTopicsBase#removePublishRate.
Sourcepub async fn topic_get_max_producers(
&self,
topic: &str,
) -> Result<Option<i32>, AdminError>
pub async fn topic_get_max_producers( &self, topic: &str, ) -> Result<Option<i32>, AdminError>
Get a topic’s max-producers cap (or null if no override).
GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers. Returns
a bare integer when the override is set, null (decoded as
Option::None) when no topic-level cap is in place.
Java: PersistentTopicsBase#getMaxProducers.
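The "bare integer or null" response shape can be illustrated with a small stand-alone decoder. This is a sketch under stated assumptions, not the crate's implementation: `decode_optional_cap` is a hypothetical helper, and treating an empty body the same as null is an assumption made here for robustness.

```rust
use std::num::ParseIntError;

/// Decode a response body that is either a bare JSON integer or `null`.
/// An empty body is also mapped to `None` (assumption of this sketch).
pub fn decode_optional_cap(body: &str) -> Result<Option<i32>, ParseIntError> {
    let trimmed = body.trim();
    if trimmed.is_empty() || trimmed == "null" {
        return Ok(None);
    }
    // Anything else must parse as a 32-bit integer, otherwise it is an error.
    trimmed.parse::<i32>().map(Some)
}
```

The same decoding applies to any endpoint in this section that returns `Option<i32>`.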
Sourcepub async fn topic_set_max_producers(
&self,
topic: &str,
max_producers: i32,
) -> Result<(), AdminError>
pub async fn topic_set_max_producers( &self, topic: &str, max_producers: i32, ) -> Result<(), AdminError>
Set a topic’s max-producers cap.
POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers with
a bare integer body. 0 disables (broker treats as unlimited).
Java: PersistentTopicsBase#setMaxProducers.
Sourcepub async fn topic_remove_max_producers(
&self,
topic: &str,
) -> Result<(), AdminError>
pub async fn topic_remove_max_producers( &self, topic: &str, ) -> Result<(), AdminError>
Remove a topic’s max-producers cap (fall back to namespace / broker default).
DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers.
Java: PersistentTopicsBase#removeMaxProducers.
Sourcepub async fn topic_get_max_consumers(
&self,
topic: &str,
) -> Result<Option<i32>, AdminError>
pub async fn topic_get_max_consumers( &self, topic: &str, ) -> Result<Option<i32>, AdminError>
Get a topic’s max-consumers cap (or null if no override).
GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers. Returns
a bare integer when the override is set, null (decoded as
Option::None) when no topic-level cap is in place.
Java: PersistentTopicsBase#getMaxConsumers.
Sourcepub async fn topic_set_max_consumers(
&self,
topic: &str,
max_consumers: i32,
) -> Result<(), AdminError>
pub async fn topic_set_max_consumers( &self, topic: &str, max_consumers: i32, ) -> Result<(), AdminError>
Set a topic’s max-consumers cap.
POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers with
a bare integer body. 0 disables (broker treats as unlimited).
Java: PersistentTopicsBase#setMaxConsumers.
Sourcepub async fn topic_remove_max_consumers(
&self,
topic: &str,
) -> Result<(), AdminError>
pub async fn topic_remove_max_consumers( &self, topic: &str, ) -> Result<(), AdminError>
Remove a topic’s max-consumers cap (fall back to namespace / broker default).
DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers.
Java: PersistentTopicsBase#removeMaxConsumers.
Sourcepub async fn subscriptions_list(
&self,
topic: &str,
) -> Result<Vec<String>, AdminError>
pub async fn subscriptions_list( &self, topic: &str, ) -> Result<Vec<String>, AdminError>
List subscription names on a topic.
GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscriptions.
Java: PersistentTopics#getSubscriptions.
Sourcepub async fn subscription_reset_cursor_to_position(
&self,
topic: &str,
subscription: &str,
message_id: MessageId,
is_excluded: bool,
) -> Result<(), AdminError>
pub async fn subscription_reset_cursor_to_position( &self, topic: &str, subscription: &str, message_id: MessageId, is_excluded: bool, ) -> Result<(), AdminError>
Reset a subscription’s cursor to a specific message-id position.
POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor
with body {ledgerId, entryId, partitionIndex, batchIndex, isExcluded}.
is_excluded = true skips the message at message_id itself; false
leaves it eligible for redelivery. Java: PersistentTopics#resetCursorOnPosition.
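The request body listed above can be sketched as a hand-built JSON string. The `Position` struct and `reset_cursor_body` helper are hypothetical stand-ins for illustration; the crate's own MessageId type and its field types are not shown in this section, and the integer widths chosen here are assumptions.

```rust
/// Hypothetical stand-in for the message-id fields named in the body.
#[derive(Debug, Clone, Copy)]
pub struct Position {
    pub ledger_id: i64,
    pub entry_id: i64,
    pub partition_index: i32,
    pub batch_index: i32,
}

/// Build the JSON body {ledgerId, entryId, partitionIndex, batchIndex, isExcluded}.
/// All values are numbers or booleans, so no string escaping is needed.
pub fn reset_cursor_body(pos: Position, is_excluded: bool) -> String {
    format!(
        "{{\"ledgerId\":{},\"entryId\":{},\"partitionIndex\":{},\"batchIndex\":{},\"isExcluded\":{}}}",
        pos.ledger_id, pos.entry_id, pos.partition_index, pos.batch_index, is_excluded
    )
}
```

With `is_excluded` set to true the cursor lands just after the given position; with false the message at that position stays eligible for redelivery, as described above.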
Sourcepub async fn subscription_reset_cursor_to_timestamp(
&self,
topic: &str,
subscription: &str,
timestamp_millis: u64,
) -> Result<(), AdminError>
pub async fn subscription_reset_cursor_to_timestamp( &self, topic: &str, subscription: &str, timestamp_millis: u64, ) -> Result<(), AdminError>
Reset a subscription’s cursor to a wall-clock timestamp (millis since epoch).
POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor/{timestamp}. Java: PersistentTopics#resetCursor(topic, sub, timestamp).
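A caller usually has a "rewind by N" duration rather than an absolute timestamp. The sketch below shows one way to derive the `timestamp_millis` argument with the standard library only; `millis_since_epoch` and `rewind_target` are hypothetical helper names, not part of this crate.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Milliseconds since the Unix epoch, or None if `t` is before the epoch
/// or the value does not fit in a u64.
pub fn millis_since_epoch(t: SystemTime) -> Option<u64> {
    let since = t.duration_since(UNIX_EPOCH).ok()?;
    u64::try_from(since.as_millis()).ok()
}

/// Timestamp `back` before `now`, in epoch milliseconds.
/// Returns None when the subtraction would land before the epoch.
pub fn rewind_target(now: SystemTime, back: Duration) -> Option<u64> {
    millis_since_epoch(now.checked_sub(back)?)
}
```

For example, `rewind_target(SystemTime::now(), Duration::from_secs(3600))` yields a value suitable for rewinding a subscription by one hour.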
Sourcepub async fn subscription_skip_messages(
&self,
topic: &str,
subscription: &str,
num_messages: u64,
) -> Result<(), AdminError>
pub async fn subscription_skip_messages( &self, topic: &str, subscription: &str, num_messages: u64, ) -> Result<(), AdminError>
Advance a subscription’s cursor past N undelivered messages.
POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip/{numMessages}. Java: PersistentTopics#skipMessages.
Sourcepub async fn subscription_skip_all_messages(
&self,
topic: &str,
subscription: &str,
) -> Result<(), AdminError>
pub async fn subscription_skip_all_messages( &self, topic: &str, subscription: &str, ) -> Result<(), AdminError>
Drain the entire backlog of a subscription (clear-backlog).
POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip_all.
Java: PersistentTopics#skipAllMessages.
Sourcepub async fn subscription_expire_messages(
&self,
topic: &str,
subscription: &str,
expire_time_seconds: u64,
) -> Result<(), AdminError>
pub async fn subscription_expire_messages( &self, topic: &str, subscription: &str, expire_time_seconds: u64, ) -> Result<(), AdminError>
Expire all messages older than expire_time_seconds for a subscription.
POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/expireMessages/{seconds}. Java: PersistentTopics#expireMessagesForSubscription.
Sourcepub async fn subscription_delete(
&self,
topic: &str,
subscription: &str,
force: bool,
) -> Result<(), AdminError>
pub async fn subscription_delete( &self, topic: &str, subscription: &str, force: bool, ) -> Result<(), AdminError>
Delete (unsubscribe) a subscription.
DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}?force={force}.
force = true disconnects active consumers before deletion. Java:
PersistentTopics#deleteSubscription.
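Subscription names can contain characters that are not safe in a URL path, so the path template above needs per-segment encoding. The following is a sketch of how such a path could be assembled. Whether and how this crate encodes segments is not stated here, and both `encode_segment` and `subscription_delete_path` are hypothetical helpers.

```rust
/// Percent-encode one path segment, leaving only RFC 3986 unreserved
/// characters (letters, digits, '-', '.', '_', '~') untouched.
pub fn encode_segment(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(b as char)
            }
            // Everything else, including '/', becomes %XX.
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}

/// Build the DELETE path with the `force` query flag.
pub fn subscription_delete_path(
    tenant: &str,
    namespace: &str,
    topic: &str,
    subscription: &str,
    force: bool,
) -> String {
    format!(
        "/admin/v2/persistent/{}/{}/{}/subscription/{}?force={}",
        encode_segment(tenant),
        encode_segment(namespace),
        encode_segment(topic),
        encode_segment(subscription),
        force
    )
}
```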
Sourcepub async fn create_shadow_topic(
&self,
source: &str,
shadow: &str,
) -> Result<(), AdminError>
pub async fn create_shadow_topic( &self, source: &str, shadow: &str, ) -> Result<(), AdminError>
Create a shadow topic (PIP-180).
Creates the shadow as a regular non-partitioned topic with the broker-reserved
PULSAR.SHADOW_SOURCE topic property pointing at the source topic:
PUT /admin/v2/persistent/{tenant}/{namespace}/{shadow} with body
{ "PULSAR.SHADOW_SOURCE": "persistent://tenant/ns/source" }.
Then links the created shadow in the source topic policy list via
PUT /admin/v2/persistent/{tenant}/{namespace}/{source}/shadowTopics.
Java:
pulsar-client-admin/.../TopicsImpl.java#createShadowTopicAsync builds the same property
map before calling createNonPartitionedTopicAsync for non-partitioned sources.
Errors mirror the existing AdminError taxonomy:
404 → Status { code: 404, .. } (the source topic does not exist),
409 → Status { code: 409, .. } (the shadow topic already exists on this source),
401/403 → Status { code: 401|403, .. } (auth).
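The two-step flow described above (create the shadow with the reserved property, then link it on the source) can be sketched as a pure function that computes the request paths and the first request body. This is an illustration under assumptions, not the crate's code: `ShadowPlan`, `parse_persistent` and `shadow_create_plan` are hypothetical, topic names are assumed to be given in full `persistent://tenant/ns/topic` form, no percent-encoding or JSON escaping is applied, and the body of the second PUT is not specified in this section, so it is left out.

```rust
/// Hypothetical description of the two requests.
#[derive(Debug, PartialEq, Eq)]
pub struct ShadowPlan {
    pub create_path: String,
    pub create_body: String,
    pub link_path: String,
}

struct TopicName<'a> {
    tenant: &'a str,
    namespace: &'a str,
    local: &'a str,
}

/// Split `persistent://tenant/ns/topic` into its three parts.
fn parse_persistent(topic: &str) -> Option<TopicName<'_>> {
    let rest = topic.strip_prefix("persistent://")?;
    let mut parts = rest.splitn(3, '/');
    let (tenant, namespace, local) = (parts.next()?, parts.next()?, parts.next()?);
    // Reject empty parts and local names with extra slashes (not handled here).
    if tenant.is_empty() || namespace.is_empty() || local.is_empty() || local.contains('/') {
        return None;
    }
    Some(TopicName { tenant, namespace, local })
}

/// Compute the PUT that creates the shadow and the PUT path that links it.
pub fn shadow_create_plan(source: &str, shadow: &str) -> Option<ShadowPlan> {
    let src = parse_persistent(source)?;
    let shd = parse_persistent(shadow)?;
    Some(ShadowPlan {
        create_path: format!("/admin/v2/persistent/{}/{}/{}", shd.tenant, shd.namespace, shd.local),
        create_body: format!("{{\"PULSAR.SHADOW_SOURCE\":\"{}\"}}", source),
        link_path: format!(
            "/admin/v2/persistent/{}/{}/{}/shadowTopics",
            src.tenant, src.namespace, src.local
        ),
    })
}
```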
Sourcepub async fn delete_shadow_topic(
&self,
shadow: &str,
force: bool,
) -> Result<(), AdminError>
pub async fn delete_shadow_topic( &self, shadow: &str, force: bool, ) -> Result<(), AdminError>
Delete a shadow topic (PIP-180).
DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic} where
{topic} is the shadow topic name. PIP-180’s deletion contract
goes through the regular topic-delete path on the shadow itself —
the broker recognises the topic as a shadow and detaches it from
the source ledger atomically with the metadata delete.
force controls whether active subscribers are kicked off before
the delete (?force=true) or whether the broker rejects the
request when subscribers exist (?force=false, the default).
Java: org.apache.pulsar.client.admin.Topics#deleteShadowTopic calls
the same @DELETE @Path("/{tenant}/{namespace}/{topic}") endpoint.
Sourcepub async fn get_shadow_topics(
&self,
source: &str,
) -> Result<Vec<String>, AdminError>
pub async fn get_shadow_topics( &self, source: &str, ) -> Result<Vec<String>, AdminError>
List the shadow topics created on a source topic (PIP-180).
GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/shadowTopics
where {topic} is the source topic name. The broker returns a
JSON array of fully-qualified shadow topic names.
Java:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
(@GET @Path("/{tenant}/{namespace}/{topic}/shadowTopics")).
Used by the runtime engine at consumer subscribe time. When the user
subscribes to a topic the runtime cannot yet classify, calling
get_shadow_topics on every other topic in the namespace would be
expensive. Instead the runtime makes a single
get_shadow_topics(subscribed) call on the topic itself: a non-shadow
topic returns an empty array, and a shadow topic also surfaces nothing,
but by then the broker has already populated the consumer’s
shadow_metadata via the topic’s policy.
(See crates/magnetar-runtime-tokio/src/client.rs::subscribe.)
Sourcepub async fn get_shadow_source(
&self,
shadow: &str,
) -> Result<Option<String>, AdminError>
pub async fn get_shadow_source( &self, shadow: &str, ) -> Result<Option<String>, AdminError>
Resolve the source topic of a shadow topic (PIP-180).
GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/properties.
Returns the PULSAR.SHADOW_SOURCE property when the queried topic is a
shadow; returns None when it is a regular topic. Used by the runtime
at subscribe time to populate
magnetar_proto::ShadowTopicMetadata::source_topic on the new
consumer (so the receive path can emit
magnetar_proto::ConnectionEvent::MessageReceivedFromShadow
without an out-of-band lookup per message).
Java: org.apache.pulsar.client.admin.Topics#getShadowSource —
TopicsImpl#getShadowSourceAsync delegates to getPropertiesAsync.
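The property lookup described above reduces to reading one key from the topic's properties map. A minimal sketch, assuming the properties have already been decoded into a `HashMap<String, String>`; `shadow_source` is a hypothetical helper, and only the key name comes from the text above.

```rust
use std::collections::HashMap;

/// Broker-reserved property that marks a topic as a shadow.
pub const SHADOW_SOURCE_KEY: &str = "PULSAR.SHADOW_SOURCE";

/// Return the source topic when the properties belong to a shadow topic,
/// or None for a regular topic that lacks the property.
pub fn shadow_source(props: &HashMap<String, String>) -> Option<String> {
    props.get(SHADOW_SOURCE_KEY).cloned()
}
```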
Sourcepub async fn sources_list_by_namespace(
&self,
tenant: &str,
namespace: &str,
) -> Result<Vec<String>, AdminError>
pub async fn sources_list_by_namespace( &self, tenant: &str, namespace: &str, ) -> Result<Vec<String>, AdminError>
List sources configured under a namespace.
GET /admin/v3/sources/{tenant}/{namespace}. Returns the list of
source names (the broker emits a JSON array of strings — one
entry per declared source, regardless of running state).
Java:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java
(@Path("/sources")) +
pulsar-broker/.../admin/impl/SourcesBase.java#listSources.
Sourcepub async fn source_get(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<Value, AdminError>
pub async fn source_get( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<Value, AdminError>
Get one source’s configuration.
GET /admin/v3/sources/{tenant}/{namespace}/{name}. Returns the
stored SourceConfig envelope as raw JSON — minor broker
versions extend the shape (new connector knobs, secret refs)
faster than a typed Rust DTO can keep up.
Java: SourcesBase#getSourceInfo.
Sourcepub async fn source_status(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<Value, AdminError>
pub async fn source_status( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<Value, AdminError>
Get a source’s running status (per-instance worker telemetry).
GET /admin/v3/sources/{tenant}/{namespace}/{name}/status.
Returns the broker’s SourceStatus envelope (numInstances,
numRunning, per-instance workerId + running + last
received timestamp). Exposed as raw JSON for forward-compat.
Java: SourcesBase#getSourceStatus.
Sourcepub async fn source_create_with_url(
&self,
tenant: &str,
namespace: &str,
name: &str,
url: &str,
config: SourceConfig,
) -> Result<(), AdminError>
pub async fn source_create_with_url( &self, tenant: &str, namespace: &str, name: &str, url: &str, config: SourceConfig, ) -> Result<(), AdminError>
Register a source from a remote package URL.
POST /admin/v3/sources/{tenant}/{namespace}/{name} with
multipart/form-data carrying two parts: a url text part (the
package URL — http(s)://, file://, or function:// per the
broker’s WorkerUtils#downloadFileFromPackageUrl) and a
sourceConfig JSON part with the SourceConfig body.
A sibling source_create (binary upload) is intentionally not
yet exposed — this URL-based variant covers every
CI/operator scenario that does not need to ship a JAR through
the admin client itself.
Java: SourcesBase#registerSource.
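The two-part multipart/form-data payload described above can be sketched by hand to show what goes on the wire. This is an illustration only: `multipart_body` is a hypothetical helper, the boundary is caller-supplied, the `Content-Type: application/json` header on the config part is an assumption, and a real client would normally delegate this to its HTTP library.

```rust
/// Build a multipart/form-data body with a `url` text part and one JSON part.
/// `config_part` is the JSON part's field name, e.g. "sourceConfig".
pub fn multipart_body(boundary: &str, url: &str, config_part: &str, config_json: &str) -> String {
    let mut out = String::new();

    // Part 1: the package URL as plain text.
    out.push_str(&format!("--{}\r\n", boundary));
    out.push_str("Content-Disposition: form-data; name=\"url\"\r\n\r\n");
    out.push_str(url);
    out.push_str("\r\n");

    // Part 2: the connector configuration as JSON.
    out.push_str(&format!("--{}\r\n", boundary));
    out.push_str(&format!(
        "Content-Disposition: form-data; name=\"{}\"\r\n",
        config_part
    ));
    out.push_str("Content-Type: application/json\r\n\r\n");
    out.push_str(config_json);
    out.push_str("\r\n");

    // Closing delimiter.
    out.push_str(&format!("--{}--\r\n", boundary));
    out
}
```

The request's own `Content-Type` header would carry the same boundary, in the form `multipart/form-data; boundary=...`.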
Sourcepub async fn source_update_with_url(
&self,
tenant: &str,
namespace: &str,
name: &str,
url: &str,
config: SourceConfig,
) -> Result<(), AdminError>
pub async fn source_update_with_url( &self, tenant: &str, namespace: &str, name: &str, url: &str, config: SourceConfig, ) -> Result<(), AdminError>
Update a source from a remote package URL.
PUT /admin/v3/sources/{tenant}/{namespace}/{name} with the
same multipart shape as Self::source_create_with_url.
Java: SourcesBase#updateSource.
Sourcepub async fn source_delete(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError>
pub async fn source_delete( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Delete a source.
DELETE /admin/v3/sources/{tenant}/{namespace}/{name}. Removes
the source declaration and tears the running instances down.
Java: SourcesBase#deregisterSource.
Sourcepub async fn source_start(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError>
pub async fn source_start( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Start every instance of a source.
POST /admin/v3/sources/{tenant}/{namespace}/{name}/start.
Java: SourcesBase#startSource.
Sourcepub async fn source_stop(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError>
pub async fn source_stop( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Stop every instance of a source.
POST /admin/v3/sources/{tenant}/{namespace}/{name}/stop.
Java: SourcesBase#stopSource.
Sourcepub async fn source_restart(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError>
pub async fn source_restart( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Restart every instance of a source.
POST /admin/v3/sources/{tenant}/{namespace}/{name}/restart.
Java: SourcesBase#restartSource.
Sourcepub async fn sinks_list_by_namespace(
&self,
tenant: &str,
namespace: &str,
) -> Result<Vec<String>, AdminError>
pub async fn sinks_list_by_namespace( &self, tenant: &str, namespace: &str, ) -> Result<Vec<String>, AdminError>
List sinks configured under a namespace.
GET /admin/v3/sinks/{tenant}/{namespace}. Returns the list of
sink names. Mirrors Self::sources_list_by_namespace —
Pulsar’s Sources / Sinks REST surfaces are intentionally
symmetric.
Java:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java
(@Path("/sinks")) +
pulsar-broker/.../admin/impl/SinksBase.java#listSinks.
Sourcepub async fn sink_get(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<Value, AdminError>
pub async fn sink_get( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<Value, AdminError>
Get one sink’s configuration.
GET /admin/v3/sinks/{tenant}/{namespace}/{name}. Returns the
stored SinkConfig as raw JSON for the same forward-compat
reason as Self::source_get. Java: SinksBase#getSinkInfo.
Sourcepub async fn sink_status(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<Value, AdminError>
pub async fn sink_status( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<Value, AdminError>
Get a sink’s running status (per-instance worker telemetry).
GET /admin/v3/sinks/{tenant}/{namespace}/{name}/status.
Same envelope shape as the Sources status. Raw JSON.
Java: SinksBase#getSinkStatus.
Sourcepub async fn sink_create_with_url(
&self,
tenant: &str,
namespace: &str,
name: &str,
url: &str,
config: SinkConfig,
) -> Result<(), AdminError>
pub async fn sink_create_with_url( &self, tenant: &str, namespace: &str, name: &str, url: &str, config: SinkConfig, ) -> Result<(), AdminError>
Register a sink from a remote package URL.
POST /admin/v3/sinks/{tenant}/{namespace}/{name} with
multipart/form-data carrying a url text part and a
sinkConfig JSON part. Mirrors
Self::source_create_with_url; the only wire-level
difference is the JSON-part field name (sinkConfig vs
sourceConfig). Java: SinksBase#registerSink.
Sourcepub async fn sink_update_with_url(
&self,
tenant: &str,
namespace: &str,
name: &str,
url: &str,
config: SinkConfig,
) -> Result<(), AdminError>
pub async fn sink_update_with_url( &self, tenant: &str, namespace: &str, name: &str, url: &str, config: SinkConfig, ) -> Result<(), AdminError>
Update a sink from a remote package URL.
PUT /admin/v3/sinks/{tenant}/{namespace}/{name} with the same
multipart shape as Self::sink_create_with_url.
Java: SinksBase#updateSink.
Sourcepub async fn sink_delete(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError>
pub async fn sink_delete( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Delete a sink.
DELETE /admin/v3/sinks/{tenant}/{namespace}/{name}.
Java: SinksBase#deregisterSink.
Sourcepub async fn sink_start(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError>
pub async fn sink_start( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Start every instance of a sink.
POST /admin/v3/sinks/{tenant}/{namespace}/{name}/start.
Java: SinksBase#startSink.
Sourcepub async fn sink_stop(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError>
pub async fn sink_stop( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Stop every instance of a sink.
POST /admin/v3/sinks/{tenant}/{namespace}/{name}/stop.
Java: SinksBase#stopSink.
Sourcepub async fn sink_restart(
&self,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<(), AdminError>
pub async fn sink_restart( &self, tenant: &str, namespace: &str, name: &str, ) -> Result<(), AdminError>
Restart every instance of a sink.
POST /admin/v3/sinks/{tenant}/{namespace}/{name}/restart.
Java: SinksBase#restartSink.
Sourcepub async fn packages_list(
&self,
pkg_type: PackageType,
tenant: &str,
namespace: &str,
) -> Result<Vec<String>, AdminError>
pub async fn packages_list( &self, pkg_type: PackageType, tenant: &str, namespace: &str, ) -> Result<Vec<String>, AdminError>
List package names declared under (type, tenant, namespace).
GET /admin/v3/packages/{type}/{tenant}/{namespace}. Returns the
list of package names — not versions; one entry per declared
package. Use Self::package_versions_list to enumerate the
versions of one package.
Java: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Packages.java#listPackages.
Sourcepub async fn package_versions_list(
&self,
pkg_type: PackageType,
tenant: &str,
namespace: &str,
name: &str,
) -> Result<Vec<String>, AdminError>
pub async fn package_versions_list( &self, pkg_type: PackageType, tenant: &str, namespace: &str, name: &str, ) -> Result<Vec<String>, AdminError>
List the versions declared for one package.
GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}.
Returns the list of version strings (Pulsar treats versions as
opaque strings — 1.0.0, latest, build hashes — and only the
metadata endpoints understand them).
Java: PackagesBase#listPackageVersions.
Sourcepub async fn package_metadata_get(
&self,
pkg_type: PackageType,
tenant: &str,
namespace: &str,
name: &str,
version: &str,
) -> Result<Value, AdminError>
pub async fn package_metadata_get( &self, pkg_type: PackageType, tenant: &str, namespace: &str, name: &str, version: &str, ) -> Result<Value, AdminError>
Get the metadata envelope for one package version.
GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/{version}/metadata. Returns the PackageMetadata envelope as
raw JSON for forward-compat — broker minor versions extend the
shape with tags, documentationUrl, etc.
Java: PackagesBase#getPackageMetadata.
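The three package endpoints described so far nest one level at a time: names, then versions, then metadata. The sketch below makes that nesting explicit. It is not the crate's code: `Kind` is a hypothetical stand-in for PackageType, the path segments "function", "source" and "sink" are assumptions based on Pulsar's package types, and no percent-encoding is applied.

```rust
/// Hypothetical stand-in for the crate's PackageType.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Kind {
    Function,
    Source,
    Sink,
}

impl Kind {
    /// Path segment used for {type} (assumed values).
    pub fn segment(self) -> &'static str {
        match self {
            Kind::Function => "function",
            Kind::Source => "source",
            Kind::Sink => "sink",
        }
    }
}

/// Path that lists package names under (type, tenant, namespace).
pub fn names_path(kind: Kind, tenant: &str, namespace: &str) -> String {
    format!("/admin/v3/packages/{}/{}/{}", kind.segment(), tenant, namespace)
}

/// Path that lists the versions of one package.
pub fn versions_path(kind: Kind, tenant: &str, namespace: &str, name: &str) -> String {
    format!("{}/{}", names_path(kind, tenant, namespace), name)
}

/// Path of the metadata envelope for one package version.
pub fn metadata_path(kind: Kind, tenant: &str, namespace: &str, name: &str, version: &str) -> String {
    format!("{}/{}/metadata", versions_path(kind, tenant, namespace, name), version)
}
```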
Sourcepub async fn package_metadata_set(
&self,
pkg_type: PackageType,
tenant: &str,
namespace: &str,
name: &str,
version: &str,
metadata: PackageMetadata,
) -> Result<(), AdminError>
pub async fn package_metadata_set( &self, pkg_type: PackageType, tenant: &str, namespace: &str, name: &str, version: &str, metadata: PackageMetadata, ) -> Result<(), AdminError>
Replace the metadata envelope for one package version.
PUT /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/{version}/metadata with a PackageMetadata JSON body.
The broker rejects this verb with 404 if the package version
does not exist — package_metadata_set is update, never
create. Java: PackagesBase#updatePackageMetadata.
Sourcepub async fn package_delete(
&self,
pkg_type: PackageType,
tenant: &str,
namespace: &str,
name: &str,
version: &str,
) -> Result<(), AdminError>
pub async fn package_delete( &self, pkg_type: PackageType, tenant: &str, namespace: &str, name: &str, version: &str, ) -> Result<(), AdminError>
Delete one package version.
DELETE /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/{version}. The broker drops the version’s metadata + storage
atomically; other versions of the same package are untouched.
Java: PackagesBase#deletePackage.
Trait Implementations§
Source§impl Clone for AdminClient
impl Clone for AdminClient
Source§fn clone(&self) -> AdminClient
fn clone(&self) -> AdminClient
1.0.0 (const: unstable) · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.