magnetar_admin/lib.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! Apache Pulsar admin REST client (`/admin/v2/...`).
4//!
5//! Thin async wrapper around [`reqwest`] for the broker's JAX-RS admin API.
6//! TLS is via `rustls-tls`. There are no channels and no background tasks: every
7//! call is a one-shot `await` that resolves to a [`Result`].
8//!
9//! Endpoint paths mirror the broker. Each method's rustdoc cites the Java
10//! endpoint class (file + relevant `@Path` annotation) in `apache/pulsar` so a
11//! reader can confirm the URL and HTTP verb against upstream.
12//!
13//! ## Quick start
14//!
15//! ```no_run
16//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
17//! use magnetar_admin::{AdminClient, TenantInfo};
18//!
19//! let admin = AdminClient::builder()
20//! .service_url("http://localhost:8080".parse()?)
21//! .build()?;
22//!
23//! let tenants = admin.tenants_list().await?;
24//! println!("{tenants:?}");
25//!
26//! admin
27//! .tenant_create(
28//! "acme",
29//! TenantInfo {
30//! admin_roles: vec!["admin".into()],
31//! allowed_clusters: vec!["standalone".into()],
32//! },
33//! )
34//! .await?;
35//! # Ok(()) }
36//! ```
37
38#![warn(unreachable_pub)]
39#![forbid(unsafe_code)]
40
41mod tls_crypto;
42
43use std::collections::HashMap;
44use std::time::Duration;
45
46use magnetar_proto::MessageId;
47use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
48use reqwest::{Method, RequestBuilder, Response, StatusCode};
49use serde::{Deserialize, Serialize};
50use url::Url;
51
52/// Default request timeout. Mirrors `PulsarAdminBuilder` Java default of 60s
53/// (see `pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/
54/// PulsarAdminBuilderImpl.java`).
55pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
56
57/// Longer per-call timeout for `*_create_with_url` / `*_update_with_url`
58/// endpoints. The broker fetches the package from the supplied URL
59/// before responding, so the round-trip can comfortably exceed 60 s
60/// against a slow internal registry (S3 / GCS / function://) or a
61/// large `.nar` / `.jar` artifact. Overrides the client-level timeout
62/// for those calls only — every other admin verb keeps the 60 s
63/// budget [`DEFAULT_TIMEOUT`] provides.
64///
65/// 5 min matches the Java admin client's read-timeout for register
66/// paths.
67pub const PACKAGE_REGISTER_TIMEOUT: Duration = Duration::from_secs(300);
68
69/// Authentication strategy used by the admin client.
70///
71/// `Token(...)` adds `Authorization: Bearer <token>` to every request.
72/// Mirrors Java's `AuthenticationToken` provider.
73#[derive(Clone, Default)]
74pub enum AdminAuth {
75 /// No authentication.
76 #[default]
77 None,
78 /// Bearer token. The string is the raw token; the `Bearer ` prefix is added
79 /// at request time.
80 Token(String),
81}
82
83impl std::fmt::Debug for AdminAuth {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 // Redact the token body so calling `Debug` on the admin client never
86 // spills the bearer credential to tracing or stdout. Mirrors the
87 // `Credentials`/`ClientCredentialsFlow` Debug redaction in
88 // `magnetar-auth-oauth2`.
89 match self {
90 Self::None => f.write_str("None"),
91 Self::Token(_) => f.debug_tuple("Token").field(&"<redacted>").finish(),
92 }
93 }
94}
95
96/// Apache Pulsar admin REST client.
97///
98/// Holds two pre-computed base URLs: `base_url` anchored at
99/// `/admin/v2/` (clusters / tenants / namespaces / topics /
100/// subscriptions / brokers / bookies / schemas) and `base_url_v3`
101/// anchored at `/admin/v3/` (Pulsar Functions / IO Sources / IO Sinks
102/// / Packages). The split mirrors Pulsar's own routing — Java's
103/// `PulsarAdmin` keeps them separate too. Both URLs are derived from
104/// the same broker root at builder time, so a caller never has to
105/// know which version family an endpoint belongs to.
106#[derive(Debug, Clone)]
107pub struct AdminClient {
108 base_url: Url,
109 base_url_v3: Url,
110 http: reqwest::Client,
111 auth: AdminAuth,
112}
113
114impl AdminClient {
115 /// Start building an admin client.
116 #[must_use]
117 pub fn builder() -> AdminClientBuilder {
118 AdminClientBuilder::default()
119 }
120
121 /// Return the base URL the client targets (with the trailing `/admin/v2/`
122 /// component already appended). Exposed for tests and diagnostics.
123 #[must_use]
124 pub fn base_url(&self) -> &Url {
125 &self.base_url
126 }
127
128 /// Return the configured auth strategy. Exposed for tests and diagnostics.
129 #[must_use]
130 pub fn auth(&self) -> &AdminAuth {
131 &self.auth
132 }
133
134 // --- Cluster ---------------------------------------------------------
135
136 /// List clusters.
137 ///
138 /// `GET /admin/v2/clusters`.
139 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Clusters.java`
140 /// (`@Path("/clusters")`) + `admin/impl/ClustersBase.java#getClusters`.
141 pub async fn cluster_list(&self) -> Result<Vec<String>, AdminError> {
142 let url = self.url(&["clusters"])?;
143 let resp = self.send(self.http.request(Method::GET, url)).await?;
144 json_ok(resp).await
145 }
146
147 /// List failure-domains configured on a cluster.
148 ///
149 /// `GET /admin/v2/clusters/{cluster}/failureDomains`. The broker returns
150 /// a `Map<String, FailureDomain>` keyed by domain name; each value
151 /// carries a `brokers: Set<String>` member. The map is exposed as a
152 /// raw `serde_json::Value` for forward-compat — broker minor versions
153 /// add fields.
154 /// Java: `ClustersBase#getFailureDomains`.
155 pub async fn cluster_failure_domains_list(
156 &self,
157 cluster: &str,
158 ) -> Result<serde_json::Value, AdminError> {
159 validate_segment(cluster)?;
160 let url = self.url(&["clusters", cluster, "failureDomains"])?;
161 let resp = self.send(self.http.request(Method::GET, url)).await?;
162 json_ok(resp).await
163 }
164
165 /// Get one failure-domain by name.
166 ///
167 /// `GET /admin/v2/clusters/{cluster}/failureDomains/{domain}`.
168 /// Java: `ClustersBase#getDomain`.
169 pub async fn cluster_failure_domain_get(
170 &self,
171 cluster: &str,
172 domain: &str,
173 ) -> Result<serde_json::Value, AdminError> {
174 validate_segment(cluster)?;
175 validate_segment(domain)?;
176 let url = self.url(&["clusters", cluster, "failureDomains", domain])?;
177 let resp = self.send(self.http.request(Method::GET, url)).await?;
178 json_ok(resp).await
179 }
180
181 /// List namespace-isolation policies configured on a cluster.
182 ///
183 /// `GET /admin/v2/clusters/{cluster}/namespaceIsolationPolicies`. The
184 /// broker returns a `Map<String, NamespaceIsolationData>` carrying
185 /// the namespace regex, primary/secondary broker lists, and the
186 /// auto-failover policy. Exposed as raw JSON for forward-compat.
187 ///
188 /// A cluster with no isolation policies configured surfaces as
189 /// `404 NamespaceIsolationPolicies for cluster ... does not exist`
190 /// (Pulsar 4.0.x) rather than an empty map; we mirror the Java
191 /// client's `Map<String, _>` surface by mapping that specific 404 to
192 /// an empty `{}` object. Other 404s (auth, wrong cluster) still
193 /// surface as `AdminError::Status`.
194 /// Java: `ClustersBase#getNamespaceIsolationPolicies`.
195 pub async fn namespace_isolation_policies_list(
196 &self,
197 cluster: &str,
198 ) -> Result<serde_json::Value, AdminError> {
199 validate_segment(cluster)?;
200 let url = self.url(&["clusters", cluster, "namespaceIsolationPolicies"])?;
201 let resp = self.send(self.http.request(Method::GET, url)).await?;
202 match json_ok::<serde_json::Value>(resp).await {
203 Ok(v) => Ok(v),
204 Err(AdminError::Status { code: 404, body })
205 if body.contains("NamespaceIsolationPolicies") =>
206 {
207 Ok(serde_json::Value::Object(serde_json::Map::new()))
208 }
209 Err(e) => Err(e),
210 }
211 }
212
213 // --- Brokers ---------------------------------------------------------
214
215 /// List active brokers in a cluster.
216 ///
217 /// `GET /admin/v2/brokers/{cluster}`. Returns a list of `host:port`
218 /// strings — one entry per broker that's currently registered with
219 /// the cluster's metadata store. Java: `BrokersBase#getActiveBrokers`.
220 pub async fn brokers_list(&self, cluster: &str) -> Result<Vec<String>, AdminError> {
221 validate_segment(cluster)?;
222 let url = self.url(&["brokers", cluster])?;
223 let resp = self.send(self.http.request(Method::GET, url)).await?;
224 json_ok(resp).await
225 }
226
227 /// Get the current leader broker for the cluster.
228 ///
229 /// `GET /admin/v2/brokers/leaderBroker`. Returns `{ serviceUrl,
230 /// brokerId }`. Exposed as raw JSON for forward-compat — newer
231 /// brokers add `clusterName` and similar fields.
232 /// Java: `BrokersBase#getLeaderBroker`.
233 pub async fn brokers_leader(&self) -> Result<serde_json::Value, AdminError> {
234 let url = self.url(&["brokers", "leaderBroker"])?;
235 let resp = self.send(self.http.request(Method::GET, url)).await?;
236 json_ok(resp).await
237 }
238
239 /// List the names of all dynamic-config keys the broker exposes.
240 ///
241 /// `GET /admin/v2/brokers/configuration`. Returns the bare list of
242 /// `ServiceConfiguration` fields tagged `@FieldContext(dynamic = true)`
243 /// — the set of keys that `brokers_set_dynamic_config` accepts. Use
244 /// [`Self::brokers_dynamic_config_overrides`] for the current values.
245 ///
246 /// Pulsar 4 normally returns a JSON array (`List<String>` from
247 /// `BrokerService#getDynamicConfiguration`), but some packaging /
248 /// proxy paths surface the underlying `Map<String, ConfigField>`
249 /// shape instead. We accept both — array → values, object → keys —
250 /// to stay version-tolerant. Java: `BrokersBase#getDynamicConfigurationName`.
251 pub async fn brokers_dynamic_config_keys(&self) -> Result<Vec<String>, AdminError> {
252 let url = self.url(&["brokers", "configuration"])?;
253 let resp = self.send(self.http.request(Method::GET, url)).await?;
254 let v: serde_json::Value = json_ok(resp).await?;
255 match v {
256 serde_json::Value::Array(items) => Ok(items
257 .into_iter()
258 .filter_map(|x| x.as_str().map(str::to_owned))
259 .collect()),
260 serde_json::Value::Object(map) => Ok(map.into_iter().map(|(k, _)| k).collect()),
261 other => Err(AdminError::Protocol(format!(
262 "brokers/configuration returned unexpected shape: {other}"
263 ))),
264 }
265 }
266
267 /// Get the currently-overridden dynamic configuration values.
268 ///
269 /// `GET /admin/v2/brokers/configuration/values`. Returns a
270 /// `Map<String, String>` of every dynamic key the operator has set
271 /// (the broker omits keys still on their static / default value).
272 /// Exposed as raw JSON because broker minor versions add keys.
273 /// Java: `BrokersBase#getAllDynamicConfigurations`.
274 pub async fn brokers_dynamic_config_overrides(&self) -> Result<serde_json::Value, AdminError> {
275 let url = self.url(&["brokers", "configuration", "values"])?;
276 let resp = self.send(self.http.request(Method::GET, url)).await?;
277 json_ok(resp).await
278 }
279
280 /// Get the broker's runtime (merged static + dynamic) configuration.
281 ///
282 /// `GET /admin/v2/brokers/configuration/runtime`. Returns the full
283 /// `Map<String, String>` of `ServiceConfiguration` values as they
284 /// currently apply on the broker process — static defaults
285 /// overlaid with any `brokers_set_dynamic_config` overrides. Raw
286 /// JSON for forward-compat. Java: `BrokersBase#getRuntimeConfiguration`.
287 pub async fn brokers_runtime_config(&self) -> Result<serde_json::Value, AdminError> {
288 let url = self.url(&["brokers", "configuration", "runtime"])?;
289 let resp = self.send(self.http.request(Method::GET, url)).await?;
290 json_ok(resp).await
291 }
292
293 /// Get the broker's internal-stack endpoints.
294 ///
295 /// `GET /admin/v2/brokers/internal-configuration`. Returns the
296 /// `InternalConfigurationData` envelope — metadata-store URLs
297 /// (`zookeeperServers`, `configurationMetadataStoreUrl`),
298 /// `BookKeeper` metadata service URI, ledger root paths. Raw JSON
299 /// for forward-compat; the shape rolls between releases as the
300 /// metadata layer evolves.
301 /// Java: `BrokersBase#getInternalConfigurationData`.
302 pub async fn brokers_internal_config(&self) -> Result<serde_json::Value, AdminError> {
303 let url = self.url(&["brokers", "internal-configuration"])?;
304 let resp = self.send(self.http.request(Method::GET, url)).await?;
305 json_ok(resp).await
306 }
307
308 /// Probe broker health — produces and consumes one heartbeat message
309 /// on an internal topic.
310 ///
311 /// `GET /admin/v2/brokers/health`. The broker returns the plain-text
312 /// string `"ok"` on success; non-200 surfaces as `AdminError::Status`.
313 /// Java: `BrokersBase#healthCheck`.
314 pub async fn brokers_health_check(&self) -> Result<String, AdminError> {
315 let url = self.url(&["brokers", "health"])?;
316 let resp = self.send(self.http.request(Method::GET, url)).await?;
317 let resp = ensure_status(resp).await?;
318 Ok(resp.text().await?)
319 }
320
321 /// List the namespaces a specific broker currently owns.
322 ///
323 /// `GET /admin/v2/brokers/{cluster}/{broker}/ownedNamespaces`. The
324 /// `broker` argument must be the broker's `host:port` (matching the
325 /// strings [`Self::brokers_list`] returns). Returns a
326 /// `Map<String, NamespaceOwnershipStatus>` keyed by namespace name —
327 /// raw JSON for forward-compat.
328 /// Java: `BrokersBase#getOwnedNamespaces`.
329 pub async fn brokers_owned_namespaces(
330 &self,
331 cluster: &str,
332 broker: &str,
333 ) -> Result<serde_json::Value, AdminError> {
334 validate_segment(cluster)?;
335 validate_segment(broker)?;
336 let url = self.url(&["brokers", cluster, broker, "ownedNamespaces"])?;
337 let resp = self.send(self.http.request(Method::GET, url)).await?;
338 json_ok(resp).await
339 }
340
341 /// Override a dynamic broker configuration value.
342 ///
343 /// `POST /admin/v2/brokers/configuration/{name}/{value}`. Both the
344 /// key and the value travel in the URL path — there is no request
345 /// body — matching the broker's `updateDynamicConfiguration(@PathParam
346 /// String configName, @PathParam String configValue)` signature.
347 /// The key must be one of those returned by
348 /// [`Self::brokers_dynamic_config_keys`]; unknown keys yield 412.
349 /// Java: `BrokersBase#updateDynamicConfiguration`.
350 pub async fn brokers_set_dynamic_config(
351 &self,
352 name: &str,
353 value: &str,
354 ) -> Result<(), AdminError> {
355 validate_segment(name)?;
356 validate_segment(value)?;
357 let url = self.url(&["brokers", "configuration", name, value])?;
358 let resp = self.send(self.http.request(Method::POST, url)).await?;
359 empty_ok(resp).await
360 }
361
362 /// Drop a dynamic configuration override, reverting to the static value.
363 ///
364 /// `DELETE /admin/v2/brokers/configuration/{name}`. After the call
365 /// the key disappears from [`Self::brokers_dynamic_config_overrides`]
366 /// and [`Self::brokers_runtime_config`] reflects the underlying
367 /// static / default value again.
368 /// Java: `BrokersBase#deleteDynamicConfiguration`.
369 pub async fn brokers_delete_dynamic_config(&self, name: &str) -> Result<(), AdminError> {
370 validate_segment(name)?;
371 let url = self.url(&["brokers", "configuration", name])?;
372 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
373 empty_ok(resp).await
374 }
375
376 // --- Bookies ---------------------------------------------------------
377
378 /// List every bookie the broker knows about — both writable and
379 /// read-only — as registered in `BookKeeper` metadata.
380 ///
381 /// `GET /admin/v2/bookies/all`. Returns the broker's
382 /// `BookiesClusterInfo` envelope — a `bookies: [{ address: "host:port" }]`
383 /// array. Raw JSON for forward-compat.
384 /// Java: `BookiesBase#getAllAvailableBookies`.
385 pub async fn bookies_list_all(&self) -> Result<serde_json::Value, AdminError> {
386 let url = self.url(&["bookies", "all"])?;
387 let resp = self.send(self.http.request(Method::GET, url)).await?;
388 json_ok(resp).await
389 }
390
391 /// Get every bookie's group + rack assignment, as configured for the
392 /// rack-aware placement policy.
393 ///
394 /// `GET /admin/v2/bookies/racks-info`. Returns the nested
395 /// `Map<group, Map<bookieAddress, BookieInfo>>` shape Pulsar
396 /// persists in metadata. Raw JSON because the wire shape exposes
397 /// nested maps that change between releases (the `default` group
398 /// is implicit on older brokers).
399 /// Java: `BookiesBase#getBookieRackInfo`.
400 pub async fn bookies_racks_info(&self) -> Result<serde_json::Value, AdminError> {
401 let url = self.url(&["bookies", "racks-info"])?;
402 let resp = self.send(self.http.request(Method::GET, url)).await?;
403 json_ok(resp).await
404 }
405
406 /// Set (or update) a bookie's rack assignment.
407 ///
408 /// `POST /admin/v2/bookies/racks-info/{bookie}?group={group}` with
409 /// a JSON [`BookieInfo`] body carrying only `{rack, hostname}`.
410 /// `bookie` is the `host:port` registered in `BookKeeper` metadata.
411 /// The placement policy picks up the new rack on its next
412 /// reconciliation tick.
413 ///
414 /// `group` is **a query parameter**, not a body field — Pulsar's
415 /// `BookiesBase#updateBookieRackInfo(@PathParam("bookie") String,
416 /// @QueryParam("group") String, BookieInfo)` Jackson-binds the body
417 /// to `{rack, hostname}` only; an unknown `group` body field is
418 /// silently ignored and the query param defaults to `null`,
419 /// dropping the operator's group choice on the wire.
420 /// Java: `BookiesBase#updateBookieRackInfo`.
421 pub async fn bookies_set_rack(
422 &self,
423 bookie: &str,
424 group: &str,
425 info: BookieInfo,
426 ) -> Result<(), AdminError> {
427 validate_segment(bookie)?;
428 let mut url = self.url(&["bookies", "racks-info", bookie])?;
429 url.query_pairs_mut().append_pair("group", group);
430 let resp = self
431 .send(self.http.request(Method::POST, url).json(&info))
432 .await?;
433 empty_ok(resp).await
434 }
435
436 /// Remove a bookie's rack assignment.
437 ///
438 /// `DELETE /admin/v2/bookies/racks-info/{bookie}`. The bookie falls
439 /// back to the placement policy's default group / rack until
440 /// [`Self::bookies_set_rack`] is called again.
441 /// Java: `BookiesBase#deleteBookieRackInfo`.
442 pub async fn bookies_delete_rack(&self, bookie: &str) -> Result<(), AdminError> {
443 validate_segment(bookie)?;
444 let url = self.url(&["bookies", "racks-info", bookie])?;
445 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
446 empty_ok(resp).await
447 }
448
449 // --- Schemas ---------------------------------------------------------
450
451 /// Get the latest schema attached to a topic.
452 ///
453 /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema`. Returns
454 /// `{ version, type, schema, properties, timestamp }`; raw JSON
455 /// because the `type` axis (`AVRO` / `JSON` / `PROTOBUF` /
456 /// `PROTOBUF_NATIVE` / `KEY_VALUE` / `STRING` / `BYTES` / …) is
457 /// open-ended and broker minor versions add keys (deletion
458 /// tombstones surface as `type: "DELETE"` on the GET, for
459 /// instance). Java: `SchemasResourceBase#getSchema`.
460 pub async fn schema_get_latest(&self, topic: &str) -> Result<serde_json::Value, AdminError> {
461 let (tenant, namespace, name) = split_topic(topic)?;
462 let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
463 let resp = self.send(self.http.request(Method::GET, url)).await?;
464 json_ok(resp).await
465 }
466
467 /// Get a specific schema version attached to a topic.
468 ///
469 /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema/{version}`.
470 /// `version` is the monotonically-increasing integer the broker
471 /// assigns at registration. Same wire shape as
472 /// [`Self::schema_get_latest`].
473 /// Java: `SchemasResourceBase#getSchema` (with version path param).
474 pub async fn schema_get_version(
475 &self,
476 topic: &str,
477 version: i64,
478 ) -> Result<serde_json::Value, AdminError> {
479 let (tenant, namespace, name) = split_topic(topic)?;
480 let v = version.to_string();
481 let url = self.url(&["schemas", tenant, namespace, name, "schema", &v])?;
482 let resp = self.send(self.http.request(Method::GET, url)).await?;
483 json_ok(resp).await
484 }
485
486 /// List every schema version registered for a topic.
487 ///
488 /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schemas`. Pulsar 4
489 /// wraps the per-version entries in a
490 /// `GetAllVersionsSchemaResponse { getSchemaResponses: [...] }`
491 /// envelope (verified against `apache/pulsar@v4.0.4`
492 /// `SchemasResourceBase#convertToAllVersionsSchemaResponse`). We
493 /// unwrap that envelope at the boundary so callers see the flat
494 /// `Vec<Value>` they expect; a bare-array shape (older or
495 /// alternative serialisations) is still accepted. Raw JSON
496 /// per-entry for forward-compat. Java:
497 /// `SchemasResourceBase#getAllSchemas`.
498 pub async fn schema_list_versions(
499 &self,
500 topic: &str,
501 ) -> Result<Vec<serde_json::Value>, AdminError> {
502 let (tenant, namespace, name) = split_topic(topic)?;
503 let url = self.url(&["schemas", tenant, namespace, name, "schemas"])?;
504 let resp = self.send(self.http.request(Method::GET, url)).await?;
505 let v: serde_json::Value = json_ok(resp).await?;
506 match v {
507 serde_json::Value::Array(items) => Ok(items),
508 serde_json::Value::Object(mut envelope) => {
509 if let Some(serde_json::Value::Array(items)) = envelope.remove("getSchemaResponses")
510 {
511 return Ok(items);
512 }
513 Err(AdminError::Protocol(format!(
514 "schemas/.../schemas envelope missing `getSchemaResponses` array: {}",
515 serde_json::Value::Object(envelope)
516 )))
517 }
518 other => Err(AdminError::Protocol(format!(
519 "schemas/.../schemas returned unexpected shape: {other}"
520 ))),
521 }
522 }
523
524 /// Register a new schema version on a topic.
525 ///
526 /// `POST /admin/v2/schemas/{tenant}/{ns}/{topic}/schema` with a JSON
527 /// [`PostSchemaPayload`] body. The broker returns `{ version: N }`;
528 /// raw JSON because the upstream response envelope wraps the
529 /// version under `data` on some 4.x point releases. Compatibility
530 /// is enforced server-side per the namespace's
531 /// `schemaCompatibilityStrategy` — incompatible posts fail with
532 /// 409. Java: `SchemasResourceBase#postSchema`.
533 pub async fn schema_post(
534 &self,
535 topic: &str,
536 payload: PostSchemaPayload,
537 ) -> Result<serde_json::Value, AdminError> {
538 let (tenant, namespace, name) = split_topic(topic)?;
539 let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
540 let resp = self
541 .send(self.http.request(Method::POST, url).json(&payload))
542 .await?;
543 json_ok(resp).await
544 }
545
546 /// Delete a topic's schema.
547 ///
548 /// `DELETE /admin/v2/schemas/{tenant}/{ns}/{topic}/schema?force={force}`.
549 /// `force = true` skips the broker's "is the schema in use"
550 /// guard — equivalent to `pulsar-admin schemas delete --force`.
551 /// Java: `SchemasResourceBase#deleteSchema`.
552 pub async fn schema_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
553 let (tenant, namespace, name) = split_topic(topic)?;
554 let mut url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
555 url.query_pairs_mut()
556 .append_pair("force", if force { "true" } else { "false" });
557 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
558 empty_ok(resp).await
559 }
560
561 /// Check whether a candidate schema would be compatible with the
562 /// topic's current schema.
563 ///
564 /// `POST /admin/v2/schemas/{tenant}/{ns}/{topic}/compatibility` with
565 /// a JSON [`PostSchemaPayload`] body — the same shape
566 /// [`Self::schema_post`] sends, but the broker only evaluates
567 /// compatibility and never persists. Returns `{ isCompatible:
568 /// bool, schemaCompatibilityStrategy: "..." }`; raw JSON for
569 /// forward-compat.
570 /// Java: `SchemasResourceBase#testCompatibility`.
571 pub async fn schema_compatibility_check(
572 &self,
573 topic: &str,
574 payload: PostSchemaPayload,
575 ) -> Result<serde_json::Value, AdminError> {
576 let (tenant, namespace, name) = split_topic(topic)?;
577 let url = self.url(&["schemas", tenant, namespace, name, "compatibility"])?;
578 let resp = self
579 .send(self.http.request(Method::POST, url).json(&payload))
580 .await?;
581 json_ok(resp).await
582 }
583
584 // --- Pulsar Functions (read) ----------------------------------------
585
586 /// List every function registered under a namespace.
587 ///
588 /// `GET /admin/v3/functions/{tenant}/{namespace}`. Returns a JSON
589 /// array of bare function names (no tenant / namespace prefix) —
590 /// matches Java's `FunctionsBase#listFunctions` body shape.
591 pub async fn functions_list_by_namespace(
592 &self,
593 tenant: &str,
594 namespace: &str,
595 ) -> Result<Vec<String>, AdminError> {
596 validate_segment(tenant)?;
597 validate_segment(namespace)?;
598 let url = self.url_v3(&["functions", tenant, namespace])?;
599 let resp = self.send(self.http.request(Method::GET, url)).await?;
600 json_ok(resp).await
601 }
602
603 /// Get a function's registered `FunctionConfig`.
604 ///
605 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}`. The
606 /// `FunctionConfig` Java type has ~30 fields and grows on every
607 /// minor release — return raw JSON for forward-compat. Java:
608 /// `FunctionsBase#getFunctionInfo`.
609 pub async fn function_get(
610 &self,
611 tenant: &str,
612 namespace: &str,
613 name: &str,
614 ) -> Result<serde_json::Value, AdminError> {
615 validate_segment(tenant)?;
616 validate_segment(namespace)?;
617 validate_segment(name)?;
618 let url = self.url_v3(&["functions", tenant, namespace, name])?;
619 let resp = self.send(self.http.request(Method::GET, url)).await?;
620 json_ok(resp).await
621 }
622
623 /// Get a function's aggregate status (all instances).
624 ///
625 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/status`.
626 /// Returns Java's `FunctionStatus` envelope:
627 /// `{numInstances, numRunning, instances: [...]}`. Raw JSON because
628 /// the per-instance shape carries broker-version-dependent fields.
629 /// Java: `FunctionsBase#getFunctionStatus`.
630 pub async fn function_status(
631 &self,
632 tenant: &str,
633 namespace: &str,
634 name: &str,
635 ) -> Result<serde_json::Value, AdminError> {
636 validate_segment(tenant)?;
637 validate_segment(namespace)?;
638 validate_segment(name)?;
639 let url = self.url_v3(&["functions", tenant, namespace, name, "status"])?;
640 let resp = self.send(self.http.request(Method::GET, url)).await?;
641 json_ok(resp).await
642 }
643
644 /// Get a function's aggregate runtime statistics (all instances).
645 ///
646 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/stats`.
647 /// Returns Java's `FunctionStats` envelope — message rates,
648 /// processed counts, average latency, per-instance breakdown. Raw
649 /// JSON for forward-compat. Java: `FunctionsBase#getFunctionStats`.
650 pub async fn function_stats(
651 &self,
652 tenant: &str,
653 namespace: &str,
654 name: &str,
655 ) -> Result<serde_json::Value, AdminError> {
656 validate_segment(tenant)?;
657 validate_segment(namespace)?;
658 validate_segment(name)?;
659 let url = self.url_v3(&["functions", tenant, namespace, name, "stats"])?;
660 let resp = self.send(self.http.request(Method::GET, url)).await?;
661 json_ok(resp).await
662 }
663
664 /// Get one instance's status.
665 ///
666 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/status`.
667 /// `instance_id` is the integer index the broker assigns at
668 /// schedule time (`0..parallelism`). Java:
669 /// `FunctionsBase#getFunctionInstanceStatus`.
670 pub async fn function_instance_status(
671 &self,
672 tenant: &str,
673 namespace: &str,
674 name: &str,
675 instance_id: i32,
676 ) -> Result<serde_json::Value, AdminError> {
677 validate_segment(tenant)?;
678 validate_segment(namespace)?;
679 validate_segment(name)?;
680 let id = instance_id.to_string();
681 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "status"])?;
682 let resp = self.send(self.http.request(Method::GET, url)).await?;
683 json_ok(resp).await
684 }
685
686 /// Get one instance's runtime statistics.
687 ///
688 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stats`.
689 /// Java: `FunctionsBase#getFunctionInstanceStats`.
690 pub async fn function_instance_stats(
691 &self,
692 tenant: &str,
693 namespace: &str,
694 name: &str,
695 instance_id: i32,
696 ) -> Result<serde_json::Value, AdminError> {
697 validate_segment(tenant)?;
698 validate_segment(namespace)?;
699 validate_segment(name)?;
700 let id = instance_id.to_string();
701 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stats"])?;
702 let resp = self.send(self.http.request(Method::GET, url)).await?;
703 json_ok(resp).await
704 }
705
706 // --- Pulsar Functions (URL-based register / update) ----------------
707
708 /// Register a function from a remote package URL.
709 ///
710 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}` with a
711 /// `multipart/form-data` body carrying two parts: `url=<pkg-url>`
712 /// (the broker-resolvable HTTP / `function://` / `file://` URL of
713 /// the compiled package) and `functionConfig=<json>` (the
714 /// serialised [`FunctionConfig`]). The local-file upload path
715 /// (Java's `FunctionsBase#registerFunction` with a
716 /// `FormDataMultiPart` `data` part) is intentionally out of scope
717 /// for this method — operators with a pre-built JAR served over
718 /// HTTP / S3 / GCS use the URL path.
719 ///
720 /// The two-part envelope matches Java's
721 /// `FunctionsBase#registerFunction(@PathParam tenant, ...,
722 /// @FormDataParam("url") String functionPkgUrl,
723 /// @FormDataParam("functionConfig") FunctionConfig functionConfig)`
724 /// — when `data` is null and `url` is non-null the broker takes the
725 /// URL fast-path and skips the upload step.
726 pub async fn function_create_with_url(
727 &self,
728 tenant: &str,
729 namespace: &str,
730 name: &str,
731 url: &str,
732 config: FunctionConfig,
733 ) -> Result<(), AdminError> {
734 validate_segment(tenant)?;
735 validate_segment(namespace)?;
736 validate_segment(name)?;
737 let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
738 let form = function_pkg_form(url, &config)?;
739 let resp = self
740 .send(
741 self.http
742 .request(Method::POST, endpoint)
743 .multipart(form)
744 .timeout(PACKAGE_REGISTER_TIMEOUT),
745 )
746 .await?;
747 empty_ok(resp).await
748 }
749
750 /// Update an existing function from a remote package URL.
751 ///
752 /// `PUT /admin/v3/functions/{tenant}/{namespace}/{name}` with the
753 /// same two-part `multipart/form-data` shape as
754 /// [`Self::function_create_with_url`]. Java:
755 /// `FunctionsBase#updateFunction` with non-null `pkgUrl`.
756 pub async fn function_update_with_url(
757 &self,
758 tenant: &str,
759 namespace: &str,
760 name: &str,
761 url: &str,
762 config: FunctionConfig,
763 ) -> Result<(), AdminError> {
764 validate_segment(tenant)?;
765 validate_segment(namespace)?;
766 validate_segment(name)?;
767 let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
768 let form = function_pkg_form(url, &config)?;
769 let resp = self
770 .send(
771 self.http
772 .request(Method::PUT, endpoint)
773 .multipart(form)
774 .timeout(PACKAGE_REGISTER_TIMEOUT),
775 )
776 .await?;
777 empty_ok(resp).await
778 }
779
780 // --- Pulsar Functions (lifecycle) -----------------------------------
781
782 /// Deregister (delete) a function.
783 ///
784 /// `DELETE /admin/v3/functions/{tenant}/{namespace}/{name}`. The
785 /// broker stops every running instance and drops the
786 /// `FunctionConfig` from metadata. Java:
787 /// `FunctionsBase#deregisterFunction`.
788 pub async fn function_delete(
789 &self,
790 tenant: &str,
791 namespace: &str,
792 name: &str,
793 ) -> Result<(), AdminError> {
794 validate_segment(tenant)?;
795 validate_segment(namespace)?;
796 validate_segment(name)?;
797 let url = self.url_v3(&["functions", tenant, namespace, name])?;
798 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
799 empty_ok(resp).await
800 }
801
802 /// Start every instance of a function (idempotent).
803 ///
804 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/start`. No
805 /// body. Java: `FunctionsBase#startFunction`.
806 pub async fn function_start(
807 &self,
808 tenant: &str,
809 namespace: &str,
810 name: &str,
811 ) -> Result<(), AdminError> {
812 validate_segment(tenant)?;
813 validate_segment(namespace)?;
814 validate_segment(name)?;
815 let url = self.url_v3(&["functions", tenant, namespace, name, "start"])?;
816 let resp = self.send(self.http.request(Method::POST, url)).await?;
817 empty_ok(resp).await
818 }
819
820 /// Stop every instance of a function.
821 ///
822 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/stop`. The
823 /// broker leaves the `FunctionConfig` in metadata; a subsequent
824 /// `function_start` brings it back. Java:
825 /// `FunctionsBase#stopFunction`.
826 pub async fn function_stop(
827 &self,
828 tenant: &str,
829 namespace: &str,
830 name: &str,
831 ) -> Result<(), AdminError> {
832 validate_segment(tenant)?;
833 validate_segment(namespace)?;
834 validate_segment(name)?;
835 let url = self.url_v3(&["functions", tenant, namespace, name, "stop"])?;
836 let resp = self.send(self.http.request(Method::POST, url)).await?;
837 empty_ok(resp).await
838 }
839
840 /// Restart every instance of a function.
841 ///
842 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/restart`.
843 /// Java: `FunctionsBase#restartFunction`.
844 pub async fn function_restart(
845 &self,
846 tenant: &str,
847 namespace: &str,
848 name: &str,
849 ) -> Result<(), AdminError> {
850 validate_segment(tenant)?;
851 validate_segment(namespace)?;
852 validate_segment(name)?;
853 let url = self.url_v3(&["functions", tenant, namespace, name, "restart"])?;
854 let resp = self.send(self.http.request(Method::POST, url)).await?;
855 empty_ok(resp).await
856 }
857
858 /// Start one specific instance.
859 ///
860 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/start`.
861 /// Java: `FunctionsBase#startFunctionInstance`.
862 pub async fn function_start_instance(
863 &self,
864 tenant: &str,
865 namespace: &str,
866 name: &str,
867 instance_id: i32,
868 ) -> Result<(), AdminError> {
869 validate_segment(tenant)?;
870 validate_segment(namespace)?;
871 validate_segment(name)?;
872 let id = instance_id.to_string();
873 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "start"])?;
874 let resp = self.send(self.http.request(Method::POST, url)).await?;
875 empty_ok(resp).await
876 }
877
878 /// Stop one specific instance.
879 ///
880 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stop`.
881 /// Java: `FunctionsBase#stopFunctionInstance`.
882 pub async fn function_stop_instance(
883 &self,
884 tenant: &str,
885 namespace: &str,
886 name: &str,
887 instance_id: i32,
888 ) -> Result<(), AdminError> {
889 validate_segment(tenant)?;
890 validate_segment(namespace)?;
891 validate_segment(name)?;
892 let id = instance_id.to_string();
893 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stop"])?;
894 let resp = self.send(self.http.request(Method::POST, url)).await?;
895 empty_ok(resp).await
896 }
897
898 // --- Tenants ---------------------------------------------------------
899
900 /// List tenants.
901 ///
902 /// `GET /admin/v2/tenants`.
903 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Tenants.java`
904 /// (`@Path("/tenants")`) + `admin/impl/TenantsBase.java#getTenants`.
905 pub async fn tenants_list(&self) -> Result<Vec<String>, AdminError> {
906 let url = self.url(&["tenants"])?;
907 let resp = self.send(self.http.request(Method::GET, url)).await?;
908 json_ok(resp).await
909 }
910
911 /// Create a tenant.
912 ///
913 /// `PUT /admin/v2/tenants/{tenant}` with a JSON [`TenantInfo`] body.
914 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#
915 /// createTenant`.
916 pub async fn tenant_create(&self, name: &str, info: TenantInfo) -> Result<(), AdminError> {
917 let url = self.url(&["tenants", name])?;
918 let resp = self
919 .send(self.http.request(Method::PUT, url).json(&info))
920 .await?;
921 empty_ok(resp).await
922 }
923
924 /// Delete a tenant.
925 ///
926 /// `DELETE /admin/v2/tenants/{tenant}`.
927 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#
928 /// deleteTenant`.
929 pub async fn tenant_delete(&self, name: &str) -> Result<(), AdminError> {
930 let url = self.url(&["tenants", name])?;
931 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
932 empty_ok(resp).await
933 }
934
935 // --- Namespaces ------------------------------------------------------
936
937 /// List namespaces under a tenant.
938 ///
939 /// `GET /admin/v2/namespaces/{tenant}`.
940 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
941 /// (`@Path("/namespaces")` + `@Path("/{tenant}")`).
942 pub async fn namespaces_list(&self, tenant: &str) -> Result<Vec<String>, AdminError> {
943 let url = self.url(&["namespaces", tenant])?;
944 let resp = self.send(self.http.request(Method::GET, url)).await?;
945 json_ok(resp).await
946 }
947
948 /// Create a namespace.
949 ///
950 /// `PUT /admin/v2/namespaces/{tenant}/{namespace}`. The namespace argument
951 /// is `tenant/namespace`, matching how Pulsar expresses fully qualified
952 /// namespace names on the wire and CLI.
953 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
954 /// (`@PUT @Path("/{tenant}/{namespace}")`).
955 pub async fn namespace_create(&self, ns: &str) -> Result<(), AdminError> {
956 let (tenant, namespace) = split_namespace(ns)?;
957 let url = self.url(&["namespaces", tenant, namespace])?;
958 let resp = self.send(self.http.request(Method::PUT, url)).await?;
959 empty_ok(resp).await
960 }
961
962 /// Delete a namespace.
963 ///
964 /// `DELETE /admin/v2/namespaces/{tenant}/{namespace}`.
965 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
966 /// (`@DELETE @Path("/{tenant}/{namespace}")`).
967 pub async fn namespace_delete(&self, ns: &str) -> Result<(), AdminError> {
968 let (tenant, namespace) = split_namespace(ns)?;
969 let url = self.url(&["namespaces", tenant, namespace])?;
970 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
971 empty_ok(resp).await
972 }
973
974 /// Get a namespace's retention policy.
975 ///
976 /// `GET /admin/v2/namespaces/{tenant}/{ns}/retention`.
977 /// Returns `RetentionPolicies { retentionTimeInMinutes, retentionSizeInMB }`.
978 /// A fresh namespace, or a namespace whose retention was just
979 /// removed, surfaces as 204 / empty body / `null` — we fold those
980 /// to `RetentionPolicies::default()` (broker semantic).
981 /// Java: `NamespacesBase#getRetention`.
982 pub async fn namespace_get_retention(&self, ns: &str) -> Result<RetentionPolicies, AdminError> {
983 let (tenant, namespace) = split_namespace(ns)?;
984 let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
985 let resp = self.send(self.http.request(Method::GET, url)).await?;
986 json_ok_or_default(resp).await
987 }
988
989 /// Set a namespace's retention policy.
990 ///
991 /// `POST /admin/v2/namespaces/{tenant}/{ns}/retention` with a JSON
992 /// `RetentionPolicies` body. `-1` means infinite (size or time).
993 /// Java: `NamespacesBase#setRetention`.
994 pub async fn namespace_set_retention(
995 &self,
996 ns: &str,
997 policy: RetentionPolicies,
998 ) -> Result<(), AdminError> {
999 let (tenant, namespace) = split_namespace(ns)?;
1000 let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
1001 let resp = self
1002 .send(self.http.request(Method::POST, url).json(&policy))
1003 .await?;
1004 empty_ok(resp).await
1005 }
1006
1007 /// Remove a namespace's retention policy (fall back to broker default).
1008 ///
1009 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/retention`.
1010 /// Java: `NamespacesBase#removeRetention`.
1011 pub async fn namespace_remove_retention(&self, ns: &str) -> Result<(), AdminError> {
1012 let (tenant, namespace) = split_namespace(ns)?;
1013 let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
1014 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1015 empty_ok(resp).await
1016 }
1017
1018 /// Get all backlog-quota policies on a namespace.
1019 ///
1020 /// `GET /admin/v2/namespaces/{tenant}/{ns}/backlogQuotaMap`. Returns
1021 /// `Map<BacklogQuotaType, BacklogQuota>` — kept as raw JSON because
1022 /// broker versions add quota types (`message_age` since 2.10).
1023 /// Java: `NamespacesBase#getBacklogQuotaMap`.
1024 pub async fn namespace_get_backlog_quotas(
1025 &self,
1026 ns: &str,
1027 ) -> Result<serde_json::Value, AdminError> {
1028 let (tenant, namespace) = split_namespace(ns)?;
1029 let url = self.url(&["namespaces", tenant, namespace, "backlogQuotaMap"])?;
1030 let resp = self.send(self.http.request(Method::GET, url)).await?;
1031 json_ok(resp).await
1032 }
1033
1034 /// Set a backlog-quota policy on a namespace.
1035 ///
1036 /// `POST /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}`
1037 /// with a JSON `BacklogQuota` body. `backlog_quota_type` selects which
1038 /// dimension to limit (`destination_storage` for byte size, `message_age`
1039 /// for wall-clock TTL).
1040 /// Java: `NamespacesBase#setBacklogQuota`.
1041 pub async fn namespace_set_backlog_quota(
1042 &self,
1043 ns: &str,
1044 backlog_quota_type: BacklogQuotaType,
1045 quota: BacklogQuota,
1046 ) -> Result<(), AdminError> {
1047 let (tenant, namespace) = split_namespace(ns)?;
1048 let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
1049 url.query_pairs_mut()
1050 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
1051 let resp = self
1052 .send(self.http.request(Method::POST, url).json("a))
1053 .await?;
1054 empty_ok(resp).await
1055 }
1056
1057 /// Remove a backlog-quota policy from a namespace.
1058 ///
1059 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}`.
1060 /// Java: `NamespacesBase#removeBacklogQuota`.
1061 pub async fn namespace_remove_backlog_quota(
1062 &self,
1063 ns: &str,
1064 backlog_quota_type: BacklogQuotaType,
1065 ) -> Result<(), AdminError> {
1066 let (tenant, namespace) = split_namespace(ns)?;
1067 let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
1068 url.query_pairs_mut()
1069 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
1070 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1071 empty_ok(resp).await
1072 }
1073
1074 /// Get a namespace's message-TTL (seconds).
1075 ///
1076 /// `GET /admin/v2/namespaces/{tenant}/{ns}/messageTTL`. Returns a
1077 /// bare integer (or `null` if no TTL is set — which decodes as
1078 /// `Option::None`).
1079 /// Java: `NamespacesBase#getNamespaceMessageTTL`.
1080 pub async fn namespace_get_message_ttl(&self, ns: &str) -> Result<Option<i32>, AdminError> {
1081 let (tenant, namespace) = split_namespace(ns)?;
1082 let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1083 let resp = self.send(self.http.request(Method::GET, url)).await?;
1084 json_ok_optional(resp).await
1085 }
1086
1087 /// Set a namespace's message-TTL (seconds).
1088 ///
1089 /// `POST /admin/v2/namespaces/{tenant}/{ns}/messageTTL` with a bare
1090 /// integer body. `0` disables (broker treats as no TTL).
1091 /// Java: `NamespacesBase#setNamespaceMessageTTL`.
1092 pub async fn namespace_set_message_ttl(
1093 &self,
1094 ns: &str,
1095 ttl_seconds: i32,
1096 ) -> Result<(), AdminError> {
1097 let (tenant, namespace) = split_namespace(ns)?;
1098 let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1099 let resp = self
1100 .send(self.http.request(Method::POST, url).json(&ttl_seconds))
1101 .await?;
1102 empty_ok(resp).await
1103 }
1104
1105 /// Remove a namespace's message-TTL (fall back to broker default).
1106 ///
1107 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/messageTTL`.
1108 /// Java: `NamespacesBase#removeNamespaceMessageTTL`.
1109 pub async fn namespace_remove_message_ttl(&self, ns: &str) -> Result<(), AdminError> {
1110 let (tenant, namespace) = split_namespace(ns)?;
1111 let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1112 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1113 empty_ok(resp).await
1114 }
1115
1116 // --- Namespace policies — persistence + rates ----------------------
1117
1118 /// Get a namespace's persistence policy.
1119 ///
1120 /// `GET /admin/v2/namespaces/{tenant}/{ns}/persistence`. Returns the
1121 /// `BookKeeper` ensemble / write-quorum / ack-quorum triple plus the
1122 /// managed-ledger mark-delete rate cap. `null` body decodes to
1123 /// `PersistencePolicies::default()` via `#[serde(default)]`.
1124 /// Java: `NamespacesBase#getPersistence`.
1125 pub async fn namespace_get_persistence(
1126 &self,
1127 ns: &str,
1128 ) -> Result<PersistencePolicies, AdminError> {
1129 let (tenant, namespace) = split_namespace(ns)?;
1130 let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1131 let resp = self.send(self.http.request(Method::GET, url)).await?;
1132 json_ok_or_default(resp).await
1133 }
1134
1135 /// Set a namespace's persistence policy.
1136 ///
1137 /// `POST /admin/v2/namespaces/{tenant}/{ns}/persistence` with a JSON
1138 /// `PersistencePolicies` body.
1139 /// Java: `NamespacesBase#setPersistence`.
1140 pub async fn namespace_set_persistence(
1141 &self,
1142 ns: &str,
1143 policy: PersistencePolicies,
1144 ) -> Result<(), AdminError> {
1145 let (tenant, namespace) = split_namespace(ns)?;
1146 let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1147 let resp = self
1148 .send(self.http.request(Method::POST, url).json(&policy))
1149 .await?;
1150 empty_ok(resp).await
1151 }
1152
1153 /// Remove a namespace's persistence policy (fall back to broker default).
1154 ///
1155 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/persistence`.
1156 /// Java: `NamespacesBase#deletePersistence`.
1157 pub async fn namespace_remove_persistence(&self, ns: &str) -> Result<(), AdminError> {
1158 let (tenant, namespace) = split_namespace(ns)?;
1159 let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1160 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1161 empty_ok(resp).await
1162 }
1163
1164 /// Get a namespace's consumer dispatch-rate policy.
1165 ///
1166 /// `GET /admin/v2/namespaces/{tenant}/{ns}/dispatchRate`. Returns
1167 /// the per-namespace consumer-dispatch throttle (msg/sec, byte/sec,
1168 /// window in seconds). `-1` on either dimension means unlimited.
1169 /// Java: `NamespacesBase#getDispatchRate`.
1170 pub async fn namespace_get_dispatch_rate(&self, ns: &str) -> Result<DispatchRate, AdminError> {
1171 let (tenant, namespace) = split_namespace(ns)?;
1172 let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1173 let resp = self.send(self.http.request(Method::GET, url)).await?;
1174 json_ok_or_default(resp).await
1175 }
1176
1177 /// Set a namespace's consumer dispatch-rate policy.
1178 ///
1179 /// `POST /admin/v2/namespaces/{tenant}/{ns}/dispatchRate` with a
1180 /// JSON `DispatchRate` body.
1181 /// Java: `NamespacesBase#setDispatchRate`.
1182 pub async fn namespace_set_dispatch_rate(
1183 &self,
1184 ns: &str,
1185 rate: DispatchRate,
1186 ) -> Result<(), AdminError> {
1187 let (tenant, namespace) = split_namespace(ns)?;
1188 let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1189 let resp = self
1190 .send(self.http.request(Method::POST, url).json(&rate))
1191 .await?;
1192 empty_ok(resp).await
1193 }
1194
1195 /// Remove a namespace's consumer dispatch-rate policy.
1196 ///
1197 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/dispatchRate`.
1198 /// Java: `NamespacesBase#deleteDispatchRate`.
1199 pub async fn namespace_remove_dispatch_rate(&self, ns: &str) -> Result<(), AdminError> {
1200 let (tenant, namespace) = split_namespace(ns)?;
1201 let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1202 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1203 empty_ok(resp).await
1204 }
1205
1206 /// Get a namespace's per-subscription dispatch-rate policy.
1207 ///
1208 /// `GET /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`.
1209 /// Reuses the [`DispatchRate`] body shape — the policy applies per
1210 /// subscription rather than aggregated across all consumers.
1211 /// Java: `NamespacesBase#getSubscriptionDispatchRate`.
1212 pub async fn namespace_get_subscription_dispatch_rate(
1213 &self,
1214 ns: &str,
1215 ) -> Result<DispatchRate, AdminError> {
1216 let (tenant, namespace) = split_namespace(ns)?;
1217 let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1218 let resp = self.send(self.http.request(Method::GET, url)).await?;
1219 json_ok(resp).await
1220 }
1221
1222 /// Set a namespace's per-subscription dispatch-rate policy.
1223 ///
1224 /// `POST /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`
1225 /// with a JSON `DispatchRate` body.
1226 /// Java: `NamespacesBase#setSubscriptionDispatchRate`.
1227 pub async fn namespace_set_subscription_dispatch_rate(
1228 &self,
1229 ns: &str,
1230 rate: DispatchRate,
1231 ) -> Result<(), AdminError> {
1232 let (tenant, namespace) = split_namespace(ns)?;
1233 let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1234 let resp = self
1235 .send(self.http.request(Method::POST, url).json(&rate))
1236 .await?;
1237 empty_ok(resp).await
1238 }
1239
1240 /// Remove a namespace's per-subscription dispatch-rate policy.
1241 ///
1242 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`.
1243 /// Java: `NamespacesBase#deleteSubscriptionDispatchRate`.
1244 pub async fn namespace_remove_subscription_dispatch_rate(
1245 &self,
1246 ns: &str,
1247 ) -> Result<(), AdminError> {
1248 let (tenant, namespace) = split_namespace(ns)?;
1249 let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1250 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1251 empty_ok(resp).await
1252 }
1253
1254 /// Get a namespace's cross-cluster replicator dispatch-rate policy.
1255 ///
1256 /// `GET /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`.
1257 /// Reuses the [`DispatchRate`] body shape — the policy throttles
1258 /// outbound geo-replication traffic from this cluster.
1259 /// Java: `NamespacesBase#getReplicatorDispatchRate`.
1260 pub async fn namespace_get_replicator_dispatch_rate(
1261 &self,
1262 ns: &str,
1263 ) -> Result<DispatchRate, AdminError> {
1264 let (tenant, namespace) = split_namespace(ns)?;
1265 let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1266 let resp = self.send(self.http.request(Method::GET, url)).await?;
1267 json_ok(resp).await
1268 }
1269
1270 /// Set a namespace's cross-cluster replicator dispatch-rate policy.
1271 ///
1272 /// `POST /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`
1273 /// with a JSON `DispatchRate` body.
1274 /// Java: `NamespacesBase#setReplicatorDispatchRate`.
1275 pub async fn namespace_set_replicator_dispatch_rate(
1276 &self,
1277 ns: &str,
1278 rate: DispatchRate,
1279 ) -> Result<(), AdminError> {
1280 let (tenant, namespace) = split_namespace(ns)?;
1281 let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1282 let resp = self
1283 .send(self.http.request(Method::POST, url).json(&rate))
1284 .await?;
1285 empty_ok(resp).await
1286 }
1287
1288 /// Remove a namespace's cross-cluster replicator dispatch-rate policy.
1289 ///
1290 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`.
1291 /// Java: `NamespacesBase#removeReplicatorDispatchRate`.
1292 pub async fn namespace_remove_replicator_dispatch_rate(
1293 &self,
1294 ns: &str,
1295 ) -> Result<(), AdminError> {
1296 let (tenant, namespace) = split_namespace(ns)?;
1297 let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1298 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1299 empty_ok(resp).await
1300 }
1301
1302 /// Get a namespace's publish-rate policy.
1303 ///
1304 /// `GET /admin/v2/namespaces/{tenant}/{ns}/publishRate`. Returns
1305 /// the producer-side throttle (msg/sec + byte/sec). `-1` on either
1306 /// dimension means unlimited.
1307 /// Java: `NamespacesBase#getPublishRate`.
1308 pub async fn namespace_get_publish_rate(&self, ns: &str) -> Result<PublishRate, AdminError> {
1309 let (tenant, namespace) = split_namespace(ns)?;
1310 let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1311 let resp = self.send(self.http.request(Method::GET, url)).await?;
1312 json_ok_or_default(resp).await
1313 }
1314
1315 /// Set a namespace's publish-rate policy.
1316 ///
1317 /// `POST /admin/v2/namespaces/{tenant}/{ns}/publishRate` with a JSON
1318 /// `PublishRate` body.
1319 /// Java: `NamespacesBase#setPublishRate`.
1320 pub async fn namespace_set_publish_rate(
1321 &self,
1322 ns: &str,
1323 rate: PublishRate,
1324 ) -> Result<(), AdminError> {
1325 let (tenant, namespace) = split_namespace(ns)?;
1326 let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1327 let resp = self
1328 .send(self.http.request(Method::POST, url).json(&rate))
1329 .await?;
1330 empty_ok(resp).await
1331 }
1332
1333 /// Remove a namespace's publish-rate policy.
1334 ///
1335 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/publishRate`.
1336 /// Java: `NamespacesBase#removePublishRate`.
1337 pub async fn namespace_remove_publish_rate(&self, ns: &str) -> Result<(), AdminError> {
1338 let (tenant, namespace) = split_namespace(ns)?;
1339 let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1340 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1341 empty_ok(resp).await
1342 }
1343
1344 // --- Namespace policies — limits + dedup + delayed delivery -----
1345
1346 /// Get a namespace's broker-side message deduplication flag.
1347 ///
1348 /// `GET /admin/v2/namespaces/{tenant}/{ns}/deduplication`. Returns a
1349 /// bare JSON boolean, or `null` (decoded as `None`) when the policy
1350 /// is unset and the broker default applies.
1351 /// Java: `NamespacesBase#getDeduplication`.
1352 pub async fn namespace_get_deduplication(&self, ns: &str) -> Result<Option<bool>, AdminError> {
1353 let (tenant, namespace) = split_namespace(ns)?;
1354 let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1355 let resp = self.send(self.http.request(Method::GET, url)).await?;
1356 json_ok_optional(resp).await
1357 }
1358
1359 /// Set a namespace's broker-side message deduplication flag.
1360 ///
1361 /// `POST /admin/v2/namespaces/{tenant}/{ns}/deduplication` with a
1362 /// bare JSON boolean body.
1363 /// Java: `NamespacesBase#modifyDeduplication`.
1364 pub async fn namespace_set_deduplication(
1365 &self,
1366 ns: &str,
1367 enabled: bool,
1368 ) -> Result<(), AdminError> {
1369 let (tenant, namespace) = split_namespace(ns)?;
1370 let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1371 let resp = self
1372 .send(self.http.request(Method::POST, url).json(&enabled))
1373 .await?;
1374 empty_ok(resp).await
1375 }
1376
1377 /// Remove a namespace's deduplication flag (fall back to broker default).
1378 ///
1379 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplication`.
1380 /// Java: `NamespacesBase#removeDeduplication`.
1381 pub async fn namespace_remove_deduplication(&self, ns: &str) -> Result<(), AdminError> {
1382 let (tenant, namespace) = split_namespace(ns)?;
1383 let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1384 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1385 empty_ok(resp).await
1386 }
1387
1388 /// Get a namespace's deduplication-snapshot interval (entries).
1389 ///
1390 /// `GET /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`.
1391 /// Returns a bare integer (the entry count between dedup cursor
1392 /// snapshots), or `null` (decoded as `None`) when the broker default
1393 /// applies.
1394 /// Java: `NamespacesBase#getDeduplicationSnapshotInterval`.
1395 pub async fn namespace_get_deduplication_snapshot_interval(
1396 &self,
1397 ns: &str,
1398 ) -> Result<Option<i32>, AdminError> {
1399 let (tenant, namespace) = split_namespace(ns)?;
1400 let url = self.url(&[
1401 "namespaces",
1402 tenant,
1403 namespace,
1404 "deduplicationSnapshotInterval",
1405 ])?;
1406 let resp = self.send(self.http.request(Method::GET, url)).await?;
1407 json_ok_optional(resp).await
1408 }
1409
1410 /// Set a namespace's deduplication-snapshot interval (entries).
1411 ///
1412 /// `POST /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`
1413 /// with a bare JSON integer body.
1414 /// Java: `NamespacesBase#setDeduplicationSnapshotInterval`.
1415 pub async fn namespace_set_deduplication_snapshot_interval(
1416 &self,
1417 ns: &str,
1418 interval_entries: i32,
1419 ) -> Result<(), AdminError> {
1420 let (tenant, namespace) = split_namespace(ns)?;
1421 let url = self.url(&[
1422 "namespaces",
1423 tenant,
1424 namespace,
1425 "deduplicationSnapshotInterval",
1426 ])?;
1427 let resp = self
1428 .send(self.http.request(Method::POST, url).json(&interval_entries))
1429 .await?;
1430 empty_ok(resp).await
1431 }
1432
1433 /// Remove a namespace's deduplication-snapshot interval override.
1434 ///
1435 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`.
1436 /// Java: `NamespacesBase#deleteDeduplicationSnapshotInterval`.
1437 pub async fn namespace_remove_deduplication_snapshot_interval(
1438 &self,
1439 ns: &str,
1440 ) -> Result<(), AdminError> {
1441 let (tenant, namespace) = split_namespace(ns)?;
1442 let url = self.url(&[
1443 "namespaces",
1444 tenant,
1445 namespace,
1446 "deduplicationSnapshotInterval",
1447 ])?;
1448 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1449 empty_ok(resp).await
1450 }
1451
1452 /// Get a namespace's compaction threshold (bytes).
1453 ///
1454 /// `GET /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold`. Returns
1455 /// a bare integer (bytes of accumulated topic backlog above which the
1456 /// broker triggers automatic compaction), or `null` (decoded as `None`)
1457 /// when the broker default applies.
1458 /// Java: `NamespacesBase#getCompactionThreshold`.
1459 pub async fn namespace_get_compaction_threshold(
1460 &self,
1461 ns: &str,
1462 ) -> Result<Option<i64>, AdminError> {
1463 let (tenant, namespace) = split_namespace(ns)?;
1464 let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1465 let resp = self.send(self.http.request(Method::GET, url)).await?;
1466 json_ok_optional(resp).await
1467 }
1468
1469 /// Set a namespace's compaction threshold (bytes).
1470 ///
1471 /// `PUT /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold` with
1472 /// a bare JSON long body. `0` disables automatic compaction.
1473 ///
1474 /// Note: this endpoint is `@PUT` in Pulsar 4 (`Namespaces.java`
1475 /// declares `@PUT @Path("/{tenant}/{namespace}/compactionThreshold")`),
1476 /// not POST — using POST yields a `405 Method Not Allowed`.
1477 /// Java: `NamespacesBase#setCompactionThreshold`.
1478 pub async fn namespace_set_compaction_threshold(
1479 &self,
1480 ns: &str,
1481 threshold_bytes: i64,
1482 ) -> Result<(), AdminError> {
1483 let (tenant, namespace) = split_namespace(ns)?;
1484 let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1485 let resp = self
1486 .send(self.http.request(Method::PUT, url).json(&threshold_bytes))
1487 .await?;
1488 empty_ok(resp).await
1489 }
1490
1491 /// Remove a namespace's compaction threshold override.
1492 ///
1493 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold`.
1494 /// Java: `NamespacesBase#deleteCompactionThreshold`.
1495 pub async fn namespace_remove_compaction_threshold(&self, ns: &str) -> Result<(), AdminError> {
1496 let (tenant, namespace) = split_namespace(ns)?;
1497 let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1498 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1499 empty_ok(resp).await
1500 }
1501
1502 /// Get a namespace's delayed-delivery policy.
1503 ///
1504 /// `GET /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery`. Returns
1505 /// the active flag + tick time (the broker's index-tick granularity
1506 /// for delivering delayed messages). `null` decodes as `None`.
1507 /// Java: `NamespacesBase#getDelayedDeliveryPolicies`.
1508 pub async fn namespace_get_delayed_delivery(
1509 &self,
1510 ns: &str,
1511 ) -> Result<Option<DelayedDeliveryPolicies>, AdminError> {
1512 let (tenant, namespace) = split_namespace(ns)?;
1513 let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1514 let resp = self.send(self.http.request(Method::GET, url)).await?;
1515 json_ok_optional(resp).await
1516 }
1517
1518 /// Set a namespace's delayed-delivery policy.
1519 ///
1520 /// `POST /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery` with a
1521 /// JSON `DelayedDeliveryPolicies` body.
1522 /// Java: `NamespacesBase#setDelayedDeliveryPolicies`.
1523 pub async fn namespace_set_delayed_delivery(
1524 &self,
1525 ns: &str,
1526 policy: DelayedDeliveryPolicies,
1527 ) -> Result<(), AdminError> {
1528 let (tenant, namespace) = split_namespace(ns)?;
1529 let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1530 let resp = self
1531 .send(self.http.request(Method::POST, url).json(&policy))
1532 .await?;
1533 empty_ok(resp).await
1534 }
1535
1536 /// Remove a namespace's delayed-delivery policy override.
1537 ///
1538 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery`.
1539 /// Java: `NamespacesBase#removeDelayedDeliveryPolicies`.
1540 pub async fn namespace_remove_delayed_delivery(&self, ns: &str) -> Result<(), AdminError> {
1541 let (tenant, namespace) = split_namespace(ns)?;
1542 let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1543 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1544 empty_ok(resp).await
1545 }
1546
1547 /// Get a namespace's max-producers-per-topic limit.
1548 ///
1549 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic`. Returns
1550 /// a bare integer (the per-topic ceiling on concurrent producer
1551 /// connections), or `null` (decoded as `None`) when the broker default
1552 /// applies.
1553 /// Java: `NamespacesBase#getMaxProducersPerTopic`.
1554 pub async fn namespace_get_max_producers_per_topic(
1555 &self,
1556 ns: &str,
1557 ) -> Result<Option<i32>, AdminError> {
1558 let (tenant, namespace) = split_namespace(ns)?;
1559 let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1560 let resp = self.send(self.http.request(Method::GET, url)).await?;
1561 json_ok_optional(resp).await
1562 }
1563
1564 /// Set a namespace's max-producers-per-topic limit.
1565 ///
1566 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic` with
1567 /// a bare JSON integer body. `0` disables the limit.
1568 /// Java: `NamespacesBase#setMaxProducersPerTopic`.
1569 pub async fn namespace_set_max_producers_per_topic(
1570 &self,
1571 ns: &str,
1572 max_producers: i32,
1573 ) -> Result<(), AdminError> {
1574 let (tenant, namespace) = split_namespace(ns)?;
1575 let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1576 let resp = self
1577 .send(self.http.request(Method::POST, url).json(&max_producers))
1578 .await?;
1579 empty_ok(resp).await
1580 }
1581
1582 /// Remove a namespace's max-producers-per-topic limit override.
1583 ///
1584 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic`.
1585 /// Java: `NamespacesBase#removeMaxProducersPerTopic`.
1586 pub async fn namespace_remove_max_producers_per_topic(
1587 &self,
1588 ns: &str,
1589 ) -> Result<(), AdminError> {
1590 let (tenant, namespace) = split_namespace(ns)?;
1591 let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1592 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1593 empty_ok(resp).await
1594 }
1595
1596 /// Get a namespace's max-consumers-per-topic limit.
1597 ///
1598 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic`. Returns
1599 /// a bare integer (the per-topic ceiling on concurrent consumer
1600 /// connections across all subscriptions), or `null` (decoded as `None`)
1601 /// when the broker default applies.
1602 /// Java: `NamespacesBase#getMaxConsumersPerTopic`.
1603 pub async fn namespace_get_max_consumers_per_topic(
1604 &self,
1605 ns: &str,
1606 ) -> Result<Option<i32>, AdminError> {
1607 let (tenant, namespace) = split_namespace(ns)?;
1608 let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1609 let resp = self.send(self.http.request(Method::GET, url)).await?;
1610 json_ok_optional(resp).await
1611 }
1612
1613 /// Set a namespace's max-consumers-per-topic limit.
1614 ///
1615 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic` with
1616 /// a bare JSON integer body. `0` disables the limit.
1617 /// Java: `NamespacesBase#setMaxConsumersPerTopic`.
1618 pub async fn namespace_set_max_consumers_per_topic(
1619 &self,
1620 ns: &str,
1621 max_consumers: i32,
1622 ) -> Result<(), AdminError> {
1623 let (tenant, namespace) = split_namespace(ns)?;
1624 let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1625 let resp = self
1626 .send(self.http.request(Method::POST, url).json(&max_consumers))
1627 .await?;
1628 empty_ok(resp).await
1629 }
1630
1631 /// Remove a namespace's max-consumers-per-topic limit override.
1632 ///
1633 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic`.
1634 /// Java: `NamespacesBase#removeMaxConsumersPerTopic`.
1635 pub async fn namespace_remove_max_consumers_per_topic(
1636 &self,
1637 ns: &str,
1638 ) -> Result<(), AdminError> {
1639 let (tenant, namespace) = split_namespace(ns)?;
1640 let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1641 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1642 empty_ok(resp).await
1643 }
1644
1645 /// Get a namespace's max-unacked-messages-per-consumer limit.
1646 ///
1647 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`.
1648 /// Returns a bare integer (the broker's per-consumer permit-pool cap
1649 /// before it stops dispatching), or `null` (decoded as `None`) when
1650 /// the broker default applies.
1651 /// Java: `NamespacesBase#getMaxUnackedMessagesPerConsumer`.
1652 pub async fn namespace_get_max_unacked_messages_per_consumer(
1653 &self,
1654 ns: &str,
1655 ) -> Result<Option<i32>, AdminError> {
1656 let (tenant, namespace) = split_namespace(ns)?;
1657 let url = self.url(&[
1658 "namespaces",
1659 tenant,
1660 namespace,
1661 "maxUnackedMessagesPerConsumer",
1662 ])?;
1663 let resp = self.send(self.http.request(Method::GET, url)).await?;
1664 json_ok_optional(resp).await
1665 }
1666
1667 /// Set a namespace's max-unacked-messages-per-consumer limit.
1668 ///
1669 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`
1670 /// with a bare JSON integer body. `0` disables the limit.
1671 /// Java: `NamespacesBase#setMaxUnackedMessagesPerConsumer`.
1672 pub async fn namespace_set_max_unacked_messages_per_consumer(
1673 &self,
1674 ns: &str,
1675 max_unacked: i32,
1676 ) -> Result<(), AdminError> {
1677 let (tenant, namespace) = split_namespace(ns)?;
1678 let url = self.url(&[
1679 "namespaces",
1680 tenant,
1681 namespace,
1682 "maxUnackedMessagesPerConsumer",
1683 ])?;
1684 let resp = self
1685 .send(self.http.request(Method::POST, url).json(&max_unacked))
1686 .await?;
1687 empty_ok(resp).await
1688 }
1689
1690 /// Remove a namespace's max-unacked-messages-per-consumer override.
1691 ///
1692 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`.
1693 /// Java: `NamespacesBase#removeMaxUnackedMessagesPerConsumer`.
1694 pub async fn namespace_remove_max_unacked_messages_per_consumer(
1695 &self,
1696 ns: &str,
1697 ) -> Result<(), AdminError> {
1698 let (tenant, namespace) = split_namespace(ns)?;
1699 let url = self.url(&[
1700 "namespaces",
1701 tenant,
1702 namespace,
1703 "maxUnackedMessagesPerConsumer",
1704 ])?;
1705 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1706 empty_ok(resp).await
1707 }
1708
1709 /// Get a namespace's max-unacked-messages-per-subscription limit.
1710 ///
1711 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`.
1712 /// Returns a bare integer (the broker's per-subscription unacked
1713 /// ceiling — once exceeded the broker stops dispatching to every
1714 /// consumer on that subscription), or `null` (decoded as `None`)
1715 /// when the broker default applies.
1716 /// Java: `NamespacesBase#getMaxUnackedMessagesPerSubscription`.
1717 pub async fn namespace_get_max_unacked_messages_per_subscription(
1718 &self,
1719 ns: &str,
1720 ) -> Result<Option<i32>, AdminError> {
1721 let (tenant, namespace) = split_namespace(ns)?;
1722 let url = self.url(&[
1723 "namespaces",
1724 tenant,
1725 namespace,
1726 "maxUnackedMessagesPerSubscription",
1727 ])?;
1728 let resp = self.send(self.http.request(Method::GET, url)).await?;
1729 json_ok_optional(resp).await
1730 }
1731
1732 /// Set a namespace's max-unacked-messages-per-subscription limit.
1733 ///
1734 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`
1735 /// with a bare JSON integer body. `0` disables the limit.
1736 /// Java: `NamespacesBase#setMaxUnackedMessagesPerSubscription`.
1737 pub async fn namespace_set_max_unacked_messages_per_subscription(
1738 &self,
1739 ns: &str,
1740 max_unacked: i32,
1741 ) -> Result<(), AdminError> {
1742 let (tenant, namespace) = split_namespace(ns)?;
1743 let url = self.url(&[
1744 "namespaces",
1745 tenant,
1746 namespace,
1747 "maxUnackedMessagesPerSubscription",
1748 ])?;
1749 let resp = self
1750 .send(self.http.request(Method::POST, url).json(&max_unacked))
1751 .await?;
1752 empty_ok(resp).await
1753 }
1754
1755 /// Remove a namespace's max-unacked-messages-per-subscription override.
1756 ///
1757 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`.
1758 /// Java: `NamespacesBase#removeMaxUnackedMessagesPerSubscription`.
1759 pub async fn namespace_remove_max_unacked_messages_per_subscription(
1760 &self,
1761 ns: &str,
1762 ) -> Result<(), AdminError> {
1763 let (tenant, namespace) = split_namespace(ns)?;
1764 let url = self.url(&[
1765 "namespaces",
1766 tenant,
1767 namespace,
1768 "maxUnackedMessagesPerSubscription",
1769 ])?;
1770 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1771 empty_ok(resp).await
1772 }
1773
1774 // --- Topics ----------------------------------------------------------
1775
1776 /// List persistent topics in a namespace.
1777 ///
1778 /// `GET /admin/v2/persistent/{tenant}/{namespace}`.
1779 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
1780 /// (`@Path("/persistent")` + `@GET @Path("/{tenant}/{namespace}")`).
1781 pub async fn topics_list(&self, namespace: &str) -> Result<Vec<String>, AdminError> {
1782 let (tenant, namespace) = split_namespace(namespace)?;
1783 let url = self.url(&["persistent", tenant, namespace])?;
1784 let resp = self.send(self.http.request(Method::GET, url)).await?;
1785 json_ok(resp).await
1786 }
1787
1788 /// Create a non-partitioned persistent topic.
1789 ///
1790 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}`.
1791 /// Java: `PersistentTopics.java#createNonPartitionedTopic`
1792 /// (`@PUT @Path("/{tenant}/{namespace}/{topic}")`).
1793 pub async fn topic_create_non_partitioned(&self, topic: &str) -> Result<(), AdminError> {
1794 self.topic_create_non_partitioned_with_properties(topic, &HashMap::new())
1795 .await
1796 }
1797
1798 async fn topic_create_non_partitioned_with_properties(
1799 &self,
1800 topic: &str,
1801 properties: &HashMap<String, String>,
1802 ) -> Result<(), AdminError> {
1803 let (tenant, namespace, name) = split_topic(topic)?;
1804 let url = self.url(&["persistent", tenant, namespace, name])?;
1805 let resp = self
1806 .send(self.http.request(Method::PUT, url).json(properties))
1807 .await?;
1808 empty_ok(resp).await
1809 }
1810
1811 /// Create a partitioned topic with `partitions` partitions.
1812 ///
1813 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`
1814 /// with the partition count as a JSON integer body.
1815 /// Java: `PersistentTopics.java#createPartitionedTopic`
1816 /// (`@PUT @Path("/{tenant}/{namespace}/{topic}/partitions")`).
1817 pub async fn topic_create_partitioned(
1818 &self,
1819 topic: &str,
1820 partitions: u32,
1821 ) -> Result<(), AdminError> {
1822 let (tenant, namespace, name) = split_topic(topic)?;
1823 let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
1824 let resp = self
1825 .send(self.http.request(Method::PUT, url).json(&partitions))
1826 .await?;
1827 empty_ok(resp).await
1828 }
1829
1830 /// Delete a topic, auto-detecting partitioned vs non-partitioned.
1831 ///
1832 /// Pulsar exposes two distinct delete endpoints — the partitioned
1833 /// parent uses `DELETE .../{topic}/partitions?force=…` and the
1834 /// non-partitioned topic uses `DELETE .../{topic}?force=…`. Hitting
1835 /// the partitioned endpoint on a non-partitioned topic returns 404
1836 /// ("Topic is not partitioned"), which used to surface as "the
1837 /// topic doesn't exist" to operators using `magnetar admin topics
1838 /// delete`.
1839 ///
1840 /// Probe via `topic_partitions_count` first (`GET .../partitions`
1841 /// returns `partitions: 0` for non-partitioned topics, `> 0` for a
1842 /// partitioned parent) and route to the matching endpoint. Same
1843 /// behaviour as pulsarctl's `topics delete`.
1844 ///
1845 /// Java: `PersistentTopics.java#deletePartitionedTopic` /
1846 /// `PersistentTopics.java#deleteTopic`.
1847 pub async fn topic_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
1848 let (tenant, namespace, name) = split_topic(topic)?;
1849 let partitions = self.topic_partitions_count(topic).await?;
1850 let force_str = if force { "true" } else { "false" };
1851 let mut url = if partitions > 0 {
1852 self.url(&["persistent", tenant, namespace, name, "partitions"])?
1853 } else {
1854 self.url(&["persistent", tenant, namespace, name])?
1855 };
1856 url.query_pairs_mut().append_pair("force", force_str);
1857 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1858 empty_ok(resp).await
1859 }
1860
1861 /// Get topic stats.
1862 ///
1863 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/stats`.
1864 /// Java: `PersistentTopics.java#getStats`
1865 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/stats")`,
1866 /// response shape `PersistentTopicStats`).
1867 ///
1868 /// For a **partitioned** topic, the broker returns 404 on this endpoint
1869 /// because there is no ledger backing the parent name. Call
1870 /// [`Self::topic_partitioned_stats`] instead, or look up the count via
1871 /// [`Self::topic_partitions_count`] first.
1872 pub async fn topic_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
1873 let (tenant, namespace, name) = split_topic(topic)?;
1874 let url = self.url(&["persistent", tenant, namespace, name, "stats"])?;
1875 let resp = self.send(self.http.request(Method::GET, url)).await?;
1876 json_ok(resp).await
1877 }
1878
1879 /// Get aggregated stats for a partitioned topic.
1880 ///
1881 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitioned-stats?
1882 /// perPartition=false`. Java: `PersistentTopics.java#getPartitionedStats`
1883 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/partitioned-stats")`,
1884 /// response shape `PartitionedTopicStats` which extends
1885 /// `PersistentTopicStats` with `partitions: Map<String, TopicStats>`
1886 /// and `metadata: PartitionedTopicMetadata`).
1887 ///
1888 /// magnetar exposes only the aggregated top-level counters through the
1889 /// same [`TopicStats`] shape — the broker populates `msgInCounter`,
1890 /// `bytesInCounter`, `publishers`, `subscriptions` at the response root
1891 /// summed across partitions. The `partitions` and `metadata` fields are
1892 /// dropped on deserialisation; for per-partition detail call
1893 /// [`Self::topic_stats`] on each `<topic>-partition-N` instead. We pass
1894 /// `perPartition=false` to keep the wire response small.
1895 pub async fn topic_partitioned_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
1896 let (tenant, namespace, name) = split_topic(topic)?;
1897 let mut url = self.url(&["persistent", tenant, namespace, name, "partitioned-stats"])?;
1898 url.query_pairs_mut().append_pair("perPartition", "false");
1899 let resp = self.send(self.http.request(Method::GET, url)).await?;
1900 json_ok(resp).await
1901 }
1902
1903 /// Resolve the partition count of a topic.
1904 ///
1905 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`.
1906 /// Java: `PersistentTopics.java#getPartitionedMetadata`
1907 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/partitions")`,
1908 /// response shape `PartitionedTopicMetadata{ partitions: int }`).
1909 ///
1910 /// Returns `0` for non-partitioned topics; lets a caller disambiguate
1911 /// between [`Self::topic_stats`] and [`Self::topic_partitioned_stats`]
1912 /// when the topology is not known in advance.
1913 pub async fn topic_partitions_count(&self, topic: &str) -> Result<u32, AdminError> {
1914 let (tenant, namespace, name) = split_topic(topic)?;
1915 let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
1916 let resp = self.send(self.http.request(Method::GET, url)).await?;
1917 let meta: PartitionedTopicMetadata = json_ok(resp).await?;
1918 Ok(meta.partitions)
1919 }
1920
1921 /// Resolve a broker-entry-metadata `index` to a [`MessageId`] (PIP-415).
1922 ///
1923 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/getMessageIdByIndex?index={index}`.
1924 /// Per [PIP-415](https://github.com/apache/pulsar/blob/master/pip/pip-415.md)
1925 /// this is **REST-only** — the spec's "Binary protocol" section is
1926 /// intentionally empty and the canonical implementation PR
1927 /// [`apache/pulsar#24222`](https://github.com/apache/pulsar/pull/24222)
1928 /// (merged 2025-06-23) touches only admin / broker / CLI Java code.
1929 ///
1930 /// Java:
1931 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
1932 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex")`,
1933 /// `@QueryParam("index") long`); admin-client side is
1934 /// `pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/
1935 /// TopicsImpl.java#getMessageIdByIndexAsync` which deserialises the
1936 /// response into `MessageIdImpl` (i.e. `{ledgerId, entryId, partitionIndex}`).
1937 ///
1938 /// `topic` follows the same rule as every other topic-scoped method:
1939 /// either `persistent://tenant/ns/topic` or `tenant/ns/topic`. For a
1940 /// partitioned topic, pass the specific partition (`my-topic-partition-0`).
1941 ///
1942 /// The response carries only `(ledgerId, entryId, partitionIndex)`. The
1943 /// returned [`MessageId`] sets `batch_index = -1` and `batch_size = -1`
1944 /// because the broker resolves at entry granularity — see PIP-415 §"Why
1945 /// Precise Index Matching Isn't Implemented on the Broker Side".
1946 pub async fn topic_get_message_id_by_index(
1947 &self,
1948 topic: &str,
1949 index: i64,
1950 ) -> Result<MessageId, AdminError> {
1951 let (tenant, namespace, name) = split_topic(topic)?;
1952 let mut url = self.url(&["persistent", tenant, namespace, name, "getMessageIdByIndex"])?;
1953 url.query_pairs_mut()
1954 .append_pair("index", &index.to_string());
1955 let resp = self.send(self.http.request(Method::GET, url)).await?;
1956 let dto: MessageIdResponse = json_ok(resp).await?;
1957 dto.try_into_message_id()
1958 }
1959
1960 /// Trigger ledger compaction for a topic.
1961 ///
1962 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction`.
1963 /// Returns 204 on success; the broker queues the work asynchronously —
1964 /// poll [`Self::topic_compaction_status`] to observe progress.
1965 /// Java: `PersistentTopics#triggerCompaction`.
1966 pub async fn topic_compact(&self, topic: &str) -> Result<(), AdminError> {
1967 let (tenant, namespace, name) = split_topic(topic)?;
1968 let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
1969 let resp = self.send(self.http.request(Method::PUT, url)).await?;
1970 empty_ok(resp).await
1971 }
1972
1973 /// Get the current compaction status for a topic.
1974 ///
1975 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction`.
1976 /// Returns Java's `LongRunningProcessStatus`: `status` ∈ {`NOT_RUN`,
1977 /// `RUNNING`, `SUCCESS`, `ERROR`} plus an optional `lastError` string.
1978 /// Java: `PersistentTopics#compactionStatus`.
1979 pub async fn topic_compaction_status(
1980 &self,
1981 topic: &str,
1982 ) -> Result<LongRunningProcessStatus, AdminError> {
1983 let (tenant, namespace, name) = split_topic(topic)?;
1984 let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
1985 let resp = self.send(self.http.request(Method::GET, url)).await?;
1986 json_ok(resp).await
1987 }
1988
1989 /// Unload a topic from its current broker (forces rebalancing).
1990 ///
1991 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/unload`.
1992 /// Operators use this to drain a hot broker or to re-elect ownership
1993 /// after a configuration change. Java: `PersistentTopics#unloadTopic`.
1994 pub async fn topic_unload(&self, topic: &str) -> Result<(), AdminError> {
1995 let (tenant, namespace, name) = split_topic(topic)?;
1996 let url = self.url(&["persistent", tenant, namespace, name, "unload"])?;
1997 let resp = self.send(self.http.request(Method::PUT, url)).await?;
1998 empty_ok(resp).await
1999 }
2000
2001 /// Terminate (seal) a topic — no further produces succeed.
2002 ///
2003 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/terminate`.
2004 /// Returns the [`MessageId`] of the last message that landed before
2005 /// the seal, or `None` when the broker reports the
2006 /// `MessageIdImpl(-1, -1)` sentinel — meaning the topic was sealed
2007 /// before any confirmed entry was written (a freshly-created topic,
2008 /// or a topic whose owner was just re-elected). The Java client
2009 /// surfaces that case as `MessageId.earliest`; we use `Option` so
2010 /// callers don't have to special-case a magic value.
2011 /// Java: `PersistentTopics#terminate`.
2012 pub async fn topic_terminate(&self, topic: &str) -> Result<Option<MessageId>, AdminError> {
2013 let (tenant, namespace, name) = split_topic(topic)?;
2014 let url = self.url(&["persistent", tenant, namespace, name, "terminate"])?;
2015 let resp = self.send(self.http.request(Method::POST, url)).await?;
2016 let dto: MessageIdResponse = json_ok(resp).await?;
2017 if dto.ledger_id < 0 && dto.entry_id < 0 {
2018 return Ok(None);
2019 }
2020 dto.try_into_message_id().map(Some)
2021 }
2022
2023 /// Grow a partitioned topic's partition count.
2024 ///
2025 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`
2026 /// with a bare JSON integer body. Only forward (grow) is supported by
2027 /// the broker — shrinking returns 409. Java:
2028 /// `PersistentTopics#updatePartitionedTopic`.
2029 pub async fn topic_update_partitions(
2030 &self,
2031 topic: &str,
2032 new_partitions: u32,
2033 ) -> Result<(), AdminError> {
2034 let (tenant, namespace, name) = split_topic(topic)?;
2035 let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
2036 let resp = self
2037 .send(self.http.request(Method::POST, url).json(&new_partitions))
2038 .await?;
2039 empty_ok(resp).await
2040 }
2041
2042 // --- Topic policies — per-topic overrides ---------------------------
2043
2044 /// Get a topic's retention policy.
2045 ///
2046 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/retention`.
2047 /// Returns the per-topic [`RetentionPolicies`] override; the broker
2048 /// emits a `RetentionPolicies` JSON when the policy is set and a bare
2049 /// `null` (decoded as `RetentionPolicies::default()` via `#[serde(default)]`)
2050 /// when no override is in place — callers fall back to the namespace
2051 /// policy in that case. Java: `PersistentTopicsBase#getRetention`.
2052 pub async fn topic_get_retention(&self, topic: &str) -> Result<RetentionPolicies, AdminError> {
2053 let (tenant, namespace, name) = split_topic(topic)?;
2054 let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2055 let resp = self.send(self.http.request(Method::GET, url)).await?;
2056 json_ok_or_default(resp).await
2057 }
2058
2059 /// Set a topic's retention policy (overrides the namespace default).
2060 ///
2061 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/retention` with a
2062 /// JSON `RetentionPolicies` body. `-1` means infinite (size or time).
2063 /// Java: `PersistentTopicsBase#setRetention`.
2064 pub async fn topic_set_retention(
2065 &self,
2066 topic: &str,
2067 policy: RetentionPolicies,
2068 ) -> Result<(), AdminError> {
2069 let (tenant, namespace, name) = split_topic(topic)?;
2070 let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2071 let resp = self
2072 .send(self.http.request(Method::POST, url).json(&policy))
2073 .await?;
2074 empty_ok(resp).await
2075 }
2076
2077 /// Remove a topic's retention policy (fall back to namespace default).
2078 ///
2079 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/retention`.
2080 /// Java: `PersistentTopicsBase#removeRetention`.
2081 pub async fn topic_remove_retention(&self, topic: &str) -> Result<(), AdminError> {
2082 let (tenant, namespace, name) = split_topic(topic)?;
2083 let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2084 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2085 empty_ok(resp).await
2086 }
2087
2088 /// Get all backlog-quota policies on a topic.
2089 ///
2090 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuotaMap`.
2091 /// Returns `Map<BacklogQuotaType, BacklogQuota>` — kept as raw JSON
2092 /// for the same reason as [`Self::namespace_get_backlog_quotas`]:
2093 /// broker minor versions add quota types.
2094 /// Java: `PersistentTopicsBase#getBacklogQuotaMap`.
2095 pub async fn topic_get_backlog_quotas(
2096 &self,
2097 topic: &str,
2098 ) -> Result<serde_json::Value, AdminError> {
2099 let (tenant, namespace, name) = split_topic(topic)?;
2100 let url = self.url(&["persistent", tenant, namespace, name, "backlogQuotaMap"])?;
2101 let resp = self.send(self.http.request(Method::GET, url)).await?;
2102 json_ok(resp).await
2103 }
2104
2105 /// Set a backlog-quota policy on a topic (overrides the namespace
2106 /// default for the matching `backlogQuotaType`).
2107 ///
2108 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota
2109 /// ?backlogQuotaType={type}` with a JSON `BacklogQuota` body.
2110 /// Java: `PersistentTopicsBase#setBacklogQuota`.
2111 pub async fn topic_set_backlog_quota(
2112 &self,
2113 topic: &str,
2114 backlog_quota_type: BacklogQuotaType,
2115 quota: BacklogQuota,
2116 ) -> Result<(), AdminError> {
2117 let (tenant, namespace, name) = split_topic(topic)?;
2118 let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
2119 url.query_pairs_mut()
2120 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
2121 let resp = self
2122 .send(self.http.request(Method::POST, url).json("a))
2123 .await?;
2124 empty_ok(resp).await
2125 }
2126
2127 /// Remove a backlog-quota policy from a topic.
2128 ///
2129 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota
2130 /// ?backlogQuotaType={type}`.
2131 /// Java: `PersistentTopicsBase#removeBacklogQuota`.
2132 pub async fn topic_remove_backlog_quota(
2133 &self,
2134 topic: &str,
2135 backlog_quota_type: BacklogQuotaType,
2136 ) -> Result<(), AdminError> {
2137 let (tenant, namespace, name) = split_topic(topic)?;
2138 let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
2139 url.query_pairs_mut()
2140 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
2141 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2142 empty_ok(resp).await
2143 }
2144
2145 /// Get a topic's message-TTL (seconds, or `null` if unset).
2146 ///
2147 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL`. Returns
2148 /// a bare integer when the override is set, `null` (decoded as
2149 /// `Option::None`) when no topic-level override is in place.
2150 /// Java: `PersistentTopicsBase#getMessageTTL`.
2151 pub async fn topic_get_message_ttl(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2152 let (tenant, namespace, name) = split_topic(topic)?;
2153 let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2154 let resp = self.send(self.http.request(Method::GET, url)).await?;
2155 json_ok_optional(resp).await
2156 }
2157
2158 /// Set a topic's message-TTL (seconds).
2159 ///
2160 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL?messageTTL={n}`.
2161 /// `0` disables (broker treats as no TTL).
2162 ///
2163 /// Note: unlike the **namespace**-level `setNamespaceMessageTTL` (which
2164 /// takes the TTL as a JSON int body), the topic-level setter binds
2165 /// `@QueryParam("messageTTL") Integer messageTTL` on both Pulsar 4.0 and
2166 /// 4.2 (`pulsar-broker/.../v2/PersistentTopics.java#setMessageTTL`).
2167 /// Sending the value as a JSON body returns 204 No Content but the
2168 /// broker reads the query param as `null` and silently treats the call
2169 /// as "no override" — the topic policy never persists, and a subsequent
2170 /// `topic_get_message_ttl` surfaces `Ok(None)`. Older Pulsar releases
2171 /// tolerated both encodings; 4.2 enforces the query-param shape.
2172 /// Java: `PersistentTopicsBase#setMessageTTL`.
2173 pub async fn topic_set_message_ttl(
2174 &self,
2175 topic: &str,
2176 ttl_seconds: i32,
2177 ) -> Result<(), AdminError> {
2178 let (tenant, namespace, name) = split_topic(topic)?;
2179 let mut url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2180 url.query_pairs_mut()
2181 .append_pair("messageTTL", &ttl_seconds.to_string());
2182 let resp = self.send(self.http.request(Method::POST, url)).await?;
2183 empty_ok(resp).await
2184 }
2185
2186 /// Remove a topic's message-TTL (fall back to namespace default).
2187 ///
2188 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL`.
2189 /// Java: `PersistentTopicsBase#removeMessageTTL`.
2190 pub async fn topic_remove_message_ttl(&self, topic: &str) -> Result<(), AdminError> {
2191 let (tenant, namespace, name) = split_topic(topic)?;
2192 let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2193 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2194 empty_ok(resp).await
2195 }
2196
2197 /// Get a topic's persistence policy.
2198 ///
2199 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence`. The
2200 /// broker emits a `PersistencePolicies` JSON when the topic override
2201 /// is set and `null` (decoded as `Option::None`) when no override is
2202 /// in place — callers fall back to the namespace policy in that case.
2203 /// Java: `PersistentTopicsBase#getPersistence`.
2204 pub async fn topic_get_persistence(
2205 &self,
2206 topic: &str,
2207 ) -> Result<Option<PersistencePolicies>, AdminError> {
2208 let (tenant, namespace, name) = split_topic(topic)?;
2209 let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2210 let resp = self.send(self.http.request(Method::GET, url)).await?;
2211 json_ok_optional(resp).await
2212 }
2213
2214 /// Set a topic's persistence policy (overrides the namespace default).
2215 ///
2216 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence` with a
2217 /// JSON `PersistencePolicies` body.
2218 /// Java: `PersistentTopicsBase#setPersistence`.
2219 pub async fn topic_set_persistence(
2220 &self,
2221 topic: &str,
2222 policy: PersistencePolicies,
2223 ) -> Result<(), AdminError> {
2224 let (tenant, namespace, name) = split_topic(topic)?;
2225 let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2226 let resp = self
2227 .send(self.http.request(Method::POST, url).json(&policy))
2228 .await?;
2229 empty_ok(resp).await
2230 }
2231
2232 /// Remove a topic's persistence policy (fall back to namespace default).
2233 ///
2234 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence`.
2235 /// Java: `PersistentTopicsBase#removePersistence`.
2236 pub async fn topic_remove_persistence(&self, topic: &str) -> Result<(), AdminError> {
2237 let (tenant, namespace, name) = split_topic(topic)?;
2238 let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2239 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2240 empty_ok(resp).await
2241 }
2242
2243 /// Get a topic's consumer dispatch-rate policy (or `null` if no override).
2244 ///
2245 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate`. The
2246 /// broker emits the per-topic [`DispatchRate`] override or `null` when
2247 /// no override is set; callers fall back to the namespace policy in the
2248 /// `None` case. Java: `PersistentTopicsBase#getDispatchRate`.
2249 pub async fn topic_get_dispatch_rate(
2250 &self,
2251 topic: &str,
2252 ) -> Result<Option<DispatchRate>, AdminError> {
2253 let (tenant, namespace, name) = split_topic(topic)?;
2254 let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2255 let resp = self.send(self.http.request(Method::GET, url)).await?;
2256 json_ok_optional(resp).await
2257 }
2258
2259 /// Set a topic's consumer dispatch-rate policy (overrides namespace default).
2260 ///
2261 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate` with a
2262 /// JSON `DispatchRate` body. Java: `PersistentTopicsBase#setDispatchRate`.
2263 pub async fn topic_set_dispatch_rate(
2264 &self,
2265 topic: &str,
2266 rate: DispatchRate,
2267 ) -> Result<(), AdminError> {
2268 let (tenant, namespace, name) = split_topic(topic)?;
2269 let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2270 let resp = self
2271 .send(self.http.request(Method::POST, url).json(&rate))
2272 .await?;
2273 empty_ok(resp).await
2274 }
2275
2276 /// Remove a topic's consumer dispatch-rate policy.
2277 ///
2278 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate`.
2279 /// Java: `PersistentTopicsBase#removeDispatchRate`.
2280 pub async fn topic_remove_dispatch_rate(&self, topic: &str) -> Result<(), AdminError> {
2281 let (tenant, namespace, name) = split_topic(topic)?;
2282 let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2283 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2284 empty_ok(resp).await
2285 }
2286
2287 /// Get a topic's per-subscription dispatch-rate policy (or `null`).
2288 ///
2289 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`.
2290 /// Reuses the [`DispatchRate`] body shape — the policy applies per
2291 /// subscription rather than aggregated across all consumers.
2292 /// Java: `PersistentTopicsBase#getSubscriptionDispatchRate`.
2293 pub async fn topic_get_subscription_dispatch_rate(
2294 &self,
2295 topic: &str,
2296 ) -> Result<Option<DispatchRate>, AdminError> {
2297 let (tenant, namespace, name) = split_topic(topic)?;
2298 let url = self.url(&[
2299 "persistent",
2300 tenant,
2301 namespace,
2302 name,
2303 "subscriptionDispatchRate",
2304 ])?;
2305 let resp = self.send(self.http.request(Method::GET, url)).await?;
2306 json_ok_optional(resp).await
2307 }
2308
2309 /// Set a topic's per-subscription dispatch-rate policy (overrides namespace default).
2310 ///
2311 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`
2312 /// with a JSON `DispatchRate` body.
2313 /// Java: `PersistentTopicsBase#setSubscriptionDispatchRate`.
2314 pub async fn topic_set_subscription_dispatch_rate(
2315 &self,
2316 topic: &str,
2317 rate: DispatchRate,
2318 ) -> Result<(), AdminError> {
2319 let (tenant, namespace, name) = split_topic(topic)?;
2320 let url = self.url(&[
2321 "persistent",
2322 tenant,
2323 namespace,
2324 name,
2325 "subscriptionDispatchRate",
2326 ])?;
2327 let resp = self
2328 .send(self.http.request(Method::POST, url).json(&rate))
2329 .await?;
2330 empty_ok(resp).await
2331 }
2332
2333 /// Remove a topic's per-subscription dispatch-rate policy.
2334 ///
2335 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`.
2336 /// Java: `PersistentTopicsBase#removeSubscriptionDispatchRate`.
2337 pub async fn topic_remove_subscription_dispatch_rate(
2338 &self,
2339 topic: &str,
2340 ) -> Result<(), AdminError> {
2341 let (tenant, namespace, name) = split_topic(topic)?;
2342 let url = self.url(&[
2343 "persistent",
2344 tenant,
2345 namespace,
2346 name,
2347 "subscriptionDispatchRate",
2348 ])?;
2349 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2350 empty_ok(resp).await
2351 }
2352
2353 /// Get a topic's cross-cluster replicator dispatch-rate policy (or `null`).
2354 ///
2355 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`.
2356 /// Reuses the [`DispatchRate`] body shape — the policy throttles
2357 /// outbound geo-replication traffic from this cluster.
2358 /// Java: `PersistentTopicsBase#getReplicatorDispatchRate`.
2359 pub async fn topic_get_replicator_dispatch_rate(
2360 &self,
2361 topic: &str,
2362 ) -> Result<Option<DispatchRate>, AdminError> {
2363 let (tenant, namespace, name) = split_topic(topic)?;
2364 let url = self.url(&[
2365 "persistent",
2366 tenant,
2367 namespace,
2368 name,
2369 "replicatorDispatchRate",
2370 ])?;
2371 let resp = self.send(self.http.request(Method::GET, url)).await?;
2372 json_ok_optional(resp).await
2373 }
2374
2375 /// Set a topic's cross-cluster replicator dispatch-rate policy.
2376 ///
2377 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`
2378 /// with a JSON `DispatchRate` body.
2379 /// Java: `PersistentTopicsBase#setReplicatorDispatchRate`.
2380 pub async fn topic_set_replicator_dispatch_rate(
2381 &self,
2382 topic: &str,
2383 rate: DispatchRate,
2384 ) -> Result<(), AdminError> {
2385 let (tenant, namespace, name) = split_topic(topic)?;
2386 let url = self.url(&[
2387 "persistent",
2388 tenant,
2389 namespace,
2390 name,
2391 "replicatorDispatchRate",
2392 ])?;
2393 let resp = self
2394 .send(self.http.request(Method::POST, url).json(&rate))
2395 .await?;
2396 empty_ok(resp).await
2397 }
2398
2399 /// Remove a topic's cross-cluster replicator dispatch-rate policy.
2400 ///
2401 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`.
2402 /// Java: `PersistentTopicsBase#removeReplicatorDispatchRate`.
2403 pub async fn topic_remove_replicator_dispatch_rate(
2404 &self,
2405 topic: &str,
2406 ) -> Result<(), AdminError> {
2407 let (tenant, namespace, name) = split_topic(topic)?;
2408 let url = self.url(&[
2409 "persistent",
2410 tenant,
2411 namespace,
2412 name,
2413 "replicatorDispatchRate",
2414 ])?;
2415 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2416 empty_ok(resp).await
2417 }
2418
2419 /// Get a topic's publish-rate policy (or `null` if no override).
2420 ///
2421 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate`. Returns
2422 /// the per-topic [`PublishRate`] producer-side throttle (msg/sec +
2423 /// byte/sec). `-1` on either dimension means unlimited.
2424 /// Java: `PersistentTopicsBase#getPublishRate`.
2425 pub async fn topic_get_publish_rate(
2426 &self,
2427 topic: &str,
2428 ) -> Result<Option<PublishRate>, AdminError> {
2429 let (tenant, namespace, name) = split_topic(topic)?;
2430 let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2431 let resp = self.send(self.http.request(Method::GET, url)).await?;
2432 json_ok_optional(resp).await
2433 }
2434
2435 /// Set a topic's publish-rate policy (overrides namespace default).
2436 ///
2437 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate` with a
2438 /// JSON `PublishRate` body. Java: `PersistentTopicsBase#setPublishRate`.
2439 pub async fn topic_set_publish_rate(
2440 &self,
2441 topic: &str,
2442 rate: PublishRate,
2443 ) -> Result<(), AdminError> {
2444 let (tenant, namespace, name) = split_topic(topic)?;
2445 let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2446 let resp = self
2447 .send(self.http.request(Method::POST, url).json(&rate))
2448 .await?;
2449 empty_ok(resp).await
2450 }
2451
2452 /// Remove a topic's publish-rate policy.
2453 ///
2454 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate`.
2455 /// Java: `PersistentTopicsBase#removePublishRate`.
2456 pub async fn topic_remove_publish_rate(&self, topic: &str) -> Result<(), AdminError> {
2457 let (tenant, namespace, name) = split_topic(topic)?;
2458 let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2459 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2460 empty_ok(resp).await
2461 }
2462
2463 /// Get a topic's max-producers cap (or `null` if no override).
2464 ///
2465 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers`. Returns
2466 /// a bare integer when the override is set, `null` (decoded as
2467 /// `Option::None`) when no topic-level cap is in place.
2468 /// Java: `PersistentTopicsBase#getMaxProducers`.
2469 pub async fn topic_get_max_producers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2470 let (tenant, namespace, name) = split_topic(topic)?;
2471 let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2472 let resp = self.send(self.http.request(Method::GET, url)).await?;
2473 json_ok_optional(resp).await
2474 }
2475
2476 /// Set a topic's max-producers cap.
2477 ///
2478 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers` with
2479 /// a bare integer body. `0` disables (broker treats as unlimited).
2480 /// Java: `PersistentTopicsBase#setMaxProducers`.
2481 pub async fn topic_set_max_producers(
2482 &self,
2483 topic: &str,
2484 max_producers: i32,
2485 ) -> Result<(), AdminError> {
2486 let (tenant, namespace, name) = split_topic(topic)?;
2487 let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2488 let resp = self
2489 .send(self.http.request(Method::POST, url).json(&max_producers))
2490 .await?;
2491 empty_ok(resp).await
2492 }
2493
2494 /// Remove a topic's max-producers cap (fall back to namespace / broker default).
2495 ///
2496 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers`.
2497 /// Java: `PersistentTopicsBase#removeMaxProducers`.
2498 pub async fn topic_remove_max_producers(&self, topic: &str) -> Result<(), AdminError> {
2499 let (tenant, namespace, name) = split_topic(topic)?;
2500 let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2501 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2502 empty_ok(resp).await
2503 }
2504
2505 /// Get a topic's max-consumers cap (or `null` if no override).
2506 ///
2507 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers`. Returns
2508 /// a bare integer when the override is set, `null` (decoded as
2509 /// `Option::None`) when no topic-level cap is in place.
2510 /// Java: `PersistentTopicsBase#getMaxConsumers`.
2511 pub async fn topic_get_max_consumers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2512 let (tenant, namespace, name) = split_topic(topic)?;
2513 let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2514 let resp = self.send(self.http.request(Method::GET, url)).await?;
2515 json_ok_optional(resp).await
2516 }
2517
2518 /// Set a topic's max-consumers cap.
2519 ///
2520 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers` with
2521 /// a bare integer body. `0` disables (broker treats as unlimited).
2522 /// Java: `PersistentTopicsBase#setMaxConsumers`.
2523 pub async fn topic_set_max_consumers(
2524 &self,
2525 topic: &str,
2526 max_consumers: i32,
2527 ) -> Result<(), AdminError> {
2528 let (tenant, namespace, name) = split_topic(topic)?;
2529 let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2530 let resp = self
2531 .send(self.http.request(Method::POST, url).json(&max_consumers))
2532 .await?;
2533 empty_ok(resp).await
2534 }
2535
2536 /// Remove a topic's max-consumers cap (fall back to namespace / broker default).
2537 ///
2538 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers`.
2539 /// Java: `PersistentTopicsBase#removeMaxConsumers`.
2540 pub async fn topic_remove_max_consumers(&self, topic: &str) -> Result<(), AdminError> {
2541 let (tenant, namespace, name) = split_topic(topic)?;
2542 let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2543 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2544 empty_ok(resp).await
2545 }
2546
2547 // --- Subscriptions ---------------------------------------------------
2548
2549 /// List subscription names on a topic.
2550 ///
2551 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscriptions`.
2552 /// Java: `PersistentTopics#getSubscriptions`.
2553 pub async fn subscriptions_list(&self, topic: &str) -> Result<Vec<String>, AdminError> {
2554 let (tenant, namespace, name) = split_topic(topic)?;
2555 let url = self.url(&["persistent", tenant, namespace, name, "subscriptions"])?;
2556 let resp = self.send(self.http.request(Method::GET, url)).await?;
2557 json_ok(resp).await
2558 }
2559
2560 /// Reset a subscription's cursor to a specific message-id position.
2561 ///
2562 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor`
2563 /// with body `{ledgerId, entryId, partitionIndex, batchIndex, isExcluded}`.
2564 /// `is_excluded = true` skips the message at `message_id` itself; `false`
2565 /// leaves it eligible for redelivery. Java: `PersistentTopics#resetCursorOnPosition`.
2566 pub async fn subscription_reset_cursor_to_position(
2567 &self,
2568 topic: &str,
2569 subscription: &str,
2570 message_id: MessageId,
2571 is_excluded: bool,
2572 ) -> Result<(), AdminError> {
2573 let (tenant, namespace, name) = split_topic(topic)?;
2574 validate_segment(subscription)?;
2575 let url = self.url(&[
2576 "persistent",
2577 tenant,
2578 namespace,
2579 name,
2580 "subscription",
2581 subscription,
2582 "resetcursor",
2583 ])?;
2584 let body = ResetCursorData {
2585 ledger_id: message_id_field_for_wire(message_id.ledger_id),
2586 entry_id: message_id_field_for_wire(message_id.entry_id),
2587 partition_index: message_id.partition,
2588 batch_index: message_id.batch_index,
2589 is_excluded,
2590 };
2591 let resp = self
2592 .send(self.http.request(Method::POST, url).json(&body))
2593 .await?;
2594 empty_ok(resp).await
2595 }
2596
2597 /// Reset a subscription's cursor to a wall-clock timestamp (millis since epoch).
2598 ///
2599 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor/
2600 /// {timestamp}`. Java: `PersistentTopics#resetCursor(topic, sub, timestamp)`.
2601 pub async fn subscription_reset_cursor_to_timestamp(
2602 &self,
2603 topic: &str,
2604 subscription: &str,
2605 timestamp_millis: u64,
2606 ) -> Result<(), AdminError> {
2607 let (tenant, namespace, name) = split_topic(topic)?;
2608 validate_segment(subscription)?;
2609 let timestamp = timestamp_millis.to_string();
2610 let url = self.url(&[
2611 "persistent",
2612 tenant,
2613 namespace,
2614 name,
2615 "subscription",
2616 subscription,
2617 "resetcursor",
2618 ×tamp,
2619 ])?;
2620 let resp = self.send(self.http.request(Method::POST, url)).await?;
2621 empty_ok(resp).await
2622 }
2623
2624 /// Advance a subscription's cursor past N undelivered messages.
2625 ///
2626 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip/
2627 /// {numMessages}`. Java: `PersistentTopics#skipMessages`.
2628 pub async fn subscription_skip_messages(
2629 &self,
2630 topic: &str,
2631 subscription: &str,
2632 num_messages: u64,
2633 ) -> Result<(), AdminError> {
2634 let (tenant, namespace, name) = split_topic(topic)?;
2635 validate_segment(subscription)?;
2636 let n = num_messages.to_string();
2637 let url = self.url(&[
2638 "persistent",
2639 tenant,
2640 namespace,
2641 name,
2642 "subscription",
2643 subscription,
2644 "skip",
2645 &n,
2646 ])?;
2647 let resp = self.send(self.http.request(Method::POST, url)).await?;
2648 empty_ok(resp).await
2649 }
2650
2651 /// Drain the entire backlog of a subscription (clear-backlog).
2652 ///
2653 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip_all`.
2654 /// Java: `PersistentTopics#skipAllMessages`.
2655 pub async fn subscription_skip_all_messages(
2656 &self,
2657 topic: &str,
2658 subscription: &str,
2659 ) -> Result<(), AdminError> {
2660 let (tenant, namespace, name) = split_topic(topic)?;
2661 validate_segment(subscription)?;
2662 let url = self.url(&[
2663 "persistent",
2664 tenant,
2665 namespace,
2666 name,
2667 "subscription",
2668 subscription,
2669 "skip_all",
2670 ])?;
2671 let resp = self.send(self.http.request(Method::POST, url)).await?;
2672 empty_ok(resp).await
2673 }
2674
2675 /// Expire all messages older than `expire_time_seconds` for a subscription.
2676 ///
2677 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/expireMessages/
2678 /// {seconds}`. Java: `PersistentTopics#expireMessagesForSubscription`.
2679 pub async fn subscription_expire_messages(
2680 &self,
2681 topic: &str,
2682 subscription: &str,
2683 expire_time_seconds: u64,
2684 ) -> Result<(), AdminError> {
2685 let (tenant, namespace, name) = split_topic(topic)?;
2686 validate_segment(subscription)?;
2687 let s = expire_time_seconds.to_string();
2688 let url = self.url(&[
2689 "persistent",
2690 tenant,
2691 namespace,
2692 name,
2693 "subscription",
2694 subscription,
2695 "expireMessages",
2696 &s,
2697 ])?;
2698 let resp = self.send(self.http.request(Method::POST, url)).await?;
2699 empty_ok(resp).await
2700 }
2701
2702 /// Delete (unsubscribe) a subscription.
2703 ///
2704 /// `DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}?force={force}`.
2705 /// `force = true` disconnects active consumers before deletion. Java:
2706 /// `PersistentTopics#deleteSubscription`.
2707 pub async fn subscription_delete(
2708 &self,
2709 topic: &str,
2710 subscription: &str,
2711 force: bool,
2712 ) -> Result<(), AdminError> {
2713 let (tenant, namespace, name) = split_topic(topic)?;
2714 validate_segment(subscription)?;
2715 let mut url = self.url(&[
2716 "persistent",
2717 tenant,
2718 namespace,
2719 name,
2720 "subscription",
2721 subscription,
2722 ])?;
2723 if force {
2724 url.query_pairs_mut().append_pair("force", "true");
2725 }
2726 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2727 empty_ok(resp).await
2728 }
2729
2730 // --- Shadow topics (PIP-180 / ADR-0033) ------------------------------
2731
2732 /// Create a shadow topic ([PIP-180](https://github.com/apache/pulsar/blob/master/pip/pip-180.md)).
2733 ///
2734 /// Creates the shadow as a regular non-partitioned topic with the broker-reserved
2735 /// `PULSAR.SHADOW_SOURCE` topic property pointing at the source topic:
2736 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{shadow}` with body
2737 /// `{ "PULSAR.SHADOW_SOURCE": "persistent://tenant/ns/source" }`.
2738 /// Then links the created shadow in the source topic policy list via
2739 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{source}/shadowTopics`.
2740 ///
2741 /// Java:
2742 /// `pulsar-client-admin/.../TopicsImpl.java#createShadowTopicAsync` builds the same property
2743 /// map before calling `createNonPartitionedTopicAsync` for non-partitioned sources.
2744 ///
2745 /// Errors mirror the existing `AdminError` taxonomy: 404 → `Status { code:
2746 /// 404, .. }` (the source topic does not exist), 409 → `Status { code:
2747 /// 409, .. }` (the shadow topic already exists on this source),
2748 /// 401/403 → `Status { code: 401|403, .. }` (auth).
2749 pub async fn create_shadow_topic(&self, source: &str, shadow: &str) -> Result<(), AdminError> {
2750 let (source_tenant, source_namespace, source_name) = split_topic(source)?;
2751 // Validate the shadow name eagerly so a misformatted argument errors
2752 // out with `InvalidName` rather than as a broker 4xx after we've
2753 // already crossed the wire.
2754 let (shadow_tenant, shadow_namespace, shadow_name) = split_topic(shadow)?;
2755 let source = format!("persistent://{source_tenant}/{source_namespace}/{source_name}");
2756 let shadow = format!("persistent://{shadow_tenant}/{shadow_namespace}/{shadow_name}");
2757 let mut properties = HashMap::new();
2758 properties.insert("PULSAR.SHADOW_SOURCE".to_owned(), source.clone());
2759 self.topic_create_non_partitioned_with_properties(&shadow, &properties)
2760 .await?;
2761 self.set_shadow_topics(&source, &[shadow]).await
2762 }
2763
2764 async fn set_shadow_topics(&self, source: &str, shadows: &[String]) -> Result<(), AdminError> {
2765 let (tenant, namespace, name) = split_topic(source)?;
2766 let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
2767 let resp = self
2768 .send(self.http.request(Method::PUT, url).json(shadows))
2769 .await?;
2770 empty_ok(resp).await
2771 }
2772
2773 /// Delete a shadow topic (PIP-180).
2774 ///
2775 /// `DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}` where
2776 /// `{topic}` is the **shadow** topic name. PIP-180's deletion contract
2777 /// goes through the regular topic-delete path on the shadow itself —
2778 /// the broker recognises the topic as a shadow and detaches it from
2779 /// the source ledger atomically with the metadata delete.
2780 ///
2781 /// `force` controls whether active subscribers are kicked off before
2782 /// the delete (`?force=true`) or whether the broker rejects the
2783 /// request when subscribers exist (`?force=false`, the default).
2784 ///
2785 /// Java: `org.apache.pulsar.client.admin.Topics#deleteShadowTopic` calls
2786 /// the same `@DELETE @Path("/{tenant}/{namespace}/{topic}")` endpoint.
2787 pub async fn delete_shadow_topic(&self, shadow: &str, force: bool) -> Result<(), AdminError> {
2788 let (tenant, namespace, name) = split_topic(shadow)?;
2789 let mut url = self.url(&["persistent", tenant, namespace, name])?;
2790 url.query_pairs_mut()
2791 .append_pair("force", if force { "true" } else { "false" });
2792 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2793 empty_ok(resp).await
2794 }
2795
2796 /// List the shadow topics created on a source topic (PIP-180).
2797 ///
2798 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/shadowTopics`
2799 /// where `{topic}` is the **source** topic name. The broker returns a
2800 /// JSON array of fully-qualified shadow topic names.
2801 ///
2802 /// Java:
2803 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
2804 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/shadowTopics")`).
2805 ///
2806 /// Used by the runtime engine at consumer subscribe time: when the user
2807 /// subscribes to a topic the runtime cannot yet classify, a single
2808 /// `get_shadow_topics` lookup on every other topic in the namespace is
2809 /// expensive; instead the runtime calls `get_shadow_topics(subscribed)`
2810 /// on the topic itself — a non-shadow topic returns an empty array, a
2811 /// shadow topic surfaces nothing but the broker has already populated
2812 /// the consumer's `shadow_metadata` via the topic's policy.
2813 /// (See `crates/magnetar-runtime-tokio/src/client.rs::subscribe`.)
2814 pub async fn get_shadow_topics(&self, source: &str) -> Result<Vec<String>, AdminError> {
2815 let (tenant, namespace, name) = split_topic(source)?;
2816 let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
2817 let resp = self.send(self.http.request(Method::GET, url)).await?;
2818 json_ok_or_default(resp).await
2819 }
2820
2821 /// Resolve the **source** topic of a shadow topic (PIP-180).
2822 ///
2823 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/properties`.
2824 /// Returns the `PULSAR.SHADOW_SOURCE` property when the queried topic is a
2825 /// shadow; returns `None` when it is a regular topic. Used by the runtime
2826 /// at subscribe time to populate
2827 /// [`magnetar_proto::ShadowTopicMetadata::source_topic`] on the new
2828 /// consumer (so the receive path can emit
2829 /// [`magnetar_proto::ConnectionEvent::MessageReceivedFromShadow`]
2830 /// without an out-of-band lookup per message).
2831 ///
2832 /// Java: `org.apache.pulsar.client.admin.Topics#getShadowSource` —
2833 /// `TopicsImpl#getShadowSourceAsync` delegates to `getPropertiesAsync`.
2834 pub async fn get_shadow_source(&self, shadow: &str) -> Result<Option<String>, AdminError> {
2835 let (tenant, namespace, name) = split_topic(shadow)?;
2836 let url = self.url(&["persistent", tenant, namespace, name, "properties"])?;
2837 let resp = self.send(self.http.request(Method::GET, url)).await?;
2838 let mut properties: HashMap<String, String> = json_ok_or_default(resp).await?;
2839 Ok(properties
2840 .remove("PULSAR.SHADOW_SOURCE")
2841 .filter(|source| !source.is_empty()))
2842 }
2843
2844 // --- Pulsar IO Sources (/admin/v3/sources/...) ----------------------
2845
2846 /// List sources configured under a namespace.
2847 ///
2848 /// `GET /admin/v3/sources/{tenant}/{namespace}`. Returns the list of
2849 /// source names (the broker emits a JSON array of strings — one
2850 /// entry per declared source, regardless of running state).
2851 /// Java:
2852 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java`
2853 /// (`@Path("/sources")`) +
2854 /// `pulsar-broker/.../admin/impl/SourcesBase.java#listSources`.
2855 pub async fn sources_list_by_namespace(
2856 &self,
2857 tenant: &str,
2858 namespace: &str,
2859 ) -> Result<Vec<String>, AdminError> {
2860 validate_segment(tenant)?;
2861 validate_segment(namespace)?;
2862 let url = self.url_v3(&["sources", tenant, namespace])?;
2863 let resp = self.send(self.http.request(Method::GET, url)).await?;
2864 json_ok(resp).await
2865 }
2866
2867 /// Get one source's configuration.
2868 ///
2869 /// `GET /admin/v3/sources/{tenant}/{namespace}/{name}`. Returns the
2870 /// stored `SourceConfig` envelope as raw JSON — minor broker
2871 /// versions extend the shape (new connector knobs, secret refs)
2872 /// faster than a typed Rust DTO can keep up.
2873 /// Java: `SourcesBase#getSourceInfo`.
2874 pub async fn source_get(
2875 &self,
2876 tenant: &str,
2877 namespace: &str,
2878 name: &str,
2879 ) -> Result<serde_json::Value, AdminError> {
2880 validate_segment(tenant)?;
2881 validate_segment(namespace)?;
2882 validate_segment(name)?;
2883 let url = self.url_v3(&["sources", tenant, namespace, name])?;
2884 let resp = self.send(self.http.request(Method::GET, url)).await?;
2885 json_ok(resp).await
2886 }
2887
2888 /// Get a source's running status (per-instance worker telemetry).
2889 ///
2890 /// `GET /admin/v3/sources/{tenant}/{namespace}/{name}/status`.
2891 /// Returns the broker's `SourceStatus` envelope (`numInstances`,
2892 /// `numRunning`, per-instance `workerId` + `running` + last
2893 /// received timestamp). Exposed as raw JSON for forward-compat.
2894 /// Java: `SourcesBase#getSourceStatus`.
2895 pub async fn source_status(
2896 &self,
2897 tenant: &str,
2898 namespace: &str,
2899 name: &str,
2900 ) -> Result<serde_json::Value, AdminError> {
2901 validate_segment(tenant)?;
2902 validate_segment(namespace)?;
2903 validate_segment(name)?;
2904 let url = self.url_v3(&["sources", tenant, namespace, name, "status"])?;
2905 let resp = self.send(self.http.request(Method::GET, url)).await?;
2906 json_ok(resp).await
2907 }
2908
2909 /// Register a source from a remote package URL.
2910 ///
2911 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}` with
2912 /// `multipart/form-data` carrying two parts: a `url` text part (the
2913 /// package URL — `http(s)://`, `file://`, or `function://` per the
2914 /// broker's `WorkerUtils#downloadFileFromPackageUrl`) and a
2915 /// `sourceConfig` JSON part with the [`SourceConfig`] body.
2916 /// A sibling `source_create` (binary upload) is intentionally not
2917 /// yet exposed — this URL-based variant covers every
2918 /// CI/operator scenario that does not need to ship a JAR through
2919 /// the admin client itself.
2920 /// Java: `SourcesBase#registerSource`.
2921 pub async fn source_create_with_url(
2922 &self,
2923 tenant: &str,
2924 namespace: &str,
2925 name: &str,
2926 url: &str,
2927 config: SourceConfig,
2928 ) -> Result<(), AdminError> {
2929 validate_segment(tenant)?;
2930 validate_segment(namespace)?;
2931 validate_segment(name)?;
2932 let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
2933 let form = build_url_config_multipart(url, "sourceConfig", &config)?;
2934 let resp = self
2935 .send(
2936 self.http
2937 .request(Method::POST, endpoint)
2938 .multipart(form)
2939 .timeout(PACKAGE_REGISTER_TIMEOUT),
2940 )
2941 .await?;
2942 empty_ok(resp).await
2943 }
2944
2945 /// Update a source from a remote package URL.
2946 ///
2947 /// `PUT /admin/v3/sources/{tenant}/{namespace}/{name}` with the
2948 /// same multipart shape as [`Self::source_create_with_url`].
2949 /// Java: `SourcesBase#updateSource`.
2950 pub async fn source_update_with_url(
2951 &self,
2952 tenant: &str,
2953 namespace: &str,
2954 name: &str,
2955 url: &str,
2956 config: SourceConfig,
2957 ) -> Result<(), AdminError> {
2958 validate_segment(tenant)?;
2959 validate_segment(namespace)?;
2960 validate_segment(name)?;
2961 let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
2962 let form = build_url_config_multipart(url, "sourceConfig", &config)?;
2963 let resp = self
2964 .send(
2965 self.http
2966 .request(Method::PUT, endpoint)
2967 .multipart(form)
2968 .timeout(PACKAGE_REGISTER_TIMEOUT),
2969 )
2970 .await?;
2971 empty_ok(resp).await
2972 }
2973
2974 /// Delete a source.
2975 ///
2976 /// `DELETE /admin/v3/sources/{tenant}/{namespace}/{name}`. Removes
2977 /// the source declaration and tears the running instances down.
2978 /// Java: `SourcesBase#deregisterSource`.
2979 pub async fn source_delete(
2980 &self,
2981 tenant: &str,
2982 namespace: &str,
2983 name: &str,
2984 ) -> Result<(), AdminError> {
2985 validate_segment(tenant)?;
2986 validate_segment(namespace)?;
2987 validate_segment(name)?;
2988 let url = self.url_v3(&["sources", tenant, namespace, name])?;
2989 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2990 empty_ok(resp).await
2991 }
2992
2993 /// Start every instance of a source.
2994 ///
2995 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/start`.
2996 /// Java: `SourcesBase#startSource`.
2997 pub async fn source_start(
2998 &self,
2999 tenant: &str,
3000 namespace: &str,
3001 name: &str,
3002 ) -> Result<(), AdminError> {
3003 validate_segment(tenant)?;
3004 validate_segment(namespace)?;
3005 validate_segment(name)?;
3006 let url = self.url_v3(&["sources", tenant, namespace, name, "start"])?;
3007 let resp = self.send(self.http.request(Method::POST, url)).await?;
3008 empty_ok(resp).await
3009 }
3010
3011 /// Stop every instance of a source.
3012 ///
3013 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/stop`.
3014 /// Java: `SourcesBase#stopSource`.
3015 pub async fn source_stop(
3016 &self,
3017 tenant: &str,
3018 namespace: &str,
3019 name: &str,
3020 ) -> Result<(), AdminError> {
3021 validate_segment(tenant)?;
3022 validate_segment(namespace)?;
3023 validate_segment(name)?;
3024 let url = self.url_v3(&["sources", tenant, namespace, name, "stop"])?;
3025 let resp = self.send(self.http.request(Method::POST, url)).await?;
3026 empty_ok(resp).await
3027 }
3028
3029 /// Restart every instance of a source.
3030 ///
3031 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/restart`.
3032 /// Java: `SourcesBase#restartSource`.
3033 pub async fn source_restart(
3034 &self,
3035 tenant: &str,
3036 namespace: &str,
3037 name: &str,
3038 ) -> Result<(), AdminError> {
3039 validate_segment(tenant)?;
3040 validate_segment(namespace)?;
3041 validate_segment(name)?;
3042 let url = self.url_v3(&["sources", tenant, namespace, name, "restart"])?;
3043 let resp = self.send(self.http.request(Method::POST, url)).await?;
3044 empty_ok(resp).await
3045 }
3046
3047 // --- Pulsar IO Sinks (/admin/v3/sinks/...) --------------------------
3048
3049 /// List sinks configured under a namespace.
3050 ///
3051 /// `GET /admin/v3/sinks/{tenant}/{namespace}`. Returns the list of
3052 /// sink names. Mirrors [`Self::sources_list_by_namespace`] —
3053 /// Pulsar's Sources / Sinks REST surfaces are intentionally
3054 /// symmetric.
3055 /// Java:
3056 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java`
3057 /// (`@Path("/sinks")`) +
3058 /// `pulsar-broker/.../admin/impl/SinksBase.java#listSinks`.
3059 pub async fn sinks_list_by_namespace(
3060 &self,
3061 tenant: &str,
3062 namespace: &str,
3063 ) -> Result<Vec<String>, AdminError> {
3064 validate_segment(tenant)?;
3065 validate_segment(namespace)?;
3066 let url = self.url_v3(&["sinks", tenant, namespace])?;
3067 let resp = self.send(self.http.request(Method::GET, url)).await?;
3068 json_ok(resp).await
3069 }
3070
3071 /// Get one sink's configuration.
3072 ///
3073 /// `GET /admin/v3/sinks/{tenant}/{namespace}/{name}`. Returns the
3074 /// stored `SinkConfig` as raw JSON for the same forward-compat
3075 /// reason as [`Self::source_get`]. Java: `SinksBase#getSinkInfo`.
3076 pub async fn sink_get(
3077 &self,
3078 tenant: &str,
3079 namespace: &str,
3080 name: &str,
3081 ) -> Result<serde_json::Value, AdminError> {
3082 validate_segment(tenant)?;
3083 validate_segment(namespace)?;
3084 validate_segment(name)?;
3085 let url = self.url_v3(&["sinks", tenant, namespace, name])?;
3086 let resp = self.send(self.http.request(Method::GET, url)).await?;
3087 json_ok(resp).await
3088 }
3089
3090 /// Get a sink's running status (per-instance worker telemetry).
3091 ///
3092 /// `GET /admin/v3/sinks/{tenant}/{namespace}/{name}/status`.
3093 /// Same envelope shape as the Sources status. Raw JSON.
3094 /// Java: `SinksBase#getSinkStatus`.
3095 pub async fn sink_status(
3096 &self,
3097 tenant: &str,
3098 namespace: &str,
3099 name: &str,
3100 ) -> Result<serde_json::Value, AdminError> {
3101 validate_segment(tenant)?;
3102 validate_segment(namespace)?;
3103 validate_segment(name)?;
3104 let url = self.url_v3(&["sinks", tenant, namespace, name, "status"])?;
3105 let resp = self.send(self.http.request(Method::GET, url)).await?;
3106 json_ok(resp).await
3107 }
3108
3109 /// Register a sink from a remote package URL.
3110 ///
3111 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}` with
3112 /// `multipart/form-data` carrying a `url` text part and a
3113 /// `sinkConfig` JSON part. Mirrors
3114 /// [`Self::source_create_with_url`]; the only wire-level
3115 /// difference is the JSON-part field name (`sinkConfig` vs
3116 /// `sourceConfig`). Java: `SinksBase#registerSink`.
3117 pub async fn sink_create_with_url(
3118 &self,
3119 tenant: &str,
3120 namespace: &str,
3121 name: &str,
3122 url: &str,
3123 config: SinkConfig,
3124 ) -> Result<(), AdminError> {
3125 validate_segment(tenant)?;
3126 validate_segment(namespace)?;
3127 validate_segment(name)?;
3128 let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
3129 let form = build_url_config_multipart(url, "sinkConfig", &config)?;
3130 let resp = self
3131 .send(
3132 self.http
3133 .request(Method::POST, endpoint)
3134 .multipart(form)
3135 .timeout(PACKAGE_REGISTER_TIMEOUT),
3136 )
3137 .await?;
3138 empty_ok(resp).await
3139 }
3140
3141 /// Update a sink from a remote package URL.
3142 ///
3143 /// `PUT /admin/v3/sinks/{tenant}/{namespace}/{name}` with the same
3144 /// multipart shape as [`Self::sink_create_with_url`].
3145 /// Java: `SinksBase#updateSink`.
3146 pub async fn sink_update_with_url(
3147 &self,
3148 tenant: &str,
3149 namespace: &str,
3150 name: &str,
3151 url: &str,
3152 config: SinkConfig,
3153 ) -> Result<(), AdminError> {
3154 validate_segment(tenant)?;
3155 validate_segment(namespace)?;
3156 validate_segment(name)?;
3157 let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
3158 let form = build_url_config_multipart(url, "sinkConfig", &config)?;
3159 let resp = self
3160 .send(
3161 self.http
3162 .request(Method::PUT, endpoint)
3163 .multipart(form)
3164 .timeout(PACKAGE_REGISTER_TIMEOUT),
3165 )
3166 .await?;
3167 empty_ok(resp).await
3168 }
3169
3170 /// Delete a sink.
3171 ///
3172 /// `DELETE /admin/v3/sinks/{tenant}/{namespace}/{name}`.
3173 /// Java: `SinksBase#deregisterSink`.
3174 pub async fn sink_delete(
3175 &self,
3176 tenant: &str,
3177 namespace: &str,
3178 name: &str,
3179 ) -> Result<(), AdminError> {
3180 validate_segment(tenant)?;
3181 validate_segment(namespace)?;
3182 validate_segment(name)?;
3183 let url = self.url_v3(&["sinks", tenant, namespace, name])?;
3184 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3185 empty_ok(resp).await
3186 }
3187
3188 /// Start every instance of a sink.
3189 ///
3190 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/start`.
3191 /// Java: `SinksBase#startSink`.
3192 pub async fn sink_start(
3193 &self,
3194 tenant: &str,
3195 namespace: &str,
3196 name: &str,
3197 ) -> Result<(), AdminError> {
3198 validate_segment(tenant)?;
3199 validate_segment(namespace)?;
3200 validate_segment(name)?;
3201 let url = self.url_v3(&["sinks", tenant, namespace, name, "start"])?;
3202 let resp = self.send(self.http.request(Method::POST, url)).await?;
3203 empty_ok(resp).await
3204 }
3205
3206 /// Stop every instance of a sink.
3207 ///
3208 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/stop`.
3209 /// Java: `SinksBase#stopSink`.
3210 pub async fn sink_stop(
3211 &self,
3212 tenant: &str,
3213 namespace: &str,
3214 name: &str,
3215 ) -> Result<(), AdminError> {
3216 validate_segment(tenant)?;
3217 validate_segment(namespace)?;
3218 validate_segment(name)?;
3219 let url = self.url_v3(&["sinks", tenant, namespace, name, "stop"])?;
3220 let resp = self.send(self.http.request(Method::POST, url)).await?;
3221 empty_ok(resp).await
3222 }
3223
3224 /// Restart every instance of a sink.
3225 ///
3226 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/restart`.
3227 /// Java: `SinksBase#restartSink`.
3228 pub async fn sink_restart(
3229 &self,
3230 tenant: &str,
3231 namespace: &str,
3232 name: &str,
3233 ) -> Result<(), AdminError> {
3234 validate_segment(tenant)?;
3235 validate_segment(namespace)?;
3236 validate_segment(name)?;
3237 let url = self.url_v3(&["sinks", tenant, namespace, name, "restart"])?;
3238 let resp = self.send(self.http.request(Method::POST, url)).await?;
3239 empty_ok(resp).await
3240 }
3241
3242 // --- Pulsar Packages (/admin/v3/packages/...) -----------------------
3243
3244 /// List package names declared under (`type`, `tenant`, `namespace`).
3245 ///
3246 /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}`. Returns the
3247 /// list of package names — *not* versions; one entry per declared
3248 /// package. Use [`Self::package_versions_list`] to enumerate the
3249 /// versions of one package.
3250 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/
3251 /// admin/v3/Packages.java#listPackages`.
3252 pub async fn packages_list(
3253 &self,
3254 pkg_type: PackageType,
3255 tenant: &str,
3256 namespace: &str,
3257 ) -> Result<Vec<String>, AdminError> {
3258 validate_segment(tenant)?;
3259 validate_segment(namespace)?;
3260 let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace])?;
3261 let resp = self.send(self.http.request(Method::GET, url)).await?;
3262 json_ok(resp).await
3263 }
3264
3265 /// List the versions declared for one package.
3266 ///
3267 /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}`.
3268 /// Returns the list of version strings (Pulsar treats versions as
3269 /// opaque strings — `1.0.0`, `latest`, build hashes — and only the
3270 /// metadata endpoints understand them).
3271 /// Java: `PackagesBase#listPackageVersions`.
3272 pub async fn package_versions_list(
3273 &self,
3274 pkg_type: PackageType,
3275 tenant: &str,
3276 namespace: &str,
3277 name: &str,
3278 ) -> Result<Vec<String>, AdminError> {
3279 validate_segment(tenant)?;
3280 validate_segment(namespace)?;
3281 validate_segment(name)?;
3282 let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace, name])?;
3283 let resp = self.send(self.http.request(Method::GET, url)).await?;
3284 json_ok(resp).await
3285 }
3286
3287 /// Get the metadata envelope for one package version.
3288 ///
3289 /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3290 /// {version}/metadata`. Returns the `PackageMetadata` envelope as
3291 /// raw JSON for forward-compat — broker minor versions extend the
3292 /// shape with `tags`, `documentationUrl`, etc.
3293 /// Java: `PackagesBase#getPackageMetadata`.
3294 pub async fn package_metadata_get(
3295 &self,
3296 pkg_type: PackageType,
3297 tenant: &str,
3298 namespace: &str,
3299 name: &str,
3300 version: &str,
3301 ) -> Result<serde_json::Value, AdminError> {
3302 validate_segment(tenant)?;
3303 validate_segment(namespace)?;
3304 validate_segment(name)?;
3305 validate_segment(version)?;
3306 let url = self.url_v3(&[
3307 "packages",
3308 pkg_type.as_str(),
3309 tenant,
3310 namespace,
3311 name,
3312 version,
3313 "metadata",
3314 ])?;
3315 let resp = self.send(self.http.request(Method::GET, url)).await?;
3316 json_ok(resp).await
3317 }
3318
3319 /// Replace the metadata envelope for one package version.
3320 ///
3321 /// `PUT /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3322 /// {version}/metadata` with a [`PackageMetadata`] JSON body.
3323 /// The broker rejects this verb with 404 if the package version
3324 /// does not exist — `package_metadata_set` is *update*, never
3325 /// *create*. Java: `PackagesBase#updatePackageMetadata`.
3326 pub async fn package_metadata_set(
3327 &self,
3328 pkg_type: PackageType,
3329 tenant: &str,
3330 namespace: &str,
3331 name: &str,
3332 version: &str,
3333 metadata: PackageMetadata,
3334 ) -> Result<(), AdminError> {
3335 validate_segment(tenant)?;
3336 validate_segment(namespace)?;
3337 validate_segment(name)?;
3338 validate_segment(version)?;
3339 let url = self.url_v3(&[
3340 "packages",
3341 pkg_type.as_str(),
3342 tenant,
3343 namespace,
3344 name,
3345 version,
3346 "metadata",
3347 ])?;
3348 let resp = self
3349 .send(self.http.request(Method::PUT, url).json(&metadata))
3350 .await?;
3351 empty_ok(resp).await
3352 }
3353
3354 /// Delete one package version.
3355 ///
3356 /// `DELETE /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3357 /// {version}`. The broker drops the version's metadata + storage
3358 /// atomically; other versions of the same package are untouched.
3359 /// Java: `PackagesBase#deletePackage`.
3360 pub async fn package_delete(
3361 &self,
3362 pkg_type: PackageType,
3363 tenant: &str,
3364 namespace: &str,
3365 name: &str,
3366 version: &str,
3367 ) -> Result<(), AdminError> {
3368 validate_segment(tenant)?;
3369 validate_segment(namespace)?;
3370 validate_segment(name)?;
3371 validate_segment(version)?;
3372 let url = self.url_v3(&[
3373 "packages",
3374 pkg_type.as_str(),
3375 tenant,
3376 namespace,
3377 name,
3378 version,
3379 ])?;
3380 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3381 empty_ok(resp).await
3382 }
3383
3384 // --- Internal --------------------------------------------------------
3385
3386 /// Build a request URL by joining `segments` onto `base_url`. Each segment
3387 /// is percent-encoded for the URL path.
3388 fn url(&self, segments: &[&str]) -> Result<Url, AdminError> {
3389 Self::url_for(&self.base_url, segments)
3390 }
3391
3392 /// Build a request URL by joining `segments` onto the `/admin/v3/`
3393 /// base. Used by Pulsar Functions / IO Sources / IO Sinks /
3394 /// Packages, which Pulsar moved off of `/admin/v2/` historically.
3395 fn url_v3(&self, segments: &[&str]) -> Result<Url, AdminError> {
3396 Self::url_for(&self.base_url_v3, segments)
3397 }
3398
3399 /// Shared URL-builder body for the v2 / v3 helpers.
3400 fn url_for(base: &Url, segments: &[&str]) -> Result<Url, AdminError> {
3401 let mut url = base.clone();
3402 {
3403 // `Url::path_segments_mut` only fails for cannot-be-a-base URLs;
3404 // builder already rejected those.
3405 let mut path = url
3406 .path_segments_mut()
3407 .map_err(|()| AdminError::Builder("base url is cannot-be-a-base".into()))?;
3408 // Both `base_url` (anchored at `/admin/v2/`) and `base_url_v3`
3409 // (`/admin/v3/`) carry a trailing slash, which produces a
3410 // sentinel empty trailing segment in `path_segments_mut`. Drop
3411 // it before appending API segments — otherwise pushes land
3412 // after the empty, producing `/admin/v2//persistent/...`. Real
3413 // brokers tolerate the double slash; strict mocks (wiremock)
3414 // do not, and Java's `PulsarAdmin` emits the single-slash
3415 // form.
3416 path.pop_if_empty();
3417 for segment in segments {
3418 path.push(segment);
3419 }
3420 }
3421 Ok(url)
3422 }
3423
3424 /// Apply auth headers and dispatch.
3425 async fn send(&self, req: RequestBuilder) -> Result<Response, AdminError> {
3426 let req = match &self.auth {
3427 AdminAuth::None => req,
3428 AdminAuth::Token(tok) => {
3429 let value = format!("Bearer {tok}");
3430 let mut headers = HeaderMap::new();
3431 let header_value = HeaderValue::from_str(&value)
3432 .map_err(|err| AdminError::Builder(format!("invalid bearer token: {err}")))?;
3433 headers.insert(AUTHORIZATION, header_value);
3434 req.headers(headers)
3435 }
3436 };
3437 Ok(req.send().await?)
3438 }
3439}
3440
3441/// Pulsar Functions configuration — the subset of Java's
3442/// `org.apache.pulsar.common.functions.FunctionConfig` that the URL-based
3443/// `register` / `update` calls actually require. The Java type carries
3444/// ~30 fields (process / k8s runtime tuning, secrets, deadletter
3445/// topics, …); we expose the load-bearing ones an operator running a
3446/// pre-compiled JAR needs. Unknown fields on the wire are tolerated by
3447/// the broker, so adding extra knobs is an additive change later.
3448///
3449/// `tenant` / `namespace` / `name` are duplicated in the body because
3450/// the broker re-validates them against the URL path (Java's
3451/// `WorkerUtils.validateFunctionName`). The CLI fills these in from the
3452/// fully qualified name the operator passes on the command line.
3453///
3454/// `runtime` is the string `"JAVA"`, `"PYTHON"`, or `"GO"` — matches
3455/// Java's `org.apache.pulsar.common.functions.FunctionConfig.Runtime`
3456/// enum serialised by name.
3457#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3458#[serde(default, rename_all = "camelCase")]
3459pub struct FunctionConfig {
3460 /// Owning tenant.
3461 pub tenant: String,
3462 /// Owning namespace (bare name, not `tenant/namespace`).
3463 pub namespace: String,
3464 /// Function name (unique within the namespace).
3465 pub name: String,
3466 /// Fully qualified entry-point class name (`com.acme.MyFunction`
3467 /// for Java; `module.fn` for Python; `main` for Go).
3468 #[serde(rename = "className")]
3469 pub class_name: String,
3470 /// Input topics the function subscribes to.
3471 pub inputs: Vec<String>,
3472 /// Output topic the function produces to. Empty when the function
3473 /// has no output sink.
3474 pub output: String,
3475 /// Runtime — `"JAVA"`, `"PYTHON"`, or `"GO"`.
3476 pub runtime: String,
3477 /// Number of parallel instances the worker schedules.
3478 pub parallelism: i32,
3479 /// Optional opaque user-config map passed to the function's
3480 /// `Context#getUserConfigValue`. JSON object on the wire.
3481 #[serde(rename = "userConfig", skip_serializing_if = "Option::is_none")]
3482 pub user_config: Option<serde_json::Value>,
3483}
3484
3485/// Tenant policy info — admin roles and allowed clusters.
3486///
3487/// Mirrors Java's `org.apache.pulsar.common.policies.data.TenantInfoImpl` —
3488/// the JSON keys (`adminRoles`, `allowedClusters`) match upstream verbatim.
3489#[derive(Debug, Clone, Default, Serialize, Deserialize)]
3490pub struct TenantInfo {
3491 /// Roles permitted to administrate the tenant.
3492 #[serde(rename = "adminRoles")]
3493 pub admin_roles: Vec<String>,
3494 /// Cluster names the tenant may use.
3495 #[serde(rename = "allowedClusters")]
3496 pub allowed_clusters: Vec<String>,
3497}
3498
3499/// Wire shape of the PIP-415 `getMessageIdByIndex` response.
3500///
3501/// Mirrors Java's `MessageIdImpl` JSON shape (Jackson default property-name
3502/// serialisation): `{ledgerId, entryId, partitionIndex}`. See
3503/// `pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java`.
3504///
3505/// Kept as a deserialise-only DTO and converted into
3506/// [`magnetar_proto::MessageId`] at the boundary so callers do not see this
3507/// wire detail. Not exposed publicly.
3508#[derive(Debug, Clone, Deserialize)]
3509#[serde(rename_all = "camelCase")]
3510struct MessageIdResponse {
3511 ledger_id: i64,
3512 entry_id: i64,
3513 #[serde(default = "default_partition_index")]
3514 partition_index: i32,
3515}
3516
3517fn default_partition_index() -> i32 {
3518 -1
3519}
3520
3521impl MessageIdResponse {
3522 /// Convert the REST response into the canonical [`MessageId`]. The broker
3523 /// resolves at entry granularity, so `batch_index` / `batch_size` are not
3524 /// part of the JSON — they default to `-1` (the same sentinel
3525 /// `MessageId::from_pb` uses for `MessageIdData` without batch fields).
3526 ///
3527 /// Returns `AdminError::Protocol` if the broker emits a negative
3528 /// `ledgerId` or `entryId` — both fields are `u64` in the canonical type
3529 /// (matching the proto wire format) and Java's `MessageIdImpl` cannot
3530 /// represent negative values either, so a negative wire value is a
3531 /// broker bug we must surface rather than silently wrap.
3532 fn try_into_message_id(self) -> Result<MessageId, AdminError> {
3533 let ledger_id = u64::try_from(self.ledger_id).map_err(|_| {
3534 AdminError::Protocol(format!("negative ledgerId from broker: {}", self.ledger_id))
3535 })?;
3536 let entry_id = u64::try_from(self.entry_id).map_err(|_| {
3537 AdminError::Protocol(format!("negative entryId from broker: {}", self.entry_id))
3538 })?;
3539 Ok(MessageId {
3540 ledger_id,
3541 entry_id,
3542 partition: self.partition_index,
3543 batch_index: -1,
3544 batch_size: -1,
3545 // PIP-460 (ADR-0031): admin REST never resolves a scalable
3546 // segment id; the field only exists under `scalable-topics`.
3547 #[cfg(feature = "scalable-topics")]
3548 segment_id: None,
3549 })
3550 }
3551}
3552
3553/// Topic stats. Intentionally permissive: the Java
3554/// `PersistentTopicStatsImpl` shape is large and shifts between releases;
3555/// we extract the high-signal counters and pass the rest through as raw JSON.
3556#[derive(Debug, Clone, Default, Deserialize)]
3557#[serde(default)]
3558pub struct TopicStats {
3559 /// Total messages received.
3560 #[serde(rename = "msgInCounter")]
3561 pub msg_in_counter: i64,
3562 /// Total bytes received.
3563 #[serde(rename = "bytesInCounter")]
3564 pub bytes_in_counter: i64,
3565 /// Publishers, raw JSON because the schema is large and version-dependent.
3566 pub publishers: Vec<serde_json::Value>,
3567 /// Subscriptions map (raw JSON).
3568 pub subscriptions: serde_json::Value,
3569}
3570
3571/// Partitioned-topic metadata, as returned by
3572/// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`.
3573/// Java: `org.apache.pulsar.common.partition.PartitionedTopicMetadata`.
3574/// Only the partition count is consumed; broker-side extensions are ignored.
3575#[derive(Debug, Clone, Default, Deserialize)]
3576#[serde(default)]
3577struct PartitionedTopicMetadata {
3578 partitions: u32,
3579}
3580
3581/// Java `RetentionPolicies` — namespace-level retention policy. `-1` for
3582/// either dimension means infinite. The broker applies whichever quota
3583/// becomes binding first (time OR size).
3584#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
3585#[serde(default, rename_all = "camelCase")]
3586pub struct RetentionPolicies {
3587 /// Maximum retention time in minutes. `-1` = infinite, `0` = none.
3588 pub retention_time_in_minutes: i32,
3589 /// Maximum retention size in megabytes. `-1` = infinite, `0` = none.
3590 #[serde(rename = "retentionSizeInMB")]
3591 pub retention_size_in_mb: i64,
3592}
3593
3594/// Java `PersistencePolicies` — namespace-level `BookKeeper` layout +
3595/// managed-ledger write-shaping knobs. Maps to the broker's
3596/// `org.apache.pulsar.common.policies.data.PersistencePolicies`.
3597///
3598/// `Default::default()` returns the broker's documented default for a
3599/// new namespace (`2/2/2/0.0`), NOT all zeros — the broker rejects
3600/// ensemble values < 1 on `set`, so an all-zero policy is unusable
3601/// for a round-trip. Missing fields on decode default the same way:
3602/// a partial body where the broker omits one field round-trips with
3603/// the legal default, never with the illegal `0`.
3604#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3605#[serde(rename_all = "camelCase")]
3606pub struct PersistencePolicies {
3607 /// `BookKeeper` ensemble size — the number of bookies the managed
3608 /// ledger striping is spread across. Default: 2.
3609 #[serde(default = "default_bookkeeper_quorum")]
3610 pub bookkeeper_ensemble: i32,
3611 /// `BookKeeper` write quorum — the number of bookies each entry is
3612 /// written to. Default: 2.
3613 #[serde(default = "default_bookkeeper_quorum")]
3614 pub bookkeeper_write_quorum: i32,
3615 /// `BookKeeper` ack quorum — the number of acks required before an
3616 /// add is considered durable. Default: 2.
3617 #[serde(default = "default_bookkeeper_quorum")]
3618 pub bookkeeper_ack_quorum: i32,
3619 /// Managed-ledger mark-delete-rate cap (ops/sec). `0.0` disables
3620 /// the throttle. Default: 0.0 (disabled).
3621 #[serde(default)]
3622 pub managed_ledger_max_mark_delete_rate: f64,
3623}
3624
3625impl Default for PersistencePolicies {
3626 fn default() -> Self {
3627 Self {
3628 bookkeeper_ensemble: 2,
3629 bookkeeper_write_quorum: 2,
3630 bookkeeper_ack_quorum: 2,
3631 managed_ledger_max_mark_delete_rate: 0.0,
3632 }
3633 }
3634}
3635
3636#[inline]
3637fn default_bookkeeper_quorum() -> i32 {
3638 2
3639}
3640
3641/// Java `DispatchRate` — a sliding-window throttle (msg/sec + byte/sec
3642/// over a `ratePeriodInSecond` window). Shared shape between the
3643/// per-namespace consumer dispatch rate, the per-subscription dispatch
3644/// rate, and the cross-cluster replicator dispatch rate.
3645///
3646/// `-1` on either rate dimension disables that axis of the throttle —
3647/// missing fields default to `-1` (not `0`) so a broker-omitted
3648/// dimension round-trips as "no throttle", never as "throttle to
3649/// zero" (which would block consumer dispatch on the namespace).
3650#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3651#[serde(rename_all = "camelCase")]
3652pub struct DispatchRate {
3653 /// Throttle in messages/sec. `-1` = unlimited.
3654 #[serde(default = "neg_one_i32")]
3655 pub dispatch_throttling_rate_in_msg: i32,
3656 /// Throttle in bytes/sec. `-1` = unlimited.
3657 #[serde(default = "neg_one_i64")]
3658 pub dispatch_throttling_rate_in_byte: i64,
3659 /// Window size in seconds the throttle averages over.
3660 #[serde(default = "default_rate_period_seconds")]
3661 pub rate_period_in_second: i32,
3662 /// If `true`, dispatch rate is interpreted as an addend on top of
3663 /// the namespace publish rate rather than an absolute cap.
3664 #[serde(default)]
3665 pub relative_to_publish_rate: bool,
3666}
3667
3668impl Default for DispatchRate {
3669 fn default() -> Self {
3670 Self {
3671 dispatch_throttling_rate_in_msg: -1,
3672 dispatch_throttling_rate_in_byte: -1,
3673 rate_period_in_second: 1,
3674 relative_to_publish_rate: false,
3675 }
3676 }
3677}
3678
3679/// Java `PublishRate` — producer-side throttle (msg/sec + byte/sec).
3680/// `-1` on either dimension disables that axis of the throttle.
3681/// Missing fields default to `-1` (not `0`) — same sentinel semantics
3682/// as [`DispatchRate`]. Unlike `DispatchRate`, there is no
3683/// rate-period field; the broker uses a fixed 1-second window.
3684#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3685#[serde(rename_all = "camelCase")]
3686pub struct PublishRate {
3687 /// Throttle in messages/sec. `-1` = unlimited.
3688 #[serde(default = "neg_one_i32")]
3689 pub publish_throttling_rate_in_msg: i32,
3690 /// Throttle in bytes/sec. `-1` = unlimited.
3691 #[serde(default = "neg_one_i64")]
3692 pub publish_throttling_rate_in_byte: i64,
3693}
3694
3695impl Default for PublishRate {
3696 fn default() -> Self {
3697 Self {
3698 publish_throttling_rate_in_msg: -1,
3699 publish_throttling_rate_in_byte: -1,
3700 }
3701 }
3702}
3703
3704#[inline]
3705fn neg_one_i32() -> i32 {
3706 -1
3707}
3708#[inline]
3709fn neg_one_i64() -> i64 {
3710 -1
3711}
3712#[inline]
3713fn default_rate_period_seconds() -> i32 {
3714 1
3715}
3716
3717/// Java `DelayedDeliveryPolicies` — namespace-level switch + index-tick
3718/// granularity for the broker's delayed-message delivery tracker.
3719/// Maps to `org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies`.
3720/// `tick_time` controls how often the broker's delay-index buckets are
3721/// re-evaluated; smaller values give tighter delivery accuracy at a
3722/// higher tracker cost.
3723///
3724/// The Java field name is `tickTime` (carrying a `@JsonProperty("tickTime")`
3725/// annotation), **not** `tickTimeMillis`. The unit is documented as
3726/// milliseconds (see the upstream class doc), but the wire key drops
3727/// the unit suffix — Jackson on the broker only binds the literal
3728/// `tickTime`.
3729#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
3730#[serde(default, rename_all = "camelCase")]
3731pub struct DelayedDeliveryPolicies {
3732 /// Whether delayed delivery is enabled for the namespace.
3733 pub active: bool,
3734 /// Index-tick granularity in milliseconds. Wire key `tickTime`.
3735 #[serde(rename = "tickTime")]
3736 pub tick_time_millis: i64,
3737}
3738
3739/// Java `BacklogQuota` — one entry in the namespace-level backlog quota
3740/// map. `policy` is a string (`producer_request_hold`,
3741/// `producer_exception`, `consumer_backlog_eviction`) rather than a
3742/// closed Rust enum so new broker enum values forward-decode cleanly.
3743///
3744/// `-1` on either limit dimension disables that axis — missing fields
3745/// default to `-1` (not `0`) so a broker-omitted dimension round-trips
3746/// as "no quota", never as "expire-everything" or "block-everything".
3747#[derive(Debug, Clone, Deserialize, Serialize)]
3748#[serde(rename_all = "camelCase")]
3749pub struct BacklogQuota {
3750 /// Maximum allowed backlog in bytes (when type=`destination_storage`).
3751 /// `-1` = unlimited.
3752 #[serde(default = "neg_one_i64")]
3753 pub limit_size: i64,
3754 /// Maximum allowed backlog age in seconds (when type=`message_age`).
3755 /// `-1` = unlimited.
3756 #[serde(default = "neg_one_i32")]
3757 pub limit_time: i32,
3758 /// Action when the quota is exceeded.
3759 #[serde(default)]
3760 pub policy: String,
3761}
3762
3763impl Default for BacklogQuota {
3764 fn default() -> Self {
3765 Self {
3766 limit_size: -1,
3767 limit_time: -1,
3768 policy: String::new(),
3769 }
3770 }
3771}
3772
3773/// Java `BookieInfo` — a single bookie's rack assignment, as stored in
3774/// the `racks-info` metadata path and shipped on
3775/// [`AdminClient::bookies_set_rack`]. Field names are camelCase on the
3776/// wire (matching `org.apache.pulsar.common.policies.data.BookieInfo`,
3777/// which carries only `rack` and `hostname`).
3778///
3779/// The placement group is **not** part of this body — Pulsar's
3780/// `BookiesBase#updateBookieRackInfo` exposes `group` as a
3781/// `@QueryParam`, and the JSON body Jackson-binds only to
3782/// `{rack, hostname}`. Treating it as a body field silently drops the
3783/// operator's group choice on the wire.
3784#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3785#[serde(default, rename_all = "camelCase")]
3786pub struct BookieInfo {
3787 /// Rack identifier within the group — opaque to the broker, only
3788 /// the placement policy cares about it.
3789 pub rack: String,
3790 /// Resolved hostname for the bookie. The broker uses it for
3791 /// log lines; it does not have to match DNS.
3792 pub hostname: String,
3793}
3794
3795/// Java `PostSchemaPayload` — the request body for
3796/// [`AdminClient::schema_post`] and
3797/// [`AdminClient::schema_compatibility_check`]. The Java DTO has
3798/// (`type`, `schema`, `properties`); both keys travel as-is on the wire.
3799/// `schema` is the canonical-form blob for AVRO / JSON / PROTOBUF and
3800/// the protobuf descriptor for `PROTOBUF_NATIVE`.
3801#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3802#[serde(default)]
3803pub struct PostSchemaPayload {
3804 /// Schema type (`AVRO` / `JSON` / `PROTOBUF` / `PROTOBUF_NATIVE` /
3805 /// `KEY_VALUE` / `STRING` / `BYTES` / ...).
3806 #[serde(rename = "type")]
3807 pub schema_type: String,
3808 /// Schema definition, encoded per the type axis.
3809 pub schema: String,
3810 /// User-defined per-schema properties.
3811 pub properties: std::collections::HashMap<String, String>,
3812}
3813
3814/// Java `SourceConfig` — declarative description of a Pulsar IO
3815/// Source. Mirrors `org.apache.pulsar.common.io.SourceConfig` (Jackson
3816/// camelCase on the wire). Only the fields the JAX-RS `create` /
3817/// `update` paths require are typed; per-connector knobs ride along in
3818/// the open-ended `configs` map so a forward broker can add fields
3819/// without a magnetar release.
3820#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3821#[serde(default, rename_all = "camelCase")]
3822pub struct SourceConfig {
3823 /// Tenant owning the source. Must match the URL path tenant.
3824 pub tenant: String,
3825 /// Namespace owning the source. Must match the URL path namespace.
3826 pub namespace: String,
3827 /// Source name. Must match the URL path name.
3828 pub name: String,
3829 /// Fully-qualified connector class (e.g.
3830 /// `org.apache.pulsar.io.kafka.KafkaSource`).
3831 pub class_name: String,
3832 /// Destination topic the source writes to.
3833 pub topic_name: String,
3834 /// Number of source instances to schedule.
3835 pub parallelism: i32,
3836 /// Connector-specific configuration map. Skipped from JSON when
3837 /// `None` so a `null` does not override the broker default.
3838 #[serde(skip_serializing_if = "Option::is_none")]
3839 pub configs: Option<serde_json::Value>,
3840}
3841
3842/// Java `SinkConfig` — declarative description of a Pulsar IO Sink.
3843/// Mirrors `org.apache.pulsar.common.io.SinkConfig` (Jackson camelCase
3844/// on the wire). The `inputs` slot is the list of source topics the
3845/// sink reads from — the broker accepts either fully-qualified topic
3846/// names or `tenant/namespace/topic` shorthand. Per-connector knobs
3847/// ride in `configs` for the same forward-compat reason as
3848/// [`SourceConfig`].
3849#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3850#[serde(default, rename_all = "camelCase")]
3851pub struct SinkConfig {
3852 /// Tenant owning the sink. Must match the URL path tenant.
3853 pub tenant: String,
3854 /// Namespace owning the sink. Must match the URL path namespace.
3855 pub namespace: String,
3856 /// Sink name. Must match the URL path name.
3857 pub name: String,
3858 /// Fully-qualified connector class (e.g.
3859 /// `org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink`).
3860 pub class_name: String,
3861 /// Source topics the sink subscribes to.
3862 pub inputs: Vec<String>,
3863 /// Number of sink instances to schedule.
3864 pub parallelism: i32,
3865 /// Connector-specific configuration map. Skipped from JSON when
3866 /// `None` so a `null` does not override the broker default.
3867 #[serde(skip_serializing_if = "Option::is_none")]
3868 pub configs: Option<serde_json::Value>,
3869}
3870
3871/// Pulsar Packages namespace dimension — the `{type}` segment of the
3872/// `/admin/v3/packages/{type}/...` URL. Maps to upstream's
3873/// `PackageType` enum
3874/// (`pulsar-packages-management/pulsar-packages-management-core/.../
3875/// PackageType.java`): the broker only accepts the three lowercase
3876/// tokens `function`, `source`, `sink` and rejects everything else
3877/// with 400. Modelled as a closed Rust enum so the URL builder
3878/// cannot emit a value the broker will refuse.
3879#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3880pub enum PackageType {
3881 /// `function` — Pulsar Functions JAR.
3882 Function,
3883 /// `source` — Pulsar IO Source NAR.
3884 Source,
3885 /// `sink` — Pulsar IO Sink NAR.
3886 Sink,
3887}
3888
3889impl PackageType {
3890 /// Render as the lowercase token the broker URL surface expects.
3891 #[must_use]
3892 pub fn as_str(self) -> &'static str {
3893 match self {
3894 Self::Function => "function",
3895 Self::Source => "source",
3896 Self::Sink => "sink",
3897 }
3898 }
3899}
3900
3901impl std::str::FromStr for PackageType {
3902 type Err = AdminError;
3903
3904 /// Parse from the lowercase tokens the broker emits (`function` /
3905 /// `source` / `sink`). Hyphenated aliases are accepted to make the
3906 /// CLI feel idiomatic — `package-type=source` vs the broker's
3907 /// `source` are equivalent.
3908 fn from_str(s: &str) -> Result<Self, AdminError> {
3909 match s.to_ascii_lowercase().as_str() {
3910 "function" | "functions" => Ok(Self::Function),
3911 "source" | "sources" => Ok(Self::Source),
3912 "sink" | "sinks" => Ok(Self::Sink),
3913 other => Err(AdminError::InvalidName(format!(
3914 "unknown package type {other:?} (expected: function | source | sink)"
3915 ))),
3916 }
3917 }
3918}
3919
3920/// Java `PackageMetadata` — the metadata envelope Pulsar Packages
3921/// attaches to each `(type, tenant, namespace, name, version)` tuple.
3922/// Mirrors `org.apache.pulsar.packages.management.core.common.PackageMetadata`
3923/// (Jackson camelCase on the wire). `modification_time` is a
3924/// broker-side timestamp in milliseconds-since-epoch — the broker
3925/// emits it on `GET` and ignores caller-supplied values on `PUT`
3926/// (overwriting them with the receive timestamp).
3927#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3928#[serde(default, rename_all = "camelCase")]
3929pub struct PackageMetadata {
3930 /// Free-form package description.
3931 pub description: String,
3932 /// Maintainer contact (typically an email or team handle).
3933 pub contact: String,
3934 /// Last-modification timestamp in ms-since-epoch. Read-only for
3935 /// callers — the broker overwrites the value on `PUT`.
3936 pub modification_time: i64,
3937 /// Arbitrary key/value labels (release notes, CI ids, etc.).
3938 pub properties: std::collections::HashMap<String, String>,
3939}
3940
3941/// Java `BacklogQuotaType` — selects which dimension a `BacklogQuota`
3942/// entry limits.
3943#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3944pub enum BacklogQuotaType {
3945 /// Bytes-on-disk dimension. Uses `BacklogQuota::limit_size`.
3946 DestinationStorage,
3947 /// Message-age dimension. Uses `BacklogQuota::limit_time`.
3948 MessageAge,
3949}
3950
3951impl BacklogQuotaType {
3952 /// Render as the lowercase `snake_case` value the broker REST surface
3953 /// expects in the `backlogQuotaType` query parameter.
3954 #[must_use]
3955 pub fn as_query_value(self) -> &'static str {
3956 match self {
3957 Self::DestinationStorage => "destination_storage",
3958 Self::MessageAge => "message_age",
3959 }
3960 }
3961}
3962
3963/// Java `LongRunningProcessStatus` — the polling shape for triggered
3964/// background jobs (compaction, offload). The broker returns one of four
3965/// `status` values: `NOT_RUN` (never triggered), `RUNNING`, `SUCCESS`,
3966/// `ERROR`. `last_error` is populated only on `ERROR`.
3967#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3968#[serde(default, rename_all = "camelCase")]
3969pub struct LongRunningProcessStatus {
3970 /// Job state — `NOT_RUN`, `RUNNING`, `SUCCESS`, or `ERROR`.
3971 pub status: String,
3972 /// Human-readable error message, present on `ERROR`.
3973 pub last_error: String,
3974}
3975
3976/// Request body for `POST .../subscription/{sub}/resetcursor` (Java
3977/// `ResetCursorData`). The CLI exposes `message_id` and `is_excluded`;
3978/// Pulsar's `batchIndexes` / `properties` fields are not currently set
3979/// — they exist for transactional dedup metadata and would require
3980/// txn-aware callers anyway.
3981///
3982/// `ledger_id` / `entry_id` are `i64` on the wire because Pulsar
3983/// Jackson-binds them to Java `long`. `MessageId` on the Rust side uses
3984/// `u64` (matching the wire-protocol envelope), with `u64::MAX` as the
3985/// `EARLIEST` / `LATEST` sentinel — the conversion below maps those
3986/// sentinels to `-1` (the Java sentinel) so a reset-to-earliest /
3987/// reset-to-latest doesn't overflow Java's `long` parser.
3988#[derive(Debug, Serialize)]
3989#[serde(rename_all = "camelCase")]
3990struct ResetCursorData {
3991 ledger_id: i64,
3992 entry_id: i64,
3993 partition_index: i32,
3994 batch_index: i32,
3995 #[serde(rename = "isExcluded")]
3996 is_excluded: bool,
3997}
3998
3999/// Map a `MessageId` u64 ledger/entry id onto Pulsar's Java-`long`
4000/// wire field, translating the Rust-side `u64::MAX` sentinel (used by
4001/// `MessageId::EARLIEST` / `LATEST`) to Java's `-1` sentinel.
4002/// Non-sentinel values are passed through verbatim — Pulsar's
4003/// `LedgerHandle` / `EntryId` indices fit in `i64::MAX` long before
4004/// overflowing.
4005#[inline]
4006fn message_id_field_for_wire(value: u64) -> i64 {
4007 if value == u64::MAX {
4008 -1
4009 } else {
4010 // Pulsar entry indices fit in i63 — `as i64` cannot overflow
4011 // for any legitimate broker-emitted value.
4012 value as i64
4013 }
4014}
4015
4016/// Builder for [`AdminClient`].
4017#[derive(Debug, Default)]
4018pub struct AdminClientBuilder {
4019 base_url: Option<Url>,
4020 auth: AdminAuth,
4021 timeout: Option<Duration>,
4022}
4023
4024impl AdminClientBuilder {
4025 /// Set the service URL — the base for `/admin/v2/...`. Required.
4026 #[must_use]
4027 pub fn service_url(mut self, url: Url) -> Self {
4028 self.base_url = Some(url);
4029 self
4030 }
4031
4032 /// Configure bearer-token auth (`Authorization: Bearer <token>`).
4033 #[must_use]
4034 pub fn token(mut self, token: String) -> Self {
4035 self.auth = AdminAuth::Token(token);
4036 self
4037 }
4038
4039 /// Override the request timeout. Defaults to [`DEFAULT_TIMEOUT`].
4040 #[must_use]
4041 pub fn timeout(mut self, dur: Duration) -> Self {
4042 self.timeout = Some(dur);
4043 self
4044 }
4045
4046 /// Build the client.
4047 pub fn build(self) -> Result<AdminClient, AdminError> {
4048 let base_url = self
4049 .base_url
4050 .ok_or_else(|| AdminError::Builder("service_url is required".into()))?;
4051 if base_url.cannot_be_a_base() {
4052 return Err(AdminError::Builder(format!(
4053 "service_url cannot be a base url: {base_url}"
4054 )));
4055 }
4056 // Anchor every V2 API call below `/admin/v2/` and every V3 call
4057 // below `/admin/v3/`. We append the suffix here so callers pass
4058 // plain `http://broker:8080` rather than baking either prefix in.
4059 //
4060 // `Url::join` follows WHATWG semantics: if `base_url` has no
4061 // trailing slash, its last path segment is replaceable. So
4062 // `http://broker/pulsar` + `admin/v2/` would yield
4063 // `http://broker/admin/v2/` — the `pulsar` prefix silently
4064 // dropped (common for path-prefixed K8s ingresses). Normalise
4065 // the base to end with `/` first so the join always appends.
4066 let base_url = {
4067 let mut b = base_url.clone();
4068 if !b.path().ends_with('/') {
4069 b.set_path(&format!("{}/", b.path()));
4070 }
4071 b
4072 };
4073 let base_url_v3 = base_url.join("admin/v3/")?;
4074 let base_url = base_url.join("admin/v2/")?;
4075
4076 // reqwest 0.13 panics in `Client::builder().build()` when the active
4077 // `rustls` flavor is `rustls-no-provider` and no global
4078 // `CryptoProvider` is installed. That happens whenever more than one
4079 // `crypto-*` feature is unified (e.g. default `crypto-aws-lc-rs`
4080 // plus an explicit `crypto-ring`), so install the default here —
4081 // the shim is idempotent and a no-op once a provider is set, which
4082 // covers parallel callers and processes that also boot the tokio
4083 // engine.
4084 tls_crypto::install_default_provider();
4085
4086 let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT);
4087 let http = reqwest::Client::builder()
4088 .timeout(timeout)
4089 .build()
4090 .map_err(AdminError::Http)?;
4091
4092 Ok(AdminClient {
4093 base_url,
4094 base_url_v3,
4095 http,
4096 auth: self.auth,
4097 })
4098 }
4099}
4100
4101/// Errors returned by the admin client.
4102#[derive(Debug, thiserror::Error)]
4103pub enum AdminError {
4104 /// Transport-layer error from `reqwest`.
4105 #[error("http error: {0}")]
4106 Http(#[from] reqwest::Error),
4107 /// API returned a non-success HTTP status.
4108 #[error("api error {code}: {body}")]
4109 Status {
4110 /// HTTP status code.
4111 code: u16,
4112 /// Response body (or a placeholder if reading the body failed).
4113 body: String,
4114 },
4115 /// JSON decode error.
4116 #[error("json decode: {0}")]
4117 Json(#[from] serde_json::Error),
4118 /// URL parse / construction error.
4119 #[error("invalid url: {0}")]
4120 Url(#[from] url::ParseError),
4121 /// Builder configuration error (missing service URL, invalid argument...).
4122 #[error("invalid builder: {0}")]
4123 Builder(String),
4124 /// Caller passed a namespace or topic name that the client could not parse.
4125 #[error("invalid name: {0}")]
4126 InvalidName(String),
4127 /// Broker returned a response that violates the documented wire contract
4128 /// (e.g. negative `ledgerId` from `getMessageIdByIndex`, which Java
4129 /// `MessageIdImpl` cannot represent either).
4130 #[error("broker protocol violation: {0}")]
4131 Protocol(String),
4132}
4133
4134/// Decode a non-error JSON response body.
4135async fn json_ok<T>(resp: Response) -> Result<T, AdminError>
4136where
4137 T: for<'de> Deserialize<'de>,
4138{
4139 let resp = ensure_status(resp).await?;
4140 let bytes = resp.bytes().await?;
4141 Ok(serde_json::from_slice(&bytes)?)
4142}
4143
4144/// Decode a JSON response body, defaulting to `T::default()` when the
4145/// broker returns 204 / empty body / literal `null`. Pulsar's
4146/// "policy-unset" pattern collapses to one of those three encodings:
4147///
4148/// - `getRetention` / `getPersistence` / `getDispatchRate` / `getPublishRate` on a fresh namespace
4149/// surface `null` through Jersey's `resume(null)`, which travels as 204 No Content;
4150/// - `getTopicPolicies` on a topic with no explicit policy returns either 204 or the literal JSON
4151/// `null`.
4152///
4153/// `json_ok::<RetentionPolicies>` errors with `EOF while parsing a
4154/// value` for either case; this helper maps them to
4155/// `RetentionPolicies::default()` (broker semantic: missing fields ==
4156/// broker default). Callers asserting "post-remove returns the
4157/// default" stay correct without changing the public signature.
4158async fn json_ok_or_default<T>(resp: Response) -> Result<T, AdminError>
4159where
4160 T: for<'de> Deserialize<'de> + Default,
4161{
4162 let resp = ensure_status(resp).await?;
4163 if resp.status() == StatusCode::NO_CONTENT {
4164 return Ok(T::default());
4165 }
4166 let bytes = resp.bytes().await?;
4167 if bytes.is_empty() {
4168 return Ok(T::default());
4169 }
4170 // `null` body — broker says "policy unset" — also folds to default.
4171 if bytes.as_ref().trim_ascii() == b"null" {
4172 return Ok(T::default());
4173 }
4174 Ok(serde_json::from_slice::<T>(&bytes)?)
4175}
4176
4177/// Decode a JSON response body that the broker may emit as an empty
4178/// payload to mean "no value here". Pulsar's pattern for "no override
4179/// set" on the policy GET endpoints is to return `204 No Content` (or
4180/// a 200 with an empty body) rather than the literal JSON `null`;
4181/// `serde_json::from_slice::<Option<T>>(b"")` errors with `EOF while
4182/// parsing a value`, so every `namespace_get_*` / `topic_get_*` that
4183/// returns `Option<T>` needs this tolerant decoder.
4184///
4185/// Decoding rules:
4186/// - 204 No Content → `Ok(None)`
4187/// - 2xx with empty body bytes → `Ok(None)`
4188/// - 2xx with the literal `null` → `Ok(None)`
4189/// - 2xx with a JSON value → `Ok(Some(value))` via serde
4190async fn json_ok_optional<T>(resp: Response) -> Result<Option<T>, AdminError>
4191where
4192 T: for<'de> Deserialize<'de>,
4193{
4194 let resp = ensure_status(resp).await?;
4195 if resp.status() == StatusCode::NO_CONTENT {
4196 return Ok(None);
4197 }
4198 let bytes = resp.bytes().await?;
4199 if bytes.is_empty() {
4200 return Ok(None);
4201 }
4202 Ok(serde_json::from_slice::<Option<T>>(&bytes)?)
4203}
4204
4205/// Discard a successful no-content response body.
4206async fn empty_ok(resp: Response) -> Result<(), AdminError> {
4207 let _ = ensure_status(resp).await?;
4208 Ok(())
4209}
4210
4211/// Convert a non-success response into [`AdminError::Status`]. Returns the
4212/// original response on 2xx so the caller can decode the body.
4213async fn ensure_status(resp: Response) -> Result<Response, AdminError> {
4214 let status = resp.status();
4215 if status.is_success() || status == StatusCode::NO_CONTENT {
4216 return Ok(resp);
4217 }
4218 let code = status.as_u16();
4219 let body = resp
4220 .text()
4221 .await
4222 .unwrap_or_else(|err| format!("<failed to read body: {err}>"));
4223 Err(AdminError::Status { code, body })
4224}
4225
4226/// Split a `tenant/namespace` string into its two segments.
4227/// Reject path segments the `url` crate would silently rewrite. `.` and `..`
4228/// disappear under RFC 3986 dot-segment normalisation; percent-encoded slash
4229/// (`%2F` / `%2f`) lets a hostile name escape its segment; NUL / ASCII
4230/// control bytes have no place in an admin path. Refusing all of these at
4231/// the input boundary keeps the URL the client builds in lock-step with the
4232/// path the broker eventually parses.
4233fn validate_segment(segment: &str) -> Result<(), AdminError> {
4234 if segment.is_empty() {
4235 return Err(AdminError::InvalidName("empty path segment".into()));
4236 }
4237 if segment == "." || segment == ".." {
4238 return Err(AdminError::InvalidName(format!(
4239 "dot segment is not a valid name: {segment:?}",
4240 )));
4241 }
4242 if segment.contains("%2F") || segment.contains("%2f") {
4243 return Err(AdminError::InvalidName(format!(
4244 "percent-encoded slash in segment: {segment:?}",
4245 )));
4246 }
4247 if segment.bytes().any(|b| b < 0x20 || b == 0x7f) {
4248 return Err(AdminError::InvalidName(format!(
4249 "control byte in segment: {segment:?}",
4250 )));
4251 }
4252 Ok(())
4253}
4254
4255/// Split a `tenant/namespace` string into its two segments.
4256///
4257/// Exposed for the CLI (and any other admin-client wrapper) so the
4258/// `tenant/namespace` shape used by the namespace-scoped list verbs
4259/// (`functions list`, `sources list`, `sinks list`, `packages list`)
4260/// validates against the same `validate_segment` rules every admin
4261/// method enforces internally — no parallel parsers, no divergent
4262/// error categories.
4263pub fn split_namespace(ns: &str) -> Result<(&str, &str), AdminError> {
4264 let (tenant, namespace) = ns.split_once('/').ok_or_else(|| {
4265 AdminError::InvalidName(format!("expected tenant/namespace, got {ns:?} (no '/')"))
4266 })?;
4267 if tenant.is_empty() || namespace.is_empty() || namespace.contains('/') {
4268 return Err(AdminError::InvalidName(format!(
4269 "expected tenant/namespace, got {ns:?}"
4270 )));
4271 }
4272 validate_segment(tenant)?;
4273 validate_segment(namespace)?;
4274 Ok((tenant, namespace))
4275}
4276
4277/// Split a `persistent://tenant/namespace/topic` (or `tenant/namespace/topic`)
4278/// into its three path segments. The scheme is optional; if present it must
4279/// be `persistent://`.
4280fn split_topic(topic: &str) -> Result<(&str, &str, &str), AdminError> {
4281 let rest = topic.strip_prefix("persistent://").unwrap_or(topic);
4282 let mut parts = rest.splitn(3, '/');
4283 let tenant = parts.next().unwrap_or("");
4284 let namespace = parts.next().unwrap_or("");
4285 let name = parts.next().unwrap_or("");
4286 if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
4287 return Err(AdminError::InvalidName(format!(
4288 "expected [persistent://]tenant/namespace/topic, got {topic:?}"
4289 )));
4290 }
4291 validate_segment(tenant)?;
4292 validate_segment(namespace)?;
4293 validate_segment(name)?;
4294 Ok((tenant, namespace, name))
4295}
4296
4297/// Build the two-part `multipart/form-data` envelope the broker expects
4298/// on URL-based function register / update calls. Order is fixed (`url`
4299/// then `functionConfig`) so wire-level tests can pin the body shape.
4300/// The `functionConfig` part carries an explicit `application/json`
4301/// content type; without it the broker's Jersey
4302/// `FormDataMultiPartFeature` falls back to `text/plain` and refuses
4303/// the JSON.
4304fn function_pkg_form(
4305 pkg_url: &str,
4306 config: &FunctionConfig,
4307) -> Result<reqwest::multipart::Form, AdminError> {
4308 build_url_config_multipart(pkg_url, "functionConfig", config)
4309}
4310
4311/// Split a `tenant/namespace/name` Functions / IO identifier into its
4312/// three segments. Pulsar Functions never carry a `persistent://`
4313/// scheme prefix (functions are not topics), so the parser is stricter
4314/// than the internal `split_topic` (rustdoc cannot resolve the bare
4315/// identifier because `split_topic` is module-private).
4316///
4317/// Exposed for the CLI, which parses the fully qualified name out of a
4318/// single positional argument before calling the admin methods (which
4319/// take `tenant`, `namespace`, `name` separately so the broker's
4320/// per-segment validation maps 1:1 to the URL path).
4321pub fn split_function_id(id: &str) -> Result<(&str, &str, &str), AdminError> {
4322 let mut parts = id.splitn(3, '/');
4323 let tenant = parts.next().unwrap_or("");
4324 let namespace = parts.next().unwrap_or("");
4325 let name = parts.next().unwrap_or("");
4326 if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
4327 return Err(AdminError::InvalidName(format!(
4328 "expected tenant/namespace/name, got {id:?}"
4329 )));
4330 }
4331 validate_segment(tenant)?;
4332 validate_segment(namespace)?;
4333 validate_segment(name)?;
4334 Ok((tenant, namespace, name))
4335}
4336
4337/// Build the two-part `multipart/form-data` body Pulsar IO's
4338/// `register*` / `update*` endpoints expect when the package is
4339/// referenced by URL (`http(s)://`, `file://`, `function://`): a `url`
4340/// text part and a `<config_field>` JSON part. The broker enforces
4341/// both parts at the dispatcher boundary
4342/// (`SourcesBase#registerSource`, `SinksBase#registerSink`) — missing
4343/// either yields 400. Generic over the config type so Functions,
4344/// Sources, and Sinks share one helper (`functionConfig` /
4345/// `sourceConfig` / `sinkConfig` via the `config_field` argument).
4346fn build_url_config_multipart<T: Serialize>(
4347 pkg_url: &str,
4348 config_field: &str,
4349 config: &T,
4350) -> Result<reqwest::multipart::Form, AdminError> {
4351 let body = serde_json::to_string(config)?;
4352 // `mime_str` only fails on a malformed string; the literal we pass
4353 // is well-formed so the `expect` is on a never-taken branch.
4354 let config_part = reqwest::multipart::Part::text(body)
4355 .mime_str("application/json")
4356 .expect("application/json is a well-formed media type");
4357 let form = reqwest::multipart::Form::new()
4358 .text("url", pkg_url.to_owned())
4359 .part(config_field.to_owned(), config_part);
4360 Ok(form)
4361}
4362
4363#[cfg(test)]
4364mod tests {
4365 use super::*;
4366
4367 #[test]
4368 fn builder_requires_service_url() {
4369 let err = AdminClient::builder().build().unwrap_err();
4370 assert!(matches!(err, AdminError::Builder(_)));
4371 }
4372
4373 #[test]
4374 fn builder_appends_admin_v2_prefix() {
4375 let client = AdminClient::builder()
4376 .service_url("http://localhost:8080".parse().unwrap())
4377 .build()
4378 .unwrap();
4379 assert_eq!(
4380 client.base_url().as_str(),
4381 "http://localhost:8080/admin/v2/"
4382 );
4383 }
4384
4385 #[test]
4386 fn builder_carries_token() {
4387 let client = AdminClient::builder()
4388 .service_url("http://localhost:8080".parse().unwrap())
4389 .token("abc".into())
4390 .build()
4391 .unwrap();
4392 assert!(matches!(client.auth(), AdminAuth::Token(t) if t == "abc"));
4393 }
4394
4395 #[test]
4396 fn admin_auth_token_debug_redacts_secret() {
4397 let auth = AdminAuth::Token("super-secret-jwt".to_owned());
4398 let rendered = format!("{auth:?}");
4399 assert!(
4400 !rendered.contains("super-secret-jwt"),
4401 "raw token leaked through Debug: {rendered}",
4402 );
4403 assert!(
4404 rendered.contains("<redacted>"),
4405 "expected redaction sentinel in {rendered}"
4406 );
4407 assert!(
4408 rendered.contains("Token"),
4409 "expected variant name in {rendered}"
4410 );
4411
4412 let none_rendered = format!("{:?}", AdminAuth::None);
4413 assert_eq!(none_rendered, "None");
4414 }
4415
4416 #[test]
4417 fn admin_client_debug_does_not_leak_token() {
4418 let client = AdminClient::builder()
4419 .service_url("http://localhost:8080".parse().unwrap())
4420 .token("leaky-token".into())
4421 .build()
4422 .unwrap();
4423 let rendered = format!("{client:?}");
4424 assert!(
4425 !rendered.contains("leaky-token"),
4426 "raw token leaked through AdminClient Debug: {rendered}",
4427 );
4428 }
4429
4430 #[test]
4431 fn split_namespace_ok() {
4432 assert_eq!(
4433 split_namespace("public/default").unwrap(),
4434 ("public", "default")
4435 );
4436 }
4437
4438 #[test]
4439 fn split_namespace_rejects_missing_slash() {
4440 assert!(matches!(
4441 split_namespace("public"),
4442 Err(AdminError::InvalidName(_))
4443 ));
4444 }
4445
4446 #[test]
4447 fn split_namespace_rejects_extra_segment() {
4448 assert!(matches!(
4449 split_namespace("public/default/extra"),
4450 Err(AdminError::InvalidName(_))
4451 ));
4452 }
4453
4454 #[test]
4455 fn split_topic_with_scheme() {
4456 let (t, n, name) = split_topic("persistent://acme/svc/orders").unwrap();
4457 assert_eq!((t, n, name), ("acme", "svc", "orders"));
4458 }
4459
4460 #[test]
4461 fn split_topic_without_scheme() {
4462 let (t, n, name) = split_topic("acme/svc/orders").unwrap();
4463 assert_eq!((t, n, name), ("acme", "svc", "orders"));
4464 }
4465
4466 #[test]
4467 fn split_topic_rejects_short_name() {
4468 assert!(matches!(
4469 split_topic("acme/svc"),
4470 Err(AdminError::InvalidName(_))
4471 ));
4472 }
4473
4474 #[test]
4475 fn split_function_id_ok() {
4476 let (t, n, name) = split_function_id("public/default/my-fn").unwrap();
4477 assert_eq!((t, n, name), ("public", "default", "my-fn"));
4478 }
4479
4480 #[test]
4481 fn split_function_id_rejects_short_name() {
4482 assert!(matches!(
4483 split_function_id("public/default"),
4484 Err(AdminError::InvalidName(_))
4485 ));
4486 }
4487
4488 #[test]
4489 fn split_function_id_rejects_persistent_scheme() {
4490 // Functions never carry a `persistent://` prefix — the parser
4491 // refuses one rather than silently treating the scheme as the
4492 // tenant name (which `validate_segment` would later catch via
4493 // the percent-encoded slash rule, but we want a clearer error
4494 // up front).
4495 assert!(matches!(
4496 split_function_id("persistent://acme/svc/fn"),
4497 Err(AdminError::InvalidName(_))
4498 ));
4499 }
4500
4501 #[test]
4502 fn message_id_response_deserialises_java_camelcase() {
4503 // The exact body shape upstream PIP-415 §"Success Response" advertises.
4504 let json = r#"{"ledgerId":12345,"entryId":67890,"partitionIndex":0}"#;
4505 let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
4506 let msg = dto.try_into_message_id().unwrap();
4507 assert_eq!(msg.ledger_id, 12345);
4508 assert_eq!(msg.entry_id, 67890);
4509 assert_eq!(msg.partition, 0);
4510 // The broker resolves at entry granularity — batch fields are absent
4511 // from the JSON and must default to -1 to match the canonical sentinel.
4512 assert_eq!(msg.batch_index, -1);
4513 assert_eq!(msg.batch_size, -1);
4514 }
4515
4516 #[test]
4517 fn message_id_response_defaults_partition_for_non_partitioned_topic() {
4518 // PIP-415 §"Success Response": `partitionIndex: -1` for non-partitioned
4519 // topics. Some broker versions omit the field entirely on
4520 // non-partitioned topics; serde default keeps us correct in either case.
4521 let json = r#"{"ledgerId":1,"entryId":2}"#;
4522 let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
4523 assert_eq!(dto.try_into_message_id().unwrap().partition, -1);
4524 }
4525
4526 #[test]
4527 fn url_helper_emits_single_slash_after_admin_v2() {
4528 // Regression guard: the previous url() helper appended segments after
4529 // the trailing-slash sentinel of /admin/v2/, producing
4530 // /admin/v2//persistent/... — real brokers tolerated it but strict
4531 // mocks (and Java's PulsarAdmin) emit the single-slash form. Pin the
4532 // current behaviour so we notice any future regression.
4533 let client = AdminClient::builder()
4534 .service_url("http://broker.example:8080".parse().unwrap())
4535 .build()
4536 .unwrap();
4537 let url = client.url(&["clusters"]).unwrap();
4538 assert_eq!(url.as_str(), "http://broker.example:8080/admin/v2/clusters");
4539 let url2 = client
4540 .url(&["persistent", "public", "default", "topic", "stats"])
4541 .unwrap();
4542 assert_eq!(
4543 url2.as_str(),
4544 "http://broker.example:8080/admin/v2/persistent/public/default/topic/stats"
4545 );
4546 }
4547
4548 #[test]
4549 fn url_helper_preserves_path_prefix_without_trailing_slash() {
4550 // Regression guard: per WHATWG URL semantics, `Url::join("admin/v2/")`
4551 // on a base whose path has no trailing slash REPLACES the last segment
4552 // — `http://broker/pulsar` + `admin/v2/` becomes
4553 // `http://broker/admin/v2/`, silently dropping `/pulsar`. The builder
4554 // normalises the base to end in `/` before joining so any operator
4555 // running behind a path-prefixed K8s ingress (`--admin-url
4556 // http://gw/pulsar`) gets the right URLs on both V2 and V3 endpoints.
4557 let client = AdminClient::builder()
4558 .service_url("http://broker.example:8080/pulsar".parse().unwrap())
4559 .build()
4560 .unwrap();
4561 let url = client.url(&["clusters"]).unwrap();
4562 assert_eq!(
4563 url.as_str(),
4564 "http://broker.example:8080/pulsar/admin/v2/clusters"
4565 );
4566 let url_v3 = client.url_v3(&["functions", "a", "b"]).unwrap();
4567 assert_eq!(
4568 url_v3.as_str(),
4569 "http://broker.example:8080/pulsar/admin/v3/functions/a/b"
4570 );
4571 }
4572
4573 #[test]
4574 fn split_topic_rejects_dot_segments() {
4575 // LISA-001: `..` / `.` in any segment would silently normalise out via
4576 // url::Url::path_segments_mut, producing a client/server URL parser
4577 // differential. Refuse them at the input boundary.
4578 assert!(matches!(
4579 split_topic("persistent://../foo/bar"),
4580 Err(AdminError::InvalidName(_))
4581 ));
4582 assert!(matches!(
4583 split_topic("./foo/bar"),
4584 Err(AdminError::InvalidName(_))
4585 ));
4586 assert!(matches!(
4587 split_topic("tenant/./topic"),
4588 Err(AdminError::InvalidName(_))
4589 ));
4590 }
4591
4592 #[test]
4593 fn split_topic_rejects_control_bytes_and_percent_encoded_slash() {
4594 assert!(matches!(
4595 split_topic("tenant/ns/topic%2Fevil"),
4596 Err(AdminError::InvalidName(_))
4597 ));
4598 assert!(matches!(
4599 split_topic("tenant/ns/top\0ic"),
4600 Err(AdminError::InvalidName(_))
4601 ));
4602 }
4603}