Skip to main content

magnetar_admin/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Apache Pulsar admin REST client (`/admin/v2/...`).
4//!
5//! Thin async wrapper around [`reqwest`] for the broker's JAX-RS admin API.
6//! TLS is via `rustls-tls`. There are no channels and no background tasks: every
7//! call is a one-shot `await` that resolves to a [`Result`].
8//!
9//! Endpoint paths mirror the broker. Each method's rustdoc cites the Java
10//! endpoint class (file + relevant `@Path` annotation) in `apache/pulsar` so a
11//! reader can confirm the URL and HTTP verb against upstream.
12//!
13//! ## Quick start
14//!
15//! ```no_run
16//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
17//! use magnetar_admin::{AdminClient, TenantInfo};
18//!
19//! let admin = AdminClient::builder()
20//!     .service_url("http://localhost:8080".parse()?)
21//!     .build()?;
22//!
23//! let tenants = admin.tenants_list().await?;
24//! println!("{tenants:?}");
25//!
26//! admin
27//!     .tenant_create(
28//!         "acme",
29//!         TenantInfo {
30//!             admin_roles: vec!["admin".into()],
31//!             allowed_clusters: vec!["standalone".into()],
32//!         },
33//!     )
34//!     .await?;
35//! # Ok(()) }
36//! ```
37
38#![warn(unreachable_pub)]
39#![forbid(unsafe_code)]
40
41mod tls_crypto;
42
43use std::collections::HashMap;
44use std::sync::Arc;
45use std::time::Duration;
46
47use magnetar_auth_oauth2::ClientCredentialsFlow;
48use magnetar_proto::MessageId;
49use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
50use reqwest::{Method, RequestBuilder, Response, StatusCode};
51use serde::{Deserialize, Serialize};
52use url::Url;
53
54/// Default request timeout. Mirrors `PulsarAdminBuilder` Java default of 60s
55/// (see `pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/
56/// PulsarAdminBuilderImpl.java`).
57pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
58
59/// Longer per-call timeout for `*_create_with_url` / `*_update_with_url`
60/// endpoints. The broker fetches the package from the supplied URL
61/// before responding, so the round-trip can comfortably exceed 60 s
62/// against a slow internal registry (S3 / GCS / function://) or a
63/// large `.nar` / `.jar` artifact. Overrides the client-level timeout
64/// for those calls only — every other admin verb keeps the 60 s
65/// budget [`DEFAULT_TIMEOUT`] provides.
66///
67/// 5 min matches the Java admin client's read-timeout for register
68/// paths.
69pub const PACKAGE_REGISTER_TIMEOUT: Duration = Duration::from_secs(300);
70
71/// Authentication strategy used by the admin client.
72///
73/// `Token(...)` adds `Authorization: Bearer <token>` to every request.
74/// Mirrors Java's `AuthenticationToken` provider. `OAuth2(...)` performs a
75/// `client_credentials` exchange against the IDP and attaches the resulting
76/// access token as a bearer credential — mirrors Java's
77/// `AuthenticationOAuth2`.
78#[derive(Clone, Default)]
79pub enum AdminAuth {
80    /// No authentication.
81    #[default]
82    None,
83    /// Bearer token. The string is the raw token; the `Bearer ` prefix is added
84    /// at request time.
85    Token(String),
86    /// `OAuth2` `client_credentials` flow. The cached access token is refreshed
87    /// (when missing or near expiry) and attached as `Authorization: Bearer
88    /// <access-token>` at request time. The flow is shared (`Arc`) so its
89    /// token cache is reused across every admin call.
90    OAuth2(Arc<ClientCredentialsFlow>),
91}
92
93impl std::fmt::Debug for AdminAuth {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        // Redact the token body so calling `Debug` on the admin client never
96        // spills the bearer credential to tracing or stdout. Mirrors the
97        // `Credentials`/`ClientCredentialsFlow` Debug redaction in
98        // `magnetar-auth-oauth2`.
99        match self {
100            Self::None => f.write_str("None"),
101            Self::Token(_) => f.debug_tuple("Token").field(&"<redacted>").finish(),
102            // `ClientCredentialsFlow`'s own `Debug` already redacts the secret
103            // and cached token, so forwarding to it is safe.
104            Self::OAuth2(flow) => f.debug_tuple("OAuth2").field(flow).finish(),
105        }
106    }
107}
108
109/// Apache Pulsar admin REST client.
110///
111/// Holds two pre-computed base URLs: `base_url` anchored at
112/// `/admin/v2/` (clusters / tenants / namespaces / topics /
113/// subscriptions / brokers / bookies / schemas) and `base_url_v3`
114/// anchored at `/admin/v3/` (Pulsar Functions / IO Sources / IO Sinks
115/// / Packages). The split mirrors Pulsar's own routing — Java's
116/// `PulsarAdmin` keeps them separate too. Both URLs are derived from
117/// the same broker root at builder time, so a caller never has to
118/// know which version family an endpoint belongs to.
119#[derive(Debug, Clone)]
120pub struct AdminClient {
121    base_url: Url,
122    base_url_v3: Url,
123    http: reqwest::Client,
124    auth: AdminAuth,
125}
126
127impl AdminClient {
128    /// Start building an admin client.
129    #[must_use]
130    pub fn builder() -> AdminClientBuilder {
131        AdminClientBuilder::default()
132    }
133
134    /// Return the base URL the client targets (with the trailing `/admin/v2/`
135    /// component already appended). Exposed for tests and diagnostics.
136    #[must_use]
137    pub fn base_url(&self) -> &Url {
138        &self.base_url
139    }
140
141    /// Return the configured auth strategy. Exposed for tests and diagnostics.
142    #[must_use]
143    pub fn auth(&self) -> &AdminAuth {
144        &self.auth
145    }
146
147    // --- Cluster ---------------------------------------------------------
148
149    /// List clusters.
150    ///
151    /// `GET /admin/v2/clusters`.
152    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Clusters.java`
153    /// (`@Path("/clusters")`) + `admin/impl/ClustersBase.java#getClusters`.
154    pub async fn cluster_list(&self) -> Result<Vec<String>, AdminError> {
155        let url = self.url(&["clusters"])?;
156        let resp = self.send(self.http.request(Method::GET, url)).await?;
157        json_ok(resp).await
158    }
159
160    /// List failure-domains configured on a cluster.
161    ///
162    /// `GET /admin/v2/clusters/{cluster}/failureDomains`. The broker returns
163    /// a `Map<String, FailureDomain>` keyed by domain name; each value
164    /// carries a `brokers: Set<String>` member. The map is exposed as a
165    /// raw `serde_json::Value` for forward-compat — broker minor versions
166    /// add fields.
167    /// Java: `ClustersBase#getFailureDomains`.
168    pub async fn cluster_failure_domains_list(
169        &self,
170        cluster: &str,
171    ) -> Result<serde_json::Value, AdminError> {
172        validate_segment(cluster)?;
173        let url = self.url(&["clusters", cluster, "failureDomains"])?;
174        let resp = self.send(self.http.request(Method::GET, url)).await?;
175        json_ok(resp).await
176    }
177
178    /// Get one failure-domain by name.
179    ///
180    /// `GET /admin/v2/clusters/{cluster}/failureDomains/{domain}`.
181    /// Java: `ClustersBase#getDomain`.
182    pub async fn cluster_failure_domain_get(
183        &self,
184        cluster: &str,
185        domain: &str,
186    ) -> Result<serde_json::Value, AdminError> {
187        validate_segment(cluster)?;
188        validate_segment(domain)?;
189        let url = self.url(&["clusters", cluster, "failureDomains", domain])?;
190        let resp = self.send(self.http.request(Method::GET, url)).await?;
191        json_ok(resp).await
192    }
193
194    /// List namespace-isolation policies configured on a cluster.
195    ///
196    /// `GET /admin/v2/clusters/{cluster}/namespaceIsolationPolicies`. The
197    /// broker returns a `Map<String, NamespaceIsolationData>` carrying
198    /// the namespace regex, primary/secondary broker lists, and the
199    /// auto-failover policy. Exposed as raw JSON for forward-compat.
200    ///
201    /// A cluster with no isolation policies configured surfaces as
202    /// `404 NamespaceIsolationPolicies for cluster ... does not exist`
203    /// (Pulsar 4.0.x) rather than an empty map; we mirror the Java
204    /// client's `Map<String, _>` surface by mapping that specific 404 to
205    /// an empty `{}` object. Other 404s (auth, wrong cluster) still
206    /// surface as `AdminError::Status`.
207    /// Java: `ClustersBase#getNamespaceIsolationPolicies`.
208    pub async fn namespace_isolation_policies_list(
209        &self,
210        cluster: &str,
211    ) -> Result<serde_json::Value, AdminError> {
212        validate_segment(cluster)?;
213        let url = self.url(&["clusters", cluster, "namespaceIsolationPolicies"])?;
214        let resp = self.send(self.http.request(Method::GET, url)).await?;
215        match json_ok::<serde_json::Value>(resp).await {
216            Ok(v) => Ok(v),
217            Err(AdminError::Status {
218                code: 404, body, ..
219            }) if body.contains("NamespaceIsolationPolicies") => {
220                Ok(serde_json::Value::Object(serde_json::Map::new()))
221            }
222            Err(e) => Err(e),
223        }
224    }
225
226    // --- Brokers ---------------------------------------------------------
227
228    /// List active brokers in a cluster.
229    ///
230    /// `GET /admin/v2/brokers/{cluster}`. Returns a list of `host:port`
231    /// strings — one entry per broker that's currently registered with
232    /// the cluster's metadata store. Java: `BrokersBase#getActiveBrokers`.
233    pub async fn brokers_list(&self, cluster: &str) -> Result<Vec<String>, AdminError> {
234        validate_segment(cluster)?;
235        let url = self.url(&["brokers", cluster])?;
236        let resp = self.send(self.http.request(Method::GET, url)).await?;
237        json_ok(resp).await
238    }
239
240    /// Get the current leader broker for the cluster.
241    ///
242    /// `GET /admin/v2/brokers/leaderBroker`. Returns `{ serviceUrl,
243    /// brokerId }`. Exposed as raw JSON for forward-compat — newer
244    /// brokers add `clusterName` and similar fields.
245    /// Java: `BrokersBase#getLeaderBroker`.
246    pub async fn brokers_leader(&self) -> Result<serde_json::Value, AdminError> {
247        let url = self.url(&["brokers", "leaderBroker"])?;
248        let resp = self.send(self.http.request(Method::GET, url)).await?;
249        json_ok(resp).await
250    }
251
252    /// List the names of all dynamic-config keys the broker exposes.
253    ///
254    /// `GET /admin/v2/brokers/configuration`. Returns the bare list of
255    /// `ServiceConfiguration` fields tagged `@FieldContext(dynamic = true)`
256    /// — the set of keys that `brokers_set_dynamic_config` accepts. Use
257    /// [`Self::brokers_dynamic_config_overrides`] for the current values.
258    ///
259    /// Pulsar 4 normally returns a JSON array (`List<String>` from
260    /// `BrokerService#getDynamicConfiguration`), but some packaging /
261    /// proxy paths surface the underlying `Map<String, ConfigField>`
262    /// shape instead. We accept both — array → values, object → keys —
263    /// to stay version-tolerant. Java: `BrokersBase#getDynamicConfigurationName`.
264    pub async fn brokers_dynamic_config_keys(&self) -> Result<Vec<String>, AdminError> {
265        let url = self.url(&["brokers", "configuration"])?;
266        let resp = self.send(self.http.request(Method::GET, url)).await?;
267        let v: serde_json::Value = json_ok(resp).await?;
268        match v {
269            serde_json::Value::Array(items) => Ok(items
270                .into_iter()
271                .filter_map(|x| x.as_str().map(str::to_owned))
272                .collect()),
273            serde_json::Value::Object(map) => Ok(map.into_iter().map(|(k, _)| k).collect()),
274            other => Err(AdminError::Protocol(format!(
275                "brokers/configuration returned unexpected shape: {other}"
276            ))),
277        }
278    }
279
280    /// Get the currently-overridden dynamic configuration values.
281    ///
282    /// `GET /admin/v2/brokers/configuration/values`. Returns a
283    /// `Map<String, String>` of every dynamic key the operator has set
284    /// (the broker omits keys still on their static / default value).
285    /// Exposed as raw JSON because broker minor versions add keys.
286    /// Java: `BrokersBase#getAllDynamicConfigurations`.
287    pub async fn brokers_dynamic_config_overrides(&self) -> Result<serde_json::Value, AdminError> {
288        let url = self.url(&["brokers", "configuration", "values"])?;
289        let resp = self.send(self.http.request(Method::GET, url)).await?;
290        json_ok(resp).await
291    }
292
293    /// Get the broker's runtime (merged static + dynamic) configuration.
294    ///
295    /// `GET /admin/v2/brokers/configuration/runtime`. Returns the full
296    /// `Map<String, String>` of `ServiceConfiguration` values as they
297    /// currently apply on the broker process — static defaults
298    /// overlaid with any `brokers_set_dynamic_config` overrides. Raw
299    /// JSON for forward-compat. Java: `BrokersBase#getRuntimeConfiguration`.
300    pub async fn brokers_runtime_config(&self) -> Result<serde_json::Value, AdminError> {
301        let url = self.url(&["brokers", "configuration", "runtime"])?;
302        let resp = self.send(self.http.request(Method::GET, url)).await?;
303        json_ok(resp).await
304    }
305
306    /// Get the broker's internal-stack endpoints.
307    ///
308    /// `GET /admin/v2/brokers/internal-configuration`. Returns the
309    /// `InternalConfigurationData` envelope — metadata-store URLs
310    /// (`zookeeperServers`, `configurationMetadataStoreUrl`),
311    /// `BookKeeper` metadata service URI, ledger root paths. Raw JSON
312    /// for forward-compat; the shape rolls between releases as the
313    /// metadata layer evolves.
314    /// Java: `BrokersBase#getInternalConfigurationData`.
315    pub async fn brokers_internal_config(&self) -> Result<serde_json::Value, AdminError> {
316        let url = self.url(&["brokers", "internal-configuration"])?;
317        let resp = self.send(self.http.request(Method::GET, url)).await?;
318        json_ok(resp).await
319    }
320
321    /// Probe broker health — produces and consumes one heartbeat message
322    /// on an internal topic.
323    ///
324    /// `GET /admin/v2/brokers/health`. The broker returns the plain-text
325    /// string `"ok"` on success; non-200 surfaces as `AdminError::Status`.
326    /// Java: `BrokersBase#healthCheck`.
327    pub async fn brokers_health_check(&self) -> Result<String, AdminError> {
328        let url = self.url(&["brokers", "health"])?;
329        let resp = self.send(self.http.request(Method::GET, url)).await?;
330        let resp = ensure_status(resp).await?;
331        Ok(resp.resp.text().await?)
332    }
333
334    /// List the namespaces a specific broker currently owns.
335    ///
336    /// `GET /admin/v2/brokers/{cluster}/{broker}/ownedNamespaces`. The
337    /// `broker` argument must be the broker's `host:port` (matching the
338    /// strings [`Self::brokers_list`] returns). Returns a
339    /// `Map<String, NamespaceOwnershipStatus>` keyed by namespace name —
340    /// raw JSON for forward-compat.
341    /// Java: `BrokersBase#getOwnedNamespaces`.
342    pub async fn brokers_owned_namespaces(
343        &self,
344        cluster: &str,
345        broker: &str,
346    ) -> Result<serde_json::Value, AdminError> {
347        validate_segment(cluster)?;
348        validate_segment(broker)?;
349        let url = self.url(&["brokers", cluster, broker, "ownedNamespaces"])?;
350        let resp = self.send(self.http.request(Method::GET, url)).await?;
351        json_ok(resp).await
352    }
353
354    /// Override a dynamic broker configuration value.
355    ///
356    /// `POST /admin/v2/brokers/configuration/{name}/{value}`. Both the
357    /// key and the value travel in the URL path — there is no request
358    /// body — matching the broker's `updateDynamicConfiguration(@PathParam
359    /// String configName, @PathParam String configValue)` signature.
360    /// The key must be one of those returned by
361    /// [`Self::brokers_dynamic_config_keys`]; unknown keys yield 412.
362    /// Java: `BrokersBase#updateDynamicConfiguration`.
363    pub async fn brokers_set_dynamic_config(
364        &self,
365        name: &str,
366        value: &str,
367    ) -> Result<(), AdminError> {
368        validate_segment(name)?;
369        validate_segment(value)?;
370        let url = self.url(&["brokers", "configuration", name, value])?;
371        let resp = self.send(self.http.request(Method::POST, url)).await?;
372        empty_ok(resp).await
373    }
374
375    /// Drop a dynamic configuration override, reverting to the static value.
376    ///
377    /// `DELETE /admin/v2/brokers/configuration/{name}`. After the call
378    /// the key disappears from [`Self::brokers_dynamic_config_overrides`]
379    /// and [`Self::brokers_runtime_config`] reflects the underlying
380    /// static / default value again.
381    /// Java: `BrokersBase#deleteDynamicConfiguration`.
382    pub async fn brokers_delete_dynamic_config(&self, name: &str) -> Result<(), AdminError> {
383        validate_segment(name)?;
384        let url = self.url(&["brokers", "configuration", name])?;
385        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
386        empty_ok(resp).await
387    }
388
389    // --- Bookies ---------------------------------------------------------
390
391    /// List every bookie the broker knows about — both writable and
392    /// read-only — as registered in `BookKeeper` metadata.
393    ///
394    /// `GET /admin/v2/bookies/all`. Returns the broker's
395    /// `BookiesClusterInfo` envelope — a `bookies: [{ address: "host:port" }]`
396    /// array. Raw JSON for forward-compat.
397    /// Java: `BookiesBase#getAllAvailableBookies`.
398    pub async fn bookies_list_all(&self) -> Result<serde_json::Value, AdminError> {
399        let url = self.url(&["bookies", "all"])?;
400        let resp = self.send(self.http.request(Method::GET, url)).await?;
401        json_ok(resp).await
402    }
403
404    /// Get every bookie's group + rack assignment, as configured for the
405    /// rack-aware placement policy.
406    ///
407    /// `GET /admin/v2/bookies/racks-info`. Returns the nested
408    /// `Map<group, Map<bookieAddress, BookieInfo>>` shape Pulsar
409    /// persists in metadata. Raw JSON because the wire shape exposes
410    /// nested maps that change between releases (the `default` group
411    /// is implicit on older brokers).
412    /// Java: `BookiesBase#getBookieRackInfo`.
413    pub async fn bookies_racks_info(&self) -> Result<serde_json::Value, AdminError> {
414        let url = self.url(&["bookies", "racks-info"])?;
415        let resp = self.send(self.http.request(Method::GET, url)).await?;
416        json_ok(resp).await
417    }
418
419    /// Set (or update) a bookie's rack assignment.
420    ///
421    /// `POST /admin/v2/bookies/racks-info/{bookie}?group={group}` with
422    /// a JSON [`BookieInfo`] body carrying only `{rack, hostname}`.
423    /// `bookie` is the `host:port` registered in `BookKeeper` metadata.
424    /// The placement policy picks up the new rack on its next
425    /// reconciliation tick.
426    ///
427    /// `group` is **a query parameter**, not a body field — Pulsar's
428    /// `BookiesBase#updateBookieRackInfo(@PathParam("bookie") String,
429    /// @QueryParam("group") String, BookieInfo)` Jackson-binds the body
430    /// to `{rack, hostname}` only; an unknown `group` body field is
431    /// silently ignored and the query param defaults to `null`,
432    /// dropping the operator's group choice on the wire.
433    /// Java: `BookiesBase#updateBookieRackInfo`.
434    pub async fn bookies_set_rack(
435        &self,
436        bookie: &str,
437        group: &str,
438        info: BookieInfo,
439    ) -> Result<(), AdminError> {
440        validate_segment(bookie)?;
441        let mut url = self.url(&["bookies", "racks-info", bookie])?;
442        url.query_pairs_mut().append_pair("group", group);
443        let resp = self
444            .send(self.http.request(Method::POST, url).json(&info))
445            .await?;
446        empty_ok(resp).await
447    }
448
449    /// Remove a bookie's rack assignment.
450    ///
451    /// `DELETE /admin/v2/bookies/racks-info/{bookie}`. The bookie falls
452    /// back to the placement policy's default group / rack until
453    /// [`Self::bookies_set_rack`] is called again.
454    /// Java: `BookiesBase#deleteBookieRackInfo`.
455    pub async fn bookies_delete_rack(&self, bookie: &str) -> Result<(), AdminError> {
456        validate_segment(bookie)?;
457        let url = self.url(&["bookies", "racks-info", bookie])?;
458        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
459        empty_ok(resp).await
460    }
461
462    // --- Schemas ---------------------------------------------------------
463
464    /// Get the latest schema attached to a topic.
465    ///
466    /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema`. Returns
467    /// `{ version, type, schema, properties, timestamp }`; raw JSON
468    /// because the `type` axis (`AVRO` / `JSON` / `PROTOBUF` /
469    /// `PROTOBUF_NATIVE` / `KEY_VALUE` / `STRING` / `BYTES` / …) is
470    /// open-ended and broker minor versions add keys (deletion
471    /// tombstones surface as `type: "DELETE"` on the GET, for
472    /// instance). Java: `SchemasResourceBase#getSchema`.
473    pub async fn schema_get_latest(&self, topic: &str) -> Result<serde_json::Value, AdminError> {
474        let (tenant, namespace, name) = split_topic(topic)?;
475        let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
476        let resp = self.send(self.http.request(Method::GET, url)).await?;
477        json_ok(resp).await
478    }
479
480    /// Get a specific schema version attached to a topic.
481    ///
482    /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema/{version}`.
483    /// `version` is the monotonically-increasing integer the broker
484    /// assigns at registration. Same wire shape as
485    /// [`Self::schema_get_latest`].
486    /// Java: `SchemasResourceBase#getSchema` (with version path param).
487    pub async fn schema_get_version(
488        &self,
489        topic: &str,
490        version: i64,
491    ) -> Result<serde_json::Value, AdminError> {
492        let (tenant, namespace, name) = split_topic(topic)?;
493        let v = version.to_string();
494        let url = self.url(&["schemas", tenant, namespace, name, "schema", &v])?;
495        let resp = self.send(self.http.request(Method::GET, url)).await?;
496        json_ok(resp).await
497    }
498
499    /// List every schema version registered for a topic.
500    ///
501    /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schemas`. Pulsar 4
502    /// wraps the per-version entries in a
503    /// `GetAllVersionsSchemaResponse { getSchemaResponses: [...] }`
504    /// envelope (verified against `apache/pulsar@v4.0.4`
505    /// `SchemasResourceBase#convertToAllVersionsSchemaResponse`). We
506    /// unwrap that envelope at the boundary so callers see the flat
507    /// `Vec<Value>` they expect; a bare-array shape (older or
508    /// alternative serialisations) is still accepted. Raw JSON
509    /// per-entry for forward-compat. Java:
510    /// `SchemasResourceBase#getAllSchemas`.
511    pub async fn schema_list_versions(
512        &self,
513        topic: &str,
514    ) -> Result<Vec<serde_json::Value>, AdminError> {
515        let (tenant, namespace, name) = split_topic(topic)?;
516        let url = self.url(&["schemas", tenant, namespace, name, "schemas"])?;
517        let resp = self.send(self.http.request(Method::GET, url)).await?;
518        let v: serde_json::Value = json_ok(resp).await?;
519        match v {
520            serde_json::Value::Array(items) => Ok(items),
521            serde_json::Value::Object(mut envelope) => {
522                if let Some(serde_json::Value::Array(items)) = envelope.remove("getSchemaResponses")
523                {
524                    return Ok(items);
525                }
526                Err(AdminError::Protocol(format!(
527                    "schemas/.../schemas envelope missing `getSchemaResponses` array: {}",
528                    serde_json::Value::Object(envelope)
529                )))
530            }
531            other => Err(AdminError::Protocol(format!(
532                "schemas/.../schemas returned unexpected shape: {other}"
533            ))),
534        }
535    }
536
537    /// Register a new schema version on a topic.
538    ///
539    /// `POST /admin/v2/schemas/{tenant}/{ns}/{topic}/schema` with a JSON
540    /// [`PostSchemaPayload`] body. The broker returns `{ version: N }`;
541    /// raw JSON because the upstream response envelope wraps the
542    /// version under `data` on some 4.x point releases. Compatibility
543    /// is enforced server-side per the namespace's
544    /// `schemaCompatibilityStrategy` — incompatible posts fail with
545    /// 409. Java: `SchemasResourceBase#postSchema`.
546    pub async fn schema_post(
547        &self,
548        topic: &str,
549        payload: PostSchemaPayload,
550    ) -> Result<serde_json::Value, AdminError> {
551        let (tenant, namespace, name) = split_topic(topic)?;
552        let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
553        let resp = self
554            .send(self.http.request(Method::POST, url).json(&payload))
555            .await?;
556        json_ok(resp).await
557    }
558
559    /// Delete a topic's schema.
560    ///
561    /// `DELETE /admin/v2/schemas/{tenant}/{ns}/{topic}/schema?force={force}`.
562    /// `force = true` skips the broker's "is the schema in use"
563    /// guard — equivalent to `pulsar-admin schemas delete --force`.
564    /// Java: `SchemasResourceBase#deleteSchema`.
565    pub async fn schema_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
566        let (tenant, namespace, name) = split_topic(topic)?;
567        let mut url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
568        url.query_pairs_mut()
569            .append_pair("force", if force { "true" } else { "false" });
570        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
571        empty_ok(resp).await
572    }
573
574    /// Check whether a candidate schema would be compatible with the
575    /// topic's current schema.
576    ///
577    /// `POST /admin/v2/schemas/{tenant}/{ns}/{topic}/compatibility` with
578    /// a JSON [`PostSchemaPayload`] body — the same shape
579    /// [`Self::schema_post`] sends, but the broker only evaluates
580    /// compatibility and never persists. Returns `{ isCompatible:
581    /// bool, schemaCompatibilityStrategy: "..." }`; raw JSON for
582    /// forward-compat.
583    /// Java: `SchemasResourceBase#testCompatibility`.
584    pub async fn schema_compatibility_check(
585        &self,
586        topic: &str,
587        payload: PostSchemaPayload,
588    ) -> Result<serde_json::Value, AdminError> {
589        let (tenant, namespace, name) = split_topic(topic)?;
590        let url = self.url(&["schemas", tenant, namespace, name, "compatibility"])?;
591        let resp = self
592            .send(self.http.request(Method::POST, url).json(&payload))
593            .await?;
594        json_ok(resp).await
595    }
596
597    // --- Pulsar Functions (read) ----------------------------------------
598
599    /// List every function registered under a namespace.
600    ///
601    /// `GET /admin/v3/functions/{tenant}/{namespace}`. Returns a JSON
602    /// array of bare function names (no tenant / namespace prefix) —
603    /// matches Java's `FunctionsBase#listFunctions` body shape.
604    pub async fn functions_list_by_namespace(
605        &self,
606        tenant: &str,
607        namespace: &str,
608    ) -> Result<Vec<String>, AdminError> {
609        validate_segment(tenant)?;
610        validate_segment(namespace)?;
611        let url = self.url_v3(&["functions", tenant, namespace])?;
612        let resp = self.send(self.http.request(Method::GET, url)).await?;
613        json_ok(resp).await
614    }
615
616    /// Get a function's registered `FunctionConfig`.
617    ///
618    /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}`. The
619    /// `FunctionConfig` Java type has ~30 fields and grows on every
620    /// minor release — return raw JSON for forward-compat. Java:
621    /// `FunctionsBase#getFunctionInfo`.
622    pub async fn function_get(
623        &self,
624        tenant: &str,
625        namespace: &str,
626        name: &str,
627    ) -> Result<serde_json::Value, AdminError> {
628        validate_segment(tenant)?;
629        validate_segment(namespace)?;
630        validate_segment(name)?;
631        let url = self.url_v3(&["functions", tenant, namespace, name])?;
632        let resp = self.send(self.http.request(Method::GET, url)).await?;
633        json_ok(resp).await
634    }
635
636    /// Get a function's aggregate status (all instances).
637    ///
638    /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/status`.
639    /// Returns Java's `FunctionStatus` envelope:
640    /// `{numInstances, numRunning, instances: [...]}`. Raw JSON because
641    /// the per-instance shape carries broker-version-dependent fields.
642    /// Java: `FunctionsBase#getFunctionStatus`.
643    pub async fn function_status(
644        &self,
645        tenant: &str,
646        namespace: &str,
647        name: &str,
648    ) -> Result<serde_json::Value, AdminError> {
649        validate_segment(tenant)?;
650        validate_segment(namespace)?;
651        validate_segment(name)?;
652        let url = self.url_v3(&["functions", tenant, namespace, name, "status"])?;
653        let resp = self.send(self.http.request(Method::GET, url)).await?;
654        json_ok(resp).await
655    }
656
657    /// Get a function's aggregate runtime statistics (all instances).
658    ///
659    /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/stats`.
660    /// Returns Java's `FunctionStats` envelope — message rates,
661    /// processed counts, average latency, per-instance breakdown. Raw
662    /// JSON for forward-compat. Java: `FunctionsBase#getFunctionStats`.
663    pub async fn function_stats(
664        &self,
665        tenant: &str,
666        namespace: &str,
667        name: &str,
668    ) -> Result<serde_json::Value, AdminError> {
669        validate_segment(tenant)?;
670        validate_segment(namespace)?;
671        validate_segment(name)?;
672        let url = self.url_v3(&["functions", tenant, namespace, name, "stats"])?;
673        let resp = self.send(self.http.request(Method::GET, url)).await?;
674        json_ok(resp).await
675    }
676
677    /// Get one instance's status.
678    ///
679    /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/status`.
680    /// `instance_id` is the integer index the broker assigns at
681    /// schedule time (`0..parallelism`). Java:
682    /// `FunctionsBase#getFunctionInstanceStatus`.
683    pub async fn function_instance_status(
684        &self,
685        tenant: &str,
686        namespace: &str,
687        name: &str,
688        instance_id: i32,
689    ) -> Result<serde_json::Value, AdminError> {
690        validate_segment(tenant)?;
691        validate_segment(namespace)?;
692        validate_segment(name)?;
693        let id = instance_id.to_string();
694        let url = self.url_v3(&["functions", tenant, namespace, name, &id, "status"])?;
695        let resp = self.send(self.http.request(Method::GET, url)).await?;
696        json_ok(resp).await
697    }
698
699    /// Get one instance's runtime statistics.
700    ///
701    /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stats`.
702    /// Java: `FunctionsBase#getFunctionInstanceStats`.
703    pub async fn function_instance_stats(
704        &self,
705        tenant: &str,
706        namespace: &str,
707        name: &str,
708        instance_id: i32,
709    ) -> Result<serde_json::Value, AdminError> {
710        validate_segment(tenant)?;
711        validate_segment(namespace)?;
712        validate_segment(name)?;
713        let id = instance_id.to_string();
714        let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stats"])?;
715        let resp = self.send(self.http.request(Method::GET, url)).await?;
716        json_ok(resp).await
717    }
718
719    // --- Pulsar Functions (URL-based register / update) ----------------
720
721    /// Register a function from a remote package URL.
722    ///
723    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}` with a
724    /// `multipart/form-data` body carrying two parts: `url=<pkg-url>`
725    /// (the broker-resolvable HTTP / `function://` / `file://` URL of
726    /// the compiled package) and `functionConfig=<json>` (the
727    /// serialised [`FunctionConfig`]). The local-file upload path
728    /// (Java's `FunctionsBase#registerFunction` with a
729    /// `FormDataMultiPart` `data` part) is intentionally out of scope
730    /// for this method — operators with a pre-built JAR served over
731    /// HTTP / S3 / GCS use the URL path.
732    ///
733    /// The two-part envelope matches Java's
734    /// `FunctionsBase#registerFunction(@PathParam tenant, ...,
735    /// @FormDataParam("url") String functionPkgUrl,
736    /// @FormDataParam("functionConfig") FunctionConfig functionConfig)`
737    /// — when `data` is null and `url` is non-null the broker takes the
738    /// URL fast-path and skips the upload step.
739    pub async fn function_create_with_url(
740        &self,
741        tenant: &str,
742        namespace: &str,
743        name: &str,
744        url: &str,
745        config: FunctionConfig,
746    ) -> Result<(), AdminError> {
747        validate_segment(tenant)?;
748        validate_segment(namespace)?;
749        validate_segment(name)?;
750        let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
751        let form = function_pkg_form(url, &config)?;
752        let resp = self
753            .send(
754                self.http
755                    .request(Method::POST, endpoint)
756                    .multipart(form)
757                    .timeout(PACKAGE_REGISTER_TIMEOUT),
758            )
759            .await?;
760        empty_ok(resp).await
761    }
762
763    /// Update an existing function from a remote package URL.
764    ///
765    /// `PUT /admin/v3/functions/{tenant}/{namespace}/{name}` with the
766    /// same two-part `multipart/form-data` shape as
767    /// [`Self::function_create_with_url`]. Java:
768    /// `FunctionsBase#updateFunction` with non-null `pkgUrl`.
769    pub async fn function_update_with_url(
770        &self,
771        tenant: &str,
772        namespace: &str,
773        name: &str,
774        url: &str,
775        config: FunctionConfig,
776    ) -> Result<(), AdminError> {
777        validate_segment(tenant)?;
778        validate_segment(namespace)?;
779        validate_segment(name)?;
780        let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
781        let form = function_pkg_form(url, &config)?;
782        let resp = self
783            .send(
784                self.http
785                    .request(Method::PUT, endpoint)
786                    .multipart(form)
787                    .timeout(PACKAGE_REGISTER_TIMEOUT),
788            )
789            .await?;
790        empty_ok(resp).await
791    }
792
793    // --- Pulsar Functions (lifecycle) -----------------------------------
794
795    /// Deregister (delete) a function.
796    ///
797    /// `DELETE /admin/v3/functions/{tenant}/{namespace}/{name}`. The
798    /// broker stops every running instance and drops the
799    /// `FunctionConfig` from metadata. Java:
800    /// `FunctionsBase#deregisterFunction`.
801    pub async fn function_delete(
802        &self,
803        tenant: &str,
804        namespace: &str,
805        name: &str,
806    ) -> Result<(), AdminError> {
807        validate_segment(tenant)?;
808        validate_segment(namespace)?;
809        validate_segment(name)?;
810        let url = self.url_v3(&["functions", tenant, namespace, name])?;
811        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
812        empty_ok(resp).await
813    }
814
815    /// Start every instance of a function (idempotent).
816    ///
817    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/start`. No
818    /// body. Java: `FunctionsBase#startFunction`.
819    pub async fn function_start(
820        &self,
821        tenant: &str,
822        namespace: &str,
823        name: &str,
824    ) -> Result<(), AdminError> {
825        validate_segment(tenant)?;
826        validate_segment(namespace)?;
827        validate_segment(name)?;
828        let url = self.url_v3(&["functions", tenant, namespace, name, "start"])?;
829        let resp = self.send(self.http.request(Method::POST, url)).await?;
830        empty_ok(resp).await
831    }
832
833    /// Stop every instance of a function.
834    ///
835    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/stop`. The
836    /// broker leaves the `FunctionConfig` in metadata; a subsequent
837    /// `function_start` brings it back. Java:
838    /// `FunctionsBase#stopFunction`.
839    pub async fn function_stop(
840        &self,
841        tenant: &str,
842        namespace: &str,
843        name: &str,
844    ) -> Result<(), AdminError> {
845        validate_segment(tenant)?;
846        validate_segment(namespace)?;
847        validate_segment(name)?;
848        let url = self.url_v3(&["functions", tenant, namespace, name, "stop"])?;
849        let resp = self.send(self.http.request(Method::POST, url)).await?;
850        empty_ok(resp).await
851    }
852
853    /// Restart every instance of a function.
854    ///
855    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/restart`.
856    /// Java: `FunctionsBase#restartFunction`.
857    pub async fn function_restart(
858        &self,
859        tenant: &str,
860        namespace: &str,
861        name: &str,
862    ) -> Result<(), AdminError> {
863        validate_segment(tenant)?;
864        validate_segment(namespace)?;
865        validate_segment(name)?;
866        let url = self.url_v3(&["functions", tenant, namespace, name, "restart"])?;
867        let resp = self.send(self.http.request(Method::POST, url)).await?;
868        empty_ok(resp).await
869    }
870
871    /// Start one specific instance.
872    ///
873    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/start`.
874    /// Java: `FunctionsBase#startFunctionInstance`.
875    pub async fn function_start_instance(
876        &self,
877        tenant: &str,
878        namespace: &str,
879        name: &str,
880        instance_id: i32,
881    ) -> Result<(), AdminError> {
882        validate_segment(tenant)?;
883        validate_segment(namespace)?;
884        validate_segment(name)?;
885        let id = instance_id.to_string();
886        let url = self.url_v3(&["functions", tenant, namespace, name, &id, "start"])?;
887        let resp = self.send(self.http.request(Method::POST, url)).await?;
888        empty_ok(resp).await
889    }
890
891    /// Stop one specific instance.
892    ///
893    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stop`.
894    /// Java: `FunctionsBase#stopFunctionInstance`.
895    pub async fn function_stop_instance(
896        &self,
897        tenant: &str,
898        namespace: &str,
899        name: &str,
900        instance_id: i32,
901    ) -> Result<(), AdminError> {
902        validate_segment(tenant)?;
903        validate_segment(namespace)?;
904        validate_segment(name)?;
905        let id = instance_id.to_string();
906        let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stop"])?;
907        let resp = self.send(self.http.request(Method::POST, url)).await?;
908        empty_ok(resp).await
909    }
910
911    // --- Tenants ---------------------------------------------------------
912
913    /// List tenants.
914    ///
915    /// `GET /admin/v2/tenants`.
916    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Tenants.java`
917    /// (`@Path("/tenants")`) + `admin/impl/TenantsBase.java#getTenants`.
918    pub async fn tenants_list(&self) -> Result<Vec<String>, AdminError> {
919        let url = self.url(&["tenants"])?;
920        let resp = self.send(self.http.request(Method::GET, url)).await?;
921        json_ok(resp).await
922    }
923
924    /// Create a tenant.
925    ///
926    /// `PUT /admin/v2/tenants/{tenant}` with a JSON [`TenantInfo`] body.
927    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#
928    /// createTenant`.
929    pub async fn tenant_create(&self, name: &str, info: TenantInfo) -> Result<(), AdminError> {
930        let url = self.url(&["tenants", name])?;
931        let resp = self
932            .send(self.http.request(Method::PUT, url).json(&info))
933            .await?;
934        empty_ok(resp).await
935    }
936
937    /// Delete a tenant.
938    ///
939    /// `DELETE /admin/v2/tenants/{tenant}`.
940    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#
941    /// deleteTenant`.
942    pub async fn tenant_delete(&self, name: &str) -> Result<(), AdminError> {
943        let url = self.url(&["tenants", name])?;
944        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
945        empty_ok(resp).await
946    }
947
948    // --- Namespaces ------------------------------------------------------
949
950    /// List namespaces under a tenant.
951    ///
952    /// `GET /admin/v2/namespaces/{tenant}`.
953    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
954    /// (`@Path("/namespaces")` + `@Path("/{tenant}")`).
955    pub async fn namespaces_list(&self, tenant: &str) -> Result<Vec<String>, AdminError> {
956        let url = self.url(&["namespaces", tenant])?;
957        let resp = self.send(self.http.request(Method::GET, url)).await?;
958        json_ok(resp).await
959    }
960
961    /// Create a namespace.
962    ///
963    /// `PUT /admin/v2/namespaces/{tenant}/{namespace}`. The namespace argument
964    /// is `tenant/namespace`, matching how Pulsar expresses fully qualified
965    /// namespace names on the wire and CLI.
966    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
967    /// (`@PUT @Path("/{tenant}/{namespace}")`).
968    pub async fn namespace_create(&self, ns: &str) -> Result<(), AdminError> {
969        let (tenant, namespace) = split_namespace(ns)?;
970        let url = self.url(&["namespaces", tenant, namespace])?;
971        let resp = self.send(self.http.request(Method::PUT, url)).await?;
972        empty_ok(resp).await
973    }
974
975    /// Delete a namespace.
976    ///
977    /// `DELETE /admin/v2/namespaces/{tenant}/{namespace}`.
978    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
979    /// (`@DELETE @Path("/{tenant}/{namespace}")`).
980    pub async fn namespace_delete(&self, ns: &str) -> Result<(), AdminError> {
981        let (tenant, namespace) = split_namespace(ns)?;
982        let url = self.url(&["namespaces", tenant, namespace])?;
983        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
984        empty_ok(resp).await
985    }
986
987    /// Get a namespace's retention policy.
988    ///
989    /// `GET /admin/v2/namespaces/{tenant}/{ns}/retention`.
990    /// Returns `RetentionPolicies { retentionTimeInMinutes, retentionSizeInMB }`.
991    /// A fresh namespace, or a namespace whose retention was just
992    /// removed, surfaces as 204 / empty body / `null` — we fold those
993    /// to `RetentionPolicies::default()` (broker semantic).
994    /// Java: `NamespacesBase#getRetention`.
995    pub async fn namespace_get_retention(&self, ns: &str) -> Result<RetentionPolicies, AdminError> {
996        let (tenant, namespace) = split_namespace(ns)?;
997        let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
998        let resp = self.send(self.http.request(Method::GET, url)).await?;
999        json_ok_or_default(resp).await
1000    }
1001
1002    /// Set a namespace's retention policy.
1003    ///
1004    /// `POST /admin/v2/namespaces/{tenant}/{ns}/retention` with a JSON
1005    /// `RetentionPolicies` body. `-1` means infinite (size or time).
1006    /// Java: `NamespacesBase#setRetention`.
1007    pub async fn namespace_set_retention(
1008        &self,
1009        ns: &str,
1010        policy: RetentionPolicies,
1011    ) -> Result<(), AdminError> {
1012        let (tenant, namespace) = split_namespace(ns)?;
1013        let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
1014        let resp = self
1015            .send(self.http.request(Method::POST, url).json(&policy))
1016            .await?;
1017        empty_ok(resp).await
1018    }
1019
1020    /// Remove a namespace's retention policy (fall back to broker default).
1021    ///
1022    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/retention`.
1023    /// Java: `NamespacesBase#removeRetention`.
1024    pub async fn namespace_remove_retention(&self, ns: &str) -> Result<(), AdminError> {
1025        let (tenant, namespace) = split_namespace(ns)?;
1026        let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
1027        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1028        empty_ok(resp).await
1029    }
1030
1031    /// Get all backlog-quota policies on a namespace.
1032    ///
1033    /// `GET /admin/v2/namespaces/{tenant}/{ns}/backlogQuotaMap`. Returns
1034    /// `Map<BacklogQuotaType, BacklogQuota>` — kept as raw JSON because
1035    /// broker versions add quota types (`message_age` since 2.10).
1036    /// Java: `NamespacesBase#getBacklogQuotaMap`.
1037    pub async fn namespace_get_backlog_quotas(
1038        &self,
1039        ns: &str,
1040    ) -> Result<serde_json::Value, AdminError> {
1041        let (tenant, namespace) = split_namespace(ns)?;
1042        let url = self.url(&["namespaces", tenant, namespace, "backlogQuotaMap"])?;
1043        let resp = self.send(self.http.request(Method::GET, url)).await?;
1044        json_ok(resp).await
1045    }
1046
1047    /// Set a backlog-quota policy on a namespace.
1048    ///
1049    /// `POST /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}`
1050    /// with a JSON `BacklogQuota` body. `backlog_quota_type` selects which
1051    /// dimension to limit (`destination_storage` for byte size, `message_age`
1052    /// for wall-clock TTL).
1053    /// Java: `NamespacesBase#setBacklogQuota`.
1054    pub async fn namespace_set_backlog_quota(
1055        &self,
1056        ns: &str,
1057        backlog_quota_type: BacklogQuotaType,
1058        quota: BacklogQuota,
1059    ) -> Result<(), AdminError> {
1060        let (tenant, namespace) = split_namespace(ns)?;
1061        let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
1062        url.query_pairs_mut()
1063            .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
1064        let resp = self
1065            .send(self.http.request(Method::POST, url).json(&quota))
1066            .await?;
1067        empty_ok(resp).await
1068    }
1069
1070    /// Remove a backlog-quota policy from a namespace.
1071    ///
1072    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}`.
1073    /// Java: `NamespacesBase#removeBacklogQuota`.
1074    pub async fn namespace_remove_backlog_quota(
1075        &self,
1076        ns: &str,
1077        backlog_quota_type: BacklogQuotaType,
1078    ) -> Result<(), AdminError> {
1079        let (tenant, namespace) = split_namespace(ns)?;
1080        let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
1081        url.query_pairs_mut()
1082            .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
1083        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1084        empty_ok(resp).await
1085    }
1086
1087    /// Get a namespace's message-TTL (seconds).
1088    ///
1089    /// `GET /admin/v2/namespaces/{tenant}/{ns}/messageTTL`. Returns a
1090    /// bare integer (or `null` if no TTL is set — which decodes as
1091    /// `Option::None`).
1092    /// Java: `NamespacesBase#getNamespaceMessageTTL`.
1093    pub async fn namespace_get_message_ttl(&self, ns: &str) -> Result<Option<i32>, AdminError> {
1094        let (tenant, namespace) = split_namespace(ns)?;
1095        let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1096        let resp = self.send(self.http.request(Method::GET, url)).await?;
1097        json_ok_optional(resp).await
1098    }
1099
1100    /// Set a namespace's message-TTL (seconds).
1101    ///
1102    /// `POST /admin/v2/namespaces/{tenant}/{ns}/messageTTL` with a bare
1103    /// integer body. `0` disables (broker treats as no TTL).
1104    /// Java: `NamespacesBase#setNamespaceMessageTTL`.
1105    pub async fn namespace_set_message_ttl(
1106        &self,
1107        ns: &str,
1108        ttl_seconds: i32,
1109    ) -> Result<(), AdminError> {
1110        let (tenant, namespace) = split_namespace(ns)?;
1111        let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1112        let resp = self
1113            .send(self.http.request(Method::POST, url).json(&ttl_seconds))
1114            .await?;
1115        empty_ok(resp).await
1116    }
1117
1118    /// Remove a namespace's message-TTL (fall back to broker default).
1119    ///
1120    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/messageTTL`.
1121    /// Java: `NamespacesBase#removeNamespaceMessageTTL`.
1122    pub async fn namespace_remove_message_ttl(&self, ns: &str) -> Result<(), AdminError> {
1123        let (tenant, namespace) = split_namespace(ns)?;
1124        let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1125        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1126        empty_ok(resp).await
1127    }
1128
1129    // --- Namespace policies — persistence + rates ----------------------
1130
1131    /// Get a namespace's persistence policy.
1132    ///
1133    /// `GET /admin/v2/namespaces/{tenant}/{ns}/persistence`. Returns the
1134    /// `BookKeeper` ensemble / write-quorum / ack-quorum triple plus the
1135    /// managed-ledger mark-delete rate cap. `null` body decodes to
1136    /// `PersistencePolicies::default()` via `#[serde(default)]`.
1137    /// Java: `NamespacesBase#getPersistence`.
1138    pub async fn namespace_get_persistence(
1139        &self,
1140        ns: &str,
1141    ) -> Result<PersistencePolicies, AdminError> {
1142        let (tenant, namespace) = split_namespace(ns)?;
1143        let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1144        let resp = self.send(self.http.request(Method::GET, url)).await?;
1145        json_ok_or_default(resp).await
1146    }
1147
1148    /// Set a namespace's persistence policy.
1149    ///
1150    /// `POST /admin/v2/namespaces/{tenant}/{ns}/persistence` with a JSON
1151    /// `PersistencePolicies` body.
1152    /// Java: `NamespacesBase#setPersistence`.
1153    pub async fn namespace_set_persistence(
1154        &self,
1155        ns: &str,
1156        policy: PersistencePolicies,
1157    ) -> Result<(), AdminError> {
1158        let (tenant, namespace) = split_namespace(ns)?;
1159        let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1160        let resp = self
1161            .send(self.http.request(Method::POST, url).json(&policy))
1162            .await?;
1163        empty_ok(resp).await
1164    }
1165
1166    /// Remove a namespace's persistence policy (fall back to broker default).
1167    ///
1168    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/persistence`.
1169    /// Java: `NamespacesBase#deletePersistence`.
1170    pub async fn namespace_remove_persistence(&self, ns: &str) -> Result<(), AdminError> {
1171        let (tenant, namespace) = split_namespace(ns)?;
1172        let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1173        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1174        empty_ok(resp).await
1175    }
1176
1177    /// Get a namespace's consumer dispatch-rate policy.
1178    ///
1179    /// `GET /admin/v2/namespaces/{tenant}/{ns}/dispatchRate`. Returns
1180    /// the per-namespace consumer-dispatch throttle (msg/sec, byte/sec,
1181    /// window in seconds). `-1` on either dimension means unlimited.
1182    /// Java: `NamespacesBase#getDispatchRate`.
1183    pub async fn namespace_get_dispatch_rate(&self, ns: &str) -> Result<DispatchRate, AdminError> {
1184        let (tenant, namespace) = split_namespace(ns)?;
1185        let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1186        let resp = self.send(self.http.request(Method::GET, url)).await?;
1187        json_ok_or_default(resp).await
1188    }
1189
1190    /// Set a namespace's consumer dispatch-rate policy.
1191    ///
1192    /// `POST /admin/v2/namespaces/{tenant}/{ns}/dispatchRate` with a
1193    /// JSON `DispatchRate` body.
1194    /// Java: `NamespacesBase#setDispatchRate`.
1195    pub async fn namespace_set_dispatch_rate(
1196        &self,
1197        ns: &str,
1198        rate: DispatchRate,
1199    ) -> Result<(), AdminError> {
1200        let (tenant, namespace) = split_namespace(ns)?;
1201        let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1202        let resp = self
1203            .send(self.http.request(Method::POST, url).json(&rate))
1204            .await?;
1205        empty_ok(resp).await
1206    }
1207
1208    /// Remove a namespace's consumer dispatch-rate policy.
1209    ///
1210    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/dispatchRate`.
1211    /// Java: `NamespacesBase#deleteDispatchRate`.
1212    pub async fn namespace_remove_dispatch_rate(&self, ns: &str) -> Result<(), AdminError> {
1213        let (tenant, namespace) = split_namespace(ns)?;
1214        let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1215        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1216        empty_ok(resp).await
1217    }
1218
1219    /// Get a namespace's per-subscription dispatch-rate policy.
1220    ///
1221    /// `GET /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`.
1222    /// Reuses the [`DispatchRate`] body shape — the policy applies per
1223    /// subscription rather than aggregated across all consumers.
1224    /// Java: `NamespacesBase#getSubscriptionDispatchRate`.
1225    pub async fn namespace_get_subscription_dispatch_rate(
1226        &self,
1227        ns: &str,
1228    ) -> Result<DispatchRate, AdminError> {
1229        let (tenant, namespace) = split_namespace(ns)?;
1230        let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1231        let resp = self.send(self.http.request(Method::GET, url)).await?;
1232        json_ok(resp).await
1233    }
1234
1235    /// Set a namespace's per-subscription dispatch-rate policy.
1236    ///
1237    /// `POST /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`
1238    /// with a JSON `DispatchRate` body.
1239    /// Java: `NamespacesBase#setSubscriptionDispatchRate`.
1240    pub async fn namespace_set_subscription_dispatch_rate(
1241        &self,
1242        ns: &str,
1243        rate: DispatchRate,
1244    ) -> Result<(), AdminError> {
1245        let (tenant, namespace) = split_namespace(ns)?;
1246        let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1247        let resp = self
1248            .send(self.http.request(Method::POST, url).json(&rate))
1249            .await?;
1250        empty_ok(resp).await
1251    }
1252
1253    /// Remove a namespace's per-subscription dispatch-rate policy.
1254    ///
1255    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`.
1256    /// Java: `NamespacesBase#deleteSubscriptionDispatchRate`.
1257    pub async fn namespace_remove_subscription_dispatch_rate(
1258        &self,
1259        ns: &str,
1260    ) -> Result<(), AdminError> {
1261        let (tenant, namespace) = split_namespace(ns)?;
1262        let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1263        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1264        empty_ok(resp).await
1265    }
1266
1267    /// Get a namespace's cross-cluster replicator dispatch-rate policy.
1268    ///
1269    /// `GET /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`.
1270    /// Reuses the [`DispatchRate`] body shape — the policy throttles
1271    /// outbound geo-replication traffic from this cluster.
1272    /// Java: `NamespacesBase#getReplicatorDispatchRate`.
1273    pub async fn namespace_get_replicator_dispatch_rate(
1274        &self,
1275        ns: &str,
1276    ) -> Result<DispatchRate, AdminError> {
1277        let (tenant, namespace) = split_namespace(ns)?;
1278        let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1279        let resp = self.send(self.http.request(Method::GET, url)).await?;
1280        json_ok(resp).await
1281    }
1282
1283    /// Set a namespace's cross-cluster replicator dispatch-rate policy.
1284    ///
1285    /// `POST /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`
1286    /// with a JSON `DispatchRate` body.
1287    /// Java: `NamespacesBase#setReplicatorDispatchRate`.
1288    pub async fn namespace_set_replicator_dispatch_rate(
1289        &self,
1290        ns: &str,
1291        rate: DispatchRate,
1292    ) -> Result<(), AdminError> {
1293        let (tenant, namespace) = split_namespace(ns)?;
1294        let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1295        let resp = self
1296            .send(self.http.request(Method::POST, url).json(&rate))
1297            .await?;
1298        empty_ok(resp).await
1299    }
1300
1301    /// Remove a namespace's cross-cluster replicator dispatch-rate policy.
1302    ///
1303    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`.
1304    /// Java: `NamespacesBase#removeReplicatorDispatchRate`.
1305    pub async fn namespace_remove_replicator_dispatch_rate(
1306        &self,
1307        ns: &str,
1308    ) -> Result<(), AdminError> {
1309        let (tenant, namespace) = split_namespace(ns)?;
1310        let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1311        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1312        empty_ok(resp).await
1313    }
1314
1315    /// Get a namespace's publish-rate policy.
1316    ///
1317    /// `GET /admin/v2/namespaces/{tenant}/{ns}/publishRate`. Returns
1318    /// the producer-side throttle (msg/sec + byte/sec). `-1` on either
1319    /// dimension means unlimited.
1320    /// Java: `NamespacesBase#getPublishRate`.
1321    pub async fn namespace_get_publish_rate(&self, ns: &str) -> Result<PublishRate, AdminError> {
1322        let (tenant, namespace) = split_namespace(ns)?;
1323        let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1324        let resp = self.send(self.http.request(Method::GET, url)).await?;
1325        json_ok_or_default(resp).await
1326    }
1327
1328    /// Set a namespace's publish-rate policy.
1329    ///
1330    /// `POST /admin/v2/namespaces/{tenant}/{ns}/publishRate` with a JSON
1331    /// `PublishRate` body.
1332    /// Java: `NamespacesBase#setPublishRate`.
1333    pub async fn namespace_set_publish_rate(
1334        &self,
1335        ns: &str,
1336        rate: PublishRate,
1337    ) -> Result<(), AdminError> {
1338        let (tenant, namespace) = split_namespace(ns)?;
1339        let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1340        let resp = self
1341            .send(self.http.request(Method::POST, url).json(&rate))
1342            .await?;
1343        empty_ok(resp).await
1344    }
1345
1346    /// Remove a namespace's publish-rate policy.
1347    ///
1348    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/publishRate`.
1349    /// Java: `NamespacesBase#removePublishRate`.
1350    pub async fn namespace_remove_publish_rate(&self, ns: &str) -> Result<(), AdminError> {
1351        let (tenant, namespace) = split_namespace(ns)?;
1352        let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1353        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1354        empty_ok(resp).await
1355    }
1356
1357    // --- Namespace policies — limits + dedup + delayed delivery -----
1358
1359    /// Get a namespace's broker-side message deduplication flag.
1360    ///
1361    /// `GET /admin/v2/namespaces/{tenant}/{ns}/deduplication`. Returns a
1362    /// bare JSON boolean, or `null` (decoded as `None`) when the policy
1363    /// is unset and the broker default applies.
1364    /// Java: `NamespacesBase#getDeduplication`.
1365    pub async fn namespace_get_deduplication(&self, ns: &str) -> Result<Option<bool>, AdminError> {
1366        let (tenant, namespace) = split_namespace(ns)?;
1367        let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1368        let resp = self.send(self.http.request(Method::GET, url)).await?;
1369        json_ok_optional(resp).await
1370    }
1371
1372    /// Set a namespace's broker-side message deduplication flag.
1373    ///
1374    /// `POST /admin/v2/namespaces/{tenant}/{ns}/deduplication` with a
1375    /// bare JSON boolean body.
1376    /// Java: `NamespacesBase#modifyDeduplication`.
1377    pub async fn namespace_set_deduplication(
1378        &self,
1379        ns: &str,
1380        enabled: bool,
1381    ) -> Result<(), AdminError> {
1382        let (tenant, namespace) = split_namespace(ns)?;
1383        let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1384        let resp = self
1385            .send(self.http.request(Method::POST, url).json(&enabled))
1386            .await?;
1387        empty_ok(resp).await
1388    }
1389
1390    /// Remove a namespace's deduplication flag (fall back to broker default).
1391    ///
1392    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplication`.
1393    /// Java: `NamespacesBase#removeDeduplication`.
1394    pub async fn namespace_remove_deduplication(&self, ns: &str) -> Result<(), AdminError> {
1395        let (tenant, namespace) = split_namespace(ns)?;
1396        let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1397        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1398        empty_ok(resp).await
1399    }
1400
1401    /// Get a namespace's deduplication-snapshot interval (entries).
1402    ///
1403    /// `GET /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`.
1404    /// Returns a bare integer (the entry count between dedup cursor
1405    /// snapshots), or `null` (decoded as `None`) when the broker default
1406    /// applies.
1407    /// Java: `NamespacesBase#getDeduplicationSnapshotInterval`.
1408    pub async fn namespace_get_deduplication_snapshot_interval(
1409        &self,
1410        ns: &str,
1411    ) -> Result<Option<i32>, AdminError> {
1412        let (tenant, namespace) = split_namespace(ns)?;
1413        let url = self.url(&[
1414            "namespaces",
1415            tenant,
1416            namespace,
1417            "deduplicationSnapshotInterval",
1418        ])?;
1419        let resp = self.send(self.http.request(Method::GET, url)).await?;
1420        json_ok_optional(resp).await
1421    }
1422
1423    /// Set a namespace's deduplication-snapshot interval (entries).
1424    ///
1425    /// `POST /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`
1426    /// with a bare JSON integer body.
1427    /// Java: `NamespacesBase#setDeduplicationSnapshotInterval`.
1428    pub async fn namespace_set_deduplication_snapshot_interval(
1429        &self,
1430        ns: &str,
1431        interval_entries: i32,
1432    ) -> Result<(), AdminError> {
1433        let (tenant, namespace) = split_namespace(ns)?;
1434        let url = self.url(&[
1435            "namespaces",
1436            tenant,
1437            namespace,
1438            "deduplicationSnapshotInterval",
1439        ])?;
1440        let resp = self
1441            .send(self.http.request(Method::POST, url).json(&interval_entries))
1442            .await?;
1443        empty_ok(resp).await
1444    }
1445
1446    /// Remove a namespace's deduplication-snapshot interval override.
1447    ///
1448    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`.
1449    /// Java: `NamespacesBase#deleteDeduplicationSnapshotInterval`.
1450    pub async fn namespace_remove_deduplication_snapshot_interval(
1451        &self,
1452        ns: &str,
1453    ) -> Result<(), AdminError> {
1454        let (tenant, namespace) = split_namespace(ns)?;
1455        let url = self.url(&[
1456            "namespaces",
1457            tenant,
1458            namespace,
1459            "deduplicationSnapshotInterval",
1460        ])?;
1461        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1462        empty_ok(resp).await
1463    }
1464
1465    /// Get a namespace's compaction threshold (bytes).
1466    ///
1467    /// `GET /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold`. Returns
1468    /// a bare integer (bytes of accumulated topic backlog above which the
1469    /// broker triggers automatic compaction), or `null` (decoded as `None`)
1470    /// when the broker default applies.
1471    /// Java: `NamespacesBase#getCompactionThreshold`.
1472    pub async fn namespace_get_compaction_threshold(
1473        &self,
1474        ns: &str,
1475    ) -> Result<Option<i64>, AdminError> {
1476        let (tenant, namespace) = split_namespace(ns)?;
1477        let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1478        let resp = self.send(self.http.request(Method::GET, url)).await?;
1479        json_ok_optional(resp).await
1480    }
1481
1482    /// Set a namespace's compaction threshold (bytes).
1483    ///
1484    /// `PUT /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold` with
1485    /// a bare JSON long body. `0` disables automatic compaction.
1486    ///
1487    /// Note: this endpoint is `@PUT` in Pulsar 4 (`Namespaces.java`
1488    /// declares `@PUT @Path("/{tenant}/{namespace}/compactionThreshold")`),
1489    /// not POST — using POST yields a `405 Method Not Allowed`.
1490    /// Java: `NamespacesBase#setCompactionThreshold`.
1491    pub async fn namespace_set_compaction_threshold(
1492        &self,
1493        ns: &str,
1494        threshold_bytes: i64,
1495    ) -> Result<(), AdminError> {
1496        let (tenant, namespace) = split_namespace(ns)?;
1497        let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1498        let resp = self
1499            .send(self.http.request(Method::PUT, url).json(&threshold_bytes))
1500            .await?;
1501        empty_ok(resp).await
1502    }
1503
1504    /// Remove a namespace's compaction threshold override.
1505    ///
1506    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold`.
1507    /// Java: `NamespacesBase#deleteCompactionThreshold`.
1508    pub async fn namespace_remove_compaction_threshold(&self, ns: &str) -> Result<(), AdminError> {
1509        let (tenant, namespace) = split_namespace(ns)?;
1510        let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1511        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1512        empty_ok(resp).await
1513    }
1514
1515    /// Get a namespace's delayed-delivery policy.
1516    ///
1517    /// `GET /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery`. Returns
1518    /// the active flag + tick time (the broker's index-tick granularity
1519    /// for delivering delayed messages). `null` decodes as `None`.
1520    /// Java: `NamespacesBase#getDelayedDeliveryPolicies`.
1521    pub async fn namespace_get_delayed_delivery(
1522        &self,
1523        ns: &str,
1524    ) -> Result<Option<DelayedDeliveryPolicies>, AdminError> {
1525        let (tenant, namespace) = split_namespace(ns)?;
1526        let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1527        let resp = self.send(self.http.request(Method::GET, url)).await?;
1528        json_ok_optional(resp).await
1529    }
1530
1531    /// Set a namespace's delayed-delivery policy.
1532    ///
1533    /// `POST /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery` with a
1534    /// JSON `DelayedDeliveryPolicies` body.
1535    /// Java: `NamespacesBase#setDelayedDeliveryPolicies`.
1536    pub async fn namespace_set_delayed_delivery(
1537        &self,
1538        ns: &str,
1539        policy: DelayedDeliveryPolicies,
1540    ) -> Result<(), AdminError> {
1541        let (tenant, namespace) = split_namespace(ns)?;
1542        let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1543        let resp = self
1544            .send(self.http.request(Method::POST, url).json(&policy))
1545            .await?;
1546        empty_ok(resp).await
1547    }
1548
1549    /// Remove a namespace's delayed-delivery policy override.
1550    ///
1551    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery`.
1552    /// Java: `NamespacesBase#removeDelayedDeliveryPolicies`.
1553    pub async fn namespace_remove_delayed_delivery(&self, ns: &str) -> Result<(), AdminError> {
1554        let (tenant, namespace) = split_namespace(ns)?;
1555        let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1556        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1557        empty_ok(resp).await
1558    }
1559
1560    /// Get a namespace's max-producers-per-topic limit.
1561    ///
1562    /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic`. Returns
1563    /// a bare integer (the per-topic ceiling on concurrent producer
1564    /// connections), or `null` (decoded as `None`) when the broker default
1565    /// applies.
1566    /// Java: `NamespacesBase#getMaxProducersPerTopic`.
1567    pub async fn namespace_get_max_producers_per_topic(
1568        &self,
1569        ns: &str,
1570    ) -> Result<Option<i32>, AdminError> {
1571        let (tenant, namespace) = split_namespace(ns)?;
1572        let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1573        let resp = self.send(self.http.request(Method::GET, url)).await?;
1574        json_ok_optional(resp).await
1575    }
1576
1577    /// Set a namespace's max-producers-per-topic limit.
1578    ///
1579    /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic` with
1580    /// a bare JSON integer body. `0` disables the limit.
1581    /// Java: `NamespacesBase#setMaxProducersPerTopic`.
1582    pub async fn namespace_set_max_producers_per_topic(
1583        &self,
1584        ns: &str,
1585        max_producers: i32,
1586    ) -> Result<(), AdminError> {
1587        let (tenant, namespace) = split_namespace(ns)?;
1588        let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1589        let resp = self
1590            .send(self.http.request(Method::POST, url).json(&max_producers))
1591            .await?;
1592        empty_ok(resp).await
1593    }
1594
1595    /// Remove a namespace's max-producers-per-topic limit override.
1596    ///
1597    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic`.
1598    /// Java: `NamespacesBase#removeMaxProducersPerTopic`.
1599    pub async fn namespace_remove_max_producers_per_topic(
1600        &self,
1601        ns: &str,
1602    ) -> Result<(), AdminError> {
1603        let (tenant, namespace) = split_namespace(ns)?;
1604        let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1605        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1606        empty_ok(resp).await
1607    }
1608
1609    /// Get a namespace's max-consumers-per-topic limit.
1610    ///
1611    /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic`. Returns
1612    /// a bare integer (the per-topic ceiling on concurrent consumer
1613    /// connections across all subscriptions), or `null` (decoded as `None`)
1614    /// when the broker default applies.
1615    /// Java: `NamespacesBase#getMaxConsumersPerTopic`.
1616    pub async fn namespace_get_max_consumers_per_topic(
1617        &self,
1618        ns: &str,
1619    ) -> Result<Option<i32>, AdminError> {
1620        let (tenant, namespace) = split_namespace(ns)?;
1621        let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1622        let resp = self.send(self.http.request(Method::GET, url)).await?;
1623        json_ok_optional(resp).await
1624    }
1625
1626    /// Set a namespace's max-consumers-per-topic limit.
1627    ///
1628    /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic` with
1629    /// a bare JSON integer body. `0` disables the limit.
1630    /// Java: `NamespacesBase#setMaxConsumersPerTopic`.
1631    pub async fn namespace_set_max_consumers_per_topic(
1632        &self,
1633        ns: &str,
1634        max_consumers: i32,
1635    ) -> Result<(), AdminError> {
1636        let (tenant, namespace) = split_namespace(ns)?;
1637        let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1638        let resp = self
1639            .send(self.http.request(Method::POST, url).json(&max_consumers))
1640            .await?;
1641        empty_ok(resp).await
1642    }
1643
1644    /// Remove a namespace's max-consumers-per-topic limit override.
1645    ///
1646    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic`.
1647    /// Java: `NamespacesBase#removeMaxConsumersPerTopic`.
1648    pub async fn namespace_remove_max_consumers_per_topic(
1649        &self,
1650        ns: &str,
1651    ) -> Result<(), AdminError> {
1652        let (tenant, namespace) = split_namespace(ns)?;
1653        let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1654        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1655        empty_ok(resp).await
1656    }
1657
1658    /// Get a namespace's max-unacked-messages-per-consumer limit.
1659    ///
1660    /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`.
1661    /// Returns a bare integer (the broker's per-consumer permit-pool cap
1662    /// before it stops dispatching), or `null` (decoded as `None`) when
1663    /// the broker default applies.
1664    /// Java: `NamespacesBase#getMaxUnackedMessagesPerConsumer`.
1665    pub async fn namespace_get_max_unacked_messages_per_consumer(
1666        &self,
1667        ns: &str,
1668    ) -> Result<Option<i32>, AdminError> {
1669        let (tenant, namespace) = split_namespace(ns)?;
1670        let url = self.url(&[
1671            "namespaces",
1672            tenant,
1673            namespace,
1674            "maxUnackedMessagesPerConsumer",
1675        ])?;
1676        let resp = self.send(self.http.request(Method::GET, url)).await?;
1677        json_ok_optional(resp).await
1678    }
1679
1680    /// Set a namespace's max-unacked-messages-per-consumer limit.
1681    ///
1682    /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`
1683    /// with a bare JSON integer body. `0` disables the limit.
1684    /// Java: `NamespacesBase#setMaxUnackedMessagesPerConsumer`.
1685    pub async fn namespace_set_max_unacked_messages_per_consumer(
1686        &self,
1687        ns: &str,
1688        max_unacked: i32,
1689    ) -> Result<(), AdminError> {
1690        let (tenant, namespace) = split_namespace(ns)?;
1691        let url = self.url(&[
1692            "namespaces",
1693            tenant,
1694            namespace,
1695            "maxUnackedMessagesPerConsumer",
1696        ])?;
1697        let resp = self
1698            .send(self.http.request(Method::POST, url).json(&max_unacked))
1699            .await?;
1700        empty_ok(resp).await
1701    }
1702
1703    /// Remove a namespace's max-unacked-messages-per-consumer override.
1704    ///
1705    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`.
1706    /// Java: `NamespacesBase#removeMaxUnackedMessagesPerConsumer`.
1707    pub async fn namespace_remove_max_unacked_messages_per_consumer(
1708        &self,
1709        ns: &str,
1710    ) -> Result<(), AdminError> {
1711        let (tenant, namespace) = split_namespace(ns)?;
1712        let url = self.url(&[
1713            "namespaces",
1714            tenant,
1715            namespace,
1716            "maxUnackedMessagesPerConsumer",
1717        ])?;
1718        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1719        empty_ok(resp).await
1720    }
1721
1722    /// Get a namespace's max-unacked-messages-per-subscription limit.
1723    ///
1724    /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`.
1725    /// Returns a bare integer (the broker's per-subscription unacked
1726    /// ceiling — once exceeded the broker stops dispatching to every
1727    /// consumer on that subscription), or `null` (decoded as `None`)
1728    /// when the broker default applies.
1729    /// Java: `NamespacesBase#getMaxUnackedMessagesPerSubscription`.
1730    pub async fn namespace_get_max_unacked_messages_per_subscription(
1731        &self,
1732        ns: &str,
1733    ) -> Result<Option<i32>, AdminError> {
1734        let (tenant, namespace) = split_namespace(ns)?;
1735        let url = self.url(&[
1736            "namespaces",
1737            tenant,
1738            namespace,
1739            "maxUnackedMessagesPerSubscription",
1740        ])?;
1741        let resp = self.send(self.http.request(Method::GET, url)).await?;
1742        json_ok_optional(resp).await
1743    }
1744
1745    /// Set a namespace's max-unacked-messages-per-subscription limit.
1746    ///
1747    /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`
1748    /// with a bare JSON integer body. `0` disables the limit.
1749    /// Java: `NamespacesBase#setMaxUnackedMessagesPerSubscription`.
1750    pub async fn namespace_set_max_unacked_messages_per_subscription(
1751        &self,
1752        ns: &str,
1753        max_unacked: i32,
1754    ) -> Result<(), AdminError> {
1755        let (tenant, namespace) = split_namespace(ns)?;
1756        let url = self.url(&[
1757            "namespaces",
1758            tenant,
1759            namespace,
1760            "maxUnackedMessagesPerSubscription",
1761        ])?;
1762        let resp = self
1763            .send(self.http.request(Method::POST, url).json(&max_unacked))
1764            .await?;
1765        empty_ok(resp).await
1766    }
1767
1768    /// Remove a namespace's max-unacked-messages-per-subscription override.
1769    ///
1770    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`.
1771    /// Java: `NamespacesBase#removeMaxUnackedMessagesPerSubscription`.
1772    pub async fn namespace_remove_max_unacked_messages_per_subscription(
1773        &self,
1774        ns: &str,
1775    ) -> Result<(), AdminError> {
1776        let (tenant, namespace) = split_namespace(ns)?;
1777        let url = self.url(&[
1778            "namespaces",
1779            tenant,
1780            namespace,
1781            "maxUnackedMessagesPerSubscription",
1782        ])?;
1783        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1784        empty_ok(resp).await
1785    }
1786
1787    // --- Topics ----------------------------------------------------------
1788
1789    /// List persistent topics in a namespace.
1790    ///
1791    /// `GET /admin/v2/persistent/{tenant}/{namespace}`.
1792    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
1793    /// (`@Path("/persistent")` + `@GET @Path("/{tenant}/{namespace}")`).
1794    pub async fn topics_list(&self, namespace: &str) -> Result<Vec<String>, AdminError> {
1795        let (tenant, namespace) = split_namespace(namespace)?;
1796        let url = self.url(&["persistent", tenant, namespace])?;
1797        let resp = self.send(self.http.request(Method::GET, url)).await?;
1798        json_ok(resp).await
1799    }
1800
1801    /// Create a non-partitioned persistent topic.
1802    ///
1803    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}`.
1804    /// Java: `PersistentTopics.java#createNonPartitionedTopic`
1805    /// (`@PUT @Path("/{tenant}/{namespace}/{topic}")`).
1806    pub async fn topic_create_non_partitioned(&self, topic: &str) -> Result<(), AdminError> {
1807        self.topic_create_non_partitioned_with_properties(topic, &HashMap::new())
1808            .await
1809    }
1810
1811    async fn topic_create_non_partitioned_with_properties(
1812        &self,
1813        topic: &str,
1814        properties: &HashMap<String, String>,
1815    ) -> Result<(), AdminError> {
1816        let (tenant, namespace, name) = split_topic(topic)?;
1817        let url = self.url(&["persistent", tenant, namespace, name])?;
1818        let resp = self
1819            .send(self.http.request(Method::PUT, url).json(properties))
1820            .await?;
1821        empty_ok(resp).await
1822    }
1823
1824    /// Create a partitioned topic with `partitions` partitions.
1825    ///
1826    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`
1827    /// with the partition count as a JSON integer body.
1828    /// Java: `PersistentTopics.java#createPartitionedTopic`
1829    /// (`@PUT @Path("/{tenant}/{namespace}/{topic}/partitions")`).
1830    pub async fn topic_create_partitioned(
1831        &self,
1832        topic: &str,
1833        partitions: u32,
1834    ) -> Result<(), AdminError> {
1835        let (tenant, namespace, name) = split_topic(topic)?;
1836        let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
1837        let resp = self
1838            .send(self.http.request(Method::PUT, url).json(&partitions))
1839            .await?;
1840        empty_ok(resp).await
1841    }
1842
1843    /// Delete a topic, auto-detecting partitioned vs non-partitioned.
1844    ///
1845    /// Pulsar exposes two distinct delete endpoints — the partitioned
1846    /// parent uses `DELETE .../{topic}/partitions?force=…` and the
1847    /// non-partitioned topic uses `DELETE .../{topic}?force=…`. Hitting
1848    /// the partitioned endpoint on a non-partitioned topic returns 404
1849    /// ("Topic is not partitioned"), which used to surface as "the
1850    /// topic doesn't exist" to operators using `magnetar admin topics
1851    /// delete`.
1852    ///
1853    /// Probe via `topic_partitions_count` first (`GET .../partitions`
1854    /// returns `partitions: 0` for non-partitioned topics, `> 0` for a
1855    /// partitioned parent) and route to the matching endpoint. Same
1856    /// behaviour as pulsarctl's `topics delete`.
1857    ///
1858    /// Java: `PersistentTopics.java#deletePartitionedTopic` /
1859    /// `PersistentTopics.java#deleteTopic`.
1860    pub async fn topic_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
1861        let (tenant, namespace, name) = split_topic(topic)?;
1862        let partitions = self.topic_partitions_count(topic).await?;
1863        let force_str = if force { "true" } else { "false" };
1864        let mut url = if partitions > 0 {
1865            self.url(&["persistent", tenant, namespace, name, "partitions"])?
1866        } else {
1867            self.url(&["persistent", tenant, namespace, name])?
1868        };
1869        url.query_pairs_mut().append_pair("force", force_str);
1870        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1871        empty_ok(resp).await
1872    }
1873
1874    /// Get topic stats.
1875    ///
1876    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/stats`.
1877    /// Java: `PersistentTopics.java#getStats`
1878    /// (`@GET @Path("/{tenant}/{namespace}/{topic}/stats")`,
1879    /// response shape `PersistentTopicStats`).
1880    ///
1881    /// For a **partitioned** topic, the broker returns 404 on this endpoint
1882    /// because there is no ledger backing the parent name. Call
1883    /// [`Self::topic_partitioned_stats`] instead, or look up the count via
1884    /// [`Self::topic_partitions_count`] first.
1885    pub async fn topic_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
1886        let (tenant, namespace, name) = split_topic(topic)?;
1887        let url = self.url(&["persistent", tenant, namespace, name, "stats"])?;
1888        let resp = self.send(self.http.request(Method::GET, url)).await?;
1889        json_ok(resp).await
1890    }
1891
1892    /// Get aggregated stats for a partitioned topic.
1893    ///
1894    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitioned-stats?
1895    /// perPartition=false`. Java: `PersistentTopics.java#getPartitionedStats`
1896    /// (`@GET @Path("/{tenant}/{namespace}/{topic}/partitioned-stats")`,
1897    /// response shape `PartitionedTopicStats` which extends
1898    /// `PersistentTopicStats` with `partitions: Map<String, TopicStats>`
1899    /// and `metadata: PartitionedTopicMetadata`).
1900    ///
1901    /// magnetar exposes only the aggregated top-level metrics through the
1902    /// same [`TopicStats`] shape — the broker populates the rate, throughput,
1903    /// size, and counter fields at the response root summed across partitions.
1904    /// The `partitions` and `metadata` fields are
1905    /// dropped on deserialisation; for per-partition detail call
1906    /// [`Self::topic_stats`] on each `<topic>-partition-N` instead. We pass
1907    /// `perPartition=false` to keep the wire response small.
1908    pub async fn topic_partitioned_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
1909        let (tenant, namespace, name) = split_topic(topic)?;
1910        let mut url = self.url(&["persistent", tenant, namespace, name, "partitioned-stats"])?;
1911        url.query_pairs_mut().append_pair("perPartition", "false");
1912        let resp = self.send(self.http.request(Method::GET, url)).await?;
1913        json_ok(resp).await
1914    }
1915
1916    /// Resolve the partition count of a topic.
1917    ///
1918    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`.
1919    /// Java: `PersistentTopics.java#getPartitionedMetadata`
1920    /// (`@GET @Path("/{tenant}/{namespace}/{topic}/partitions")`,
1921    /// response shape `PartitionedTopicMetadata{ partitions: int }`).
1922    ///
1923    /// Returns `0` for non-partitioned topics; lets a caller disambiguate
1924    /// between [`Self::topic_stats`] and [`Self::topic_partitioned_stats`]
1925    /// when the topology is not known in advance.
1926    pub async fn topic_partitions_count(&self, topic: &str) -> Result<u32, AdminError> {
1927        let (tenant, namespace, name) = split_topic(topic)?;
1928        let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
1929        let resp = self.send(self.http.request(Method::GET, url)).await?;
1930        let meta: PartitionedTopicMetadata = json_ok(resp).await?;
1931        Ok(meta.partitions)
1932    }
1933
1934    /// Resolve a broker-entry-metadata `index` to a [`MessageId`] (PIP-415).
1935    ///
1936    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/getMessageIdByIndex?index={index}`.
1937    /// Per [PIP-415](https://github.com/apache/pulsar/blob/master/pip/pip-415.md)
1938    /// this is **REST-only** — the spec's "Binary protocol" section is
1939    /// intentionally empty and the canonical implementation PR
1940    /// [`apache/pulsar#24222`](https://github.com/apache/pulsar/pull/24222)
1941    /// (merged 2025-06-23) touches only admin / broker / CLI Java code.
1942    ///
1943    /// Java:
1944    /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
1945    /// (`@GET @Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex")`,
1946    /// `@QueryParam("index") long`); admin-client side is
1947    /// `pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/
1948    /// TopicsImpl.java#getMessageIdByIndexAsync` which deserialises the
1949    /// response into `MessageIdImpl` (i.e. `{ledgerId, entryId, partitionIndex}`).
1950    ///
1951    /// `topic` follows the same rule as every other topic-scoped method:
1952    /// either `persistent://tenant/ns/topic` or `tenant/ns/topic`. For a
1953    /// partitioned topic, pass the specific partition (`my-topic-partition-0`).
1954    ///
1955    /// The response carries only `(ledgerId, entryId, partitionIndex)`. The
1956    /// returned [`MessageId`] sets `batch_index = -1` and `batch_size = -1`
1957    /// because the broker resolves at entry granularity — see PIP-415 §"Why
1958    /// Precise Index Matching Isn't Implemented on the Broker Side".
1959    pub async fn topic_get_message_id_by_index(
1960        &self,
1961        topic: &str,
1962        index: i64,
1963    ) -> Result<MessageId, AdminError> {
1964        let (tenant, namespace, name) = split_topic(topic)?;
1965        let mut url = self.url(&["persistent", tenant, namespace, name, "getMessageIdByIndex"])?;
1966        url.query_pairs_mut()
1967            .append_pair("index", &index.to_string());
1968        let resp = self.send(self.http.request(Method::GET, url)).await?;
1969        let dto: MessageIdResponse = json_ok(resp).await?;
1970        dto.try_into_message_id()
1971    }
1972
1973    /// Trigger ledger compaction for a topic.
1974    ///
1975    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction`.
1976    /// Returns 204 on success; the broker queues the work asynchronously —
1977    /// poll [`Self::topic_compaction_status`] to observe progress.
1978    /// Java: `PersistentTopics#triggerCompaction`.
1979    pub async fn topic_compact(&self, topic: &str) -> Result<(), AdminError> {
1980        let (tenant, namespace, name) = split_topic(topic)?;
1981        let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
1982        let resp = self.send(self.http.request(Method::PUT, url)).await?;
1983        empty_ok(resp).await
1984    }
1985
1986    /// Get the current compaction status for a topic.
1987    ///
1988    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction`.
1989    /// Returns Java's `LongRunningProcessStatus`: `status` ∈ {`NOT_RUN`,
1990    /// `RUNNING`, `SUCCESS`, `ERROR`} plus an optional `lastError` string.
1991    /// Java: `PersistentTopics#compactionStatus`.
1992    pub async fn topic_compaction_status(
1993        &self,
1994        topic: &str,
1995    ) -> Result<LongRunningProcessStatus, AdminError> {
1996        let (tenant, namespace, name) = split_topic(topic)?;
1997        let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
1998        let resp = self.send(self.http.request(Method::GET, url)).await?;
1999        json_ok(resp).await
2000    }
2001
2002    /// Unload a topic from its current broker (forces rebalancing).
2003    ///
2004    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/unload`.
2005    /// Operators use this to drain a hot broker or to re-elect ownership
2006    /// after a configuration change. Java: `PersistentTopics#unloadTopic`.
2007    pub async fn topic_unload(&self, topic: &str) -> Result<(), AdminError> {
2008        let (tenant, namespace, name) = split_topic(topic)?;
2009        let url = self.url(&["persistent", tenant, namespace, name, "unload"])?;
2010        let resp = self.send(self.http.request(Method::PUT, url)).await?;
2011        empty_ok(resp).await
2012    }
2013
2014    /// Terminate (seal) a topic — no further produces succeed.
2015    ///
2016    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/terminate`.
2017    /// Returns the [`MessageId`] of the last message that landed before
2018    /// the seal, or `None` when the broker reports the
2019    /// `MessageIdImpl(-1, -1)` sentinel — meaning the topic was sealed
2020    /// before any confirmed entry was written (a freshly-created topic,
2021    /// or a topic whose owner was just re-elected). The Java client
2022    /// surfaces that case as `MessageId.earliest`; we use `Option` so
2023    /// callers don't have to special-case a magic value.
2024    /// Java: `PersistentTopics#terminate`.
2025    pub async fn topic_terminate(&self, topic: &str) -> Result<Option<MessageId>, AdminError> {
2026        let (tenant, namespace, name) = split_topic(topic)?;
2027        let url = self.url(&["persistent", tenant, namespace, name, "terminate"])?;
2028        let resp = self.send(self.http.request(Method::POST, url)).await?;
2029        let dto: MessageIdResponse = json_ok(resp).await?;
2030        if dto.ledger_id < 0 && dto.entry_id < 0 {
2031            return Ok(None);
2032        }
2033        dto.try_into_message_id().map(Some)
2034    }
2035
2036    /// Grow a partitioned topic's partition count.
2037    ///
2038    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`
2039    /// with a bare JSON integer body. Only forward (grow) is supported by
2040    /// the broker — shrinking returns 409. Java:
2041    /// `PersistentTopics#updatePartitionedTopic`.
2042    pub async fn topic_update_partitions(
2043        &self,
2044        topic: &str,
2045        new_partitions: u32,
2046    ) -> Result<(), AdminError> {
2047        let (tenant, namespace, name) = split_topic(topic)?;
2048        let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
2049        let resp = self
2050            .send(self.http.request(Method::POST, url).json(&new_partitions))
2051            .await?;
2052        empty_ok(resp).await
2053    }
2054
2055    // --- Topic policies — per-topic overrides ---------------------------
2056
2057    /// Get a topic's retention policy.
2058    ///
2059    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/retention`.
2060    /// Returns the per-topic [`RetentionPolicies`] override; the broker
2061    /// emits a `RetentionPolicies` JSON when the policy is set and a bare
2062    /// `null` (decoded as `RetentionPolicies::default()` via `#[serde(default)]`)
2063    /// when no override is in place — callers fall back to the namespace
2064    /// policy in that case. Java: `PersistentTopicsBase#getRetention`.
2065    pub async fn topic_get_retention(&self, topic: &str) -> Result<RetentionPolicies, AdminError> {
2066        let (tenant, namespace, name) = split_topic(topic)?;
2067        let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2068        let resp = self.send(self.http.request(Method::GET, url)).await?;
2069        json_ok_or_default(resp).await
2070    }
2071
2072    /// Set a topic's retention policy (overrides the namespace default).
2073    ///
2074    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/retention` with a
2075    /// JSON `RetentionPolicies` body. `-1` means infinite (size or time).
2076    /// Java: `PersistentTopicsBase#setRetention`.
2077    pub async fn topic_set_retention(
2078        &self,
2079        topic: &str,
2080        policy: RetentionPolicies,
2081    ) -> Result<(), AdminError> {
2082        let (tenant, namespace, name) = split_topic(topic)?;
2083        let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2084        let resp = self
2085            .send(self.http.request(Method::POST, url).json(&policy))
2086            .await?;
2087        empty_ok(resp).await
2088    }
2089
2090    /// Remove a topic's retention policy (fall back to namespace default).
2091    ///
2092    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/retention`.
2093    /// Java: `PersistentTopicsBase#removeRetention`.
2094    pub async fn topic_remove_retention(&self, topic: &str) -> Result<(), AdminError> {
2095        let (tenant, namespace, name) = split_topic(topic)?;
2096        let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2097        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2098        empty_ok(resp).await
2099    }
2100
2101    /// Get all backlog-quota policies on a topic.
2102    ///
2103    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuotaMap`.
2104    /// Returns `Map<BacklogQuotaType, BacklogQuota>` — kept as raw JSON
2105    /// for the same reason as [`Self::namespace_get_backlog_quotas`]:
2106    /// broker minor versions add quota types.
2107    /// Java: `PersistentTopicsBase#getBacklogQuotaMap`.
2108    pub async fn topic_get_backlog_quotas(
2109        &self,
2110        topic: &str,
2111    ) -> Result<serde_json::Value, AdminError> {
2112        let (tenant, namespace, name) = split_topic(topic)?;
2113        let url = self.url(&["persistent", tenant, namespace, name, "backlogQuotaMap"])?;
2114        let resp = self.send(self.http.request(Method::GET, url)).await?;
2115        json_ok(resp).await
2116    }
2117
2118    /// Set a backlog-quota policy on a topic (overrides the namespace
2119    /// default for the matching `backlogQuotaType`).
2120    ///
2121    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota
2122    /// ?backlogQuotaType={type}` with a JSON `BacklogQuota` body.
2123    /// Java: `PersistentTopicsBase#setBacklogQuota`.
2124    pub async fn topic_set_backlog_quota(
2125        &self,
2126        topic: &str,
2127        backlog_quota_type: BacklogQuotaType,
2128        quota: BacklogQuota,
2129    ) -> Result<(), AdminError> {
2130        let (tenant, namespace, name) = split_topic(topic)?;
2131        let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
2132        url.query_pairs_mut()
2133            .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
2134        let resp = self
2135            .send(self.http.request(Method::POST, url).json(&quota))
2136            .await?;
2137        empty_ok(resp).await
2138    }
2139
2140    /// Remove a backlog-quota policy from a topic.
2141    ///
2142    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota
2143    /// ?backlogQuotaType={type}`.
2144    /// Java: `PersistentTopicsBase#removeBacklogQuota`.
2145    pub async fn topic_remove_backlog_quota(
2146        &self,
2147        topic: &str,
2148        backlog_quota_type: BacklogQuotaType,
2149    ) -> Result<(), AdminError> {
2150        let (tenant, namespace, name) = split_topic(topic)?;
2151        let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
2152        url.query_pairs_mut()
2153            .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
2154        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2155        empty_ok(resp).await
2156    }
2157
2158    /// Get a topic's message-TTL (seconds, or `null` if unset).
2159    ///
2160    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL`. Returns
2161    /// a bare integer when the override is set, `null` (decoded as
2162    /// `Option::None`) when no topic-level override is in place.
2163    /// Java: `PersistentTopicsBase#getMessageTTL`.
2164    pub async fn topic_get_message_ttl(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2165        let (tenant, namespace, name) = split_topic(topic)?;
2166        let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2167        let resp = self.send(self.http.request(Method::GET, url)).await?;
2168        json_ok_optional(resp).await
2169    }
2170
2171    /// Set a topic's message-TTL (seconds).
2172    ///
2173    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL?messageTTL={n}`.
2174    /// `0` disables (broker treats as no TTL).
2175    ///
2176    /// Note: unlike the **namespace**-level `setNamespaceMessageTTL` (which
2177    /// takes the TTL as a JSON int body), the topic-level setter binds
2178    /// `@QueryParam("messageTTL") Integer messageTTL` on both Pulsar 4.0 and
2179    /// 4.2 (`pulsar-broker/.../v2/PersistentTopics.java#setMessageTTL`).
2180    /// Sending the value as a JSON body returns 204 No Content but the
2181    /// broker reads the query param as `null` and silently treats the call
2182    /// as "no override" — the topic policy never persists, and a subsequent
2183    /// `topic_get_message_ttl` surfaces `Ok(None)`. Older Pulsar releases
2184    /// tolerated both encodings; 4.2 enforces the query-param shape.
2185    /// Java: `PersistentTopicsBase#setMessageTTL`.
2186    pub async fn topic_set_message_ttl(
2187        &self,
2188        topic: &str,
2189        ttl_seconds: i32,
2190    ) -> Result<(), AdminError> {
2191        let (tenant, namespace, name) = split_topic(topic)?;
2192        let mut url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2193        url.query_pairs_mut()
2194            .append_pair("messageTTL", &ttl_seconds.to_string());
2195        let resp = self.send(self.http.request(Method::POST, url)).await?;
2196        empty_ok(resp).await
2197    }
2198
2199    /// Remove a topic's message-TTL (fall back to namespace default).
2200    ///
2201    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL`.
2202    /// Java: `PersistentTopicsBase#removeMessageTTL`.
2203    pub async fn topic_remove_message_ttl(&self, topic: &str) -> Result<(), AdminError> {
2204        let (tenant, namespace, name) = split_topic(topic)?;
2205        let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2206        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2207        empty_ok(resp).await
2208    }
2209
2210    /// Get a topic's persistence policy.
2211    ///
2212    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence`. The
2213    /// broker emits a `PersistencePolicies` JSON when the topic override
2214    /// is set and `null` (decoded as `Option::None`) when no override is
2215    /// in place — callers fall back to the namespace policy in that case.
2216    /// Java: `PersistentTopicsBase#getPersistence`.
2217    pub async fn topic_get_persistence(
2218        &self,
2219        topic: &str,
2220    ) -> Result<Option<PersistencePolicies>, AdminError> {
2221        let (tenant, namespace, name) = split_topic(topic)?;
2222        let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2223        let resp = self.send(self.http.request(Method::GET, url)).await?;
2224        json_ok_optional(resp).await
2225    }
2226
2227    /// Set a topic's persistence policy (overrides the namespace default).
2228    ///
2229    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence` with a
2230    /// JSON `PersistencePolicies` body.
2231    /// Java: `PersistentTopicsBase#setPersistence`.
2232    pub async fn topic_set_persistence(
2233        &self,
2234        topic: &str,
2235        policy: PersistencePolicies,
2236    ) -> Result<(), AdminError> {
2237        let (tenant, namespace, name) = split_topic(topic)?;
2238        let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2239        let resp = self
2240            .send(self.http.request(Method::POST, url).json(&policy))
2241            .await?;
2242        empty_ok(resp).await
2243    }
2244
2245    /// Remove a topic's persistence policy (fall back to namespace default).
2246    ///
2247    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence`.
2248    /// Java: `PersistentTopicsBase#removePersistence`.
2249    pub async fn topic_remove_persistence(&self, topic: &str) -> Result<(), AdminError> {
2250        let (tenant, namespace, name) = split_topic(topic)?;
2251        let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2252        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2253        empty_ok(resp).await
2254    }
2255
2256    /// Get a topic's consumer dispatch-rate policy (or `null` if no override).
2257    ///
2258    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate`. The
2259    /// broker emits the per-topic [`DispatchRate`] override or `null` when
2260    /// no override is set; callers fall back to the namespace policy in the
2261    /// `None` case. Java: `PersistentTopicsBase#getDispatchRate`.
2262    pub async fn topic_get_dispatch_rate(
2263        &self,
2264        topic: &str,
2265    ) -> Result<Option<DispatchRate>, AdminError> {
2266        let (tenant, namespace, name) = split_topic(topic)?;
2267        let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2268        let resp = self.send(self.http.request(Method::GET, url)).await?;
2269        json_ok_optional(resp).await
2270    }
2271
2272    /// Set a topic's consumer dispatch-rate policy (overrides namespace default).
2273    ///
2274    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate` with a
2275    /// JSON `DispatchRate` body. Java: `PersistentTopicsBase#setDispatchRate`.
2276    pub async fn topic_set_dispatch_rate(
2277        &self,
2278        topic: &str,
2279        rate: DispatchRate,
2280    ) -> Result<(), AdminError> {
2281        let (tenant, namespace, name) = split_topic(topic)?;
2282        let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2283        let resp = self
2284            .send(self.http.request(Method::POST, url).json(&rate))
2285            .await?;
2286        empty_ok(resp).await
2287    }
2288
2289    /// Remove a topic's consumer dispatch-rate policy.
2290    ///
2291    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate`.
2292    /// Java: `PersistentTopicsBase#removeDispatchRate`.
2293    pub async fn topic_remove_dispatch_rate(&self, topic: &str) -> Result<(), AdminError> {
2294        let (tenant, namespace, name) = split_topic(topic)?;
2295        let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2296        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2297        empty_ok(resp).await
2298    }
2299
2300    /// Get a topic's per-subscription dispatch-rate policy (or `null`).
2301    ///
2302    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`.
2303    /// Reuses the [`DispatchRate`] body shape — the policy applies per
2304    /// subscription rather than aggregated across all consumers.
2305    /// Java: `PersistentTopicsBase#getSubscriptionDispatchRate`.
2306    pub async fn topic_get_subscription_dispatch_rate(
2307        &self,
2308        topic: &str,
2309    ) -> Result<Option<DispatchRate>, AdminError> {
2310        let (tenant, namespace, name) = split_topic(topic)?;
2311        let url = self.url(&[
2312            "persistent",
2313            tenant,
2314            namespace,
2315            name,
2316            "subscriptionDispatchRate",
2317        ])?;
2318        let resp = self.send(self.http.request(Method::GET, url)).await?;
2319        json_ok_optional(resp).await
2320    }
2321
2322    /// Set a topic's per-subscription dispatch-rate policy (overrides namespace default).
2323    ///
2324    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`
2325    /// with a JSON `DispatchRate` body.
2326    /// Java: `PersistentTopicsBase#setSubscriptionDispatchRate`.
2327    pub async fn topic_set_subscription_dispatch_rate(
2328        &self,
2329        topic: &str,
2330        rate: DispatchRate,
2331    ) -> Result<(), AdminError> {
2332        let (tenant, namespace, name) = split_topic(topic)?;
2333        let url = self.url(&[
2334            "persistent",
2335            tenant,
2336            namespace,
2337            name,
2338            "subscriptionDispatchRate",
2339        ])?;
2340        let resp = self
2341            .send(self.http.request(Method::POST, url).json(&rate))
2342            .await?;
2343        empty_ok(resp).await
2344    }
2345
2346    /// Remove a topic's per-subscription dispatch-rate policy.
2347    ///
2348    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`.
2349    /// Java: `PersistentTopicsBase#removeSubscriptionDispatchRate`.
2350    pub async fn topic_remove_subscription_dispatch_rate(
2351        &self,
2352        topic: &str,
2353    ) -> Result<(), AdminError> {
2354        let (tenant, namespace, name) = split_topic(topic)?;
2355        let url = self.url(&[
2356            "persistent",
2357            tenant,
2358            namespace,
2359            name,
2360            "subscriptionDispatchRate",
2361        ])?;
2362        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2363        empty_ok(resp).await
2364    }
2365
2366    /// Get a topic's cross-cluster replicator dispatch-rate policy (or `null`).
2367    ///
2368    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`.
2369    /// Reuses the [`DispatchRate`] body shape — the policy throttles
2370    /// outbound geo-replication traffic from this cluster.
2371    /// Java: `PersistentTopicsBase#getReplicatorDispatchRate`.
2372    pub async fn topic_get_replicator_dispatch_rate(
2373        &self,
2374        topic: &str,
2375    ) -> Result<Option<DispatchRate>, AdminError> {
2376        let (tenant, namespace, name) = split_topic(topic)?;
2377        let url = self.url(&[
2378            "persistent",
2379            tenant,
2380            namespace,
2381            name,
2382            "replicatorDispatchRate",
2383        ])?;
2384        let resp = self.send(self.http.request(Method::GET, url)).await?;
2385        json_ok_optional(resp).await
2386    }
2387
2388    /// Set a topic's cross-cluster replicator dispatch-rate policy.
2389    ///
2390    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`
2391    /// with a JSON `DispatchRate` body.
2392    /// Java: `PersistentTopicsBase#setReplicatorDispatchRate`.
2393    pub async fn topic_set_replicator_dispatch_rate(
2394        &self,
2395        topic: &str,
2396        rate: DispatchRate,
2397    ) -> Result<(), AdminError> {
2398        let (tenant, namespace, name) = split_topic(topic)?;
2399        let url = self.url(&[
2400            "persistent",
2401            tenant,
2402            namespace,
2403            name,
2404            "replicatorDispatchRate",
2405        ])?;
2406        let resp = self
2407            .send(self.http.request(Method::POST, url).json(&rate))
2408            .await?;
2409        empty_ok(resp).await
2410    }
2411
2412    /// Remove a topic's cross-cluster replicator dispatch-rate policy.
2413    ///
2414    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`.
2415    /// Java: `PersistentTopicsBase#removeReplicatorDispatchRate`.
2416    pub async fn topic_remove_replicator_dispatch_rate(
2417        &self,
2418        topic: &str,
2419    ) -> Result<(), AdminError> {
2420        let (tenant, namespace, name) = split_topic(topic)?;
2421        let url = self.url(&[
2422            "persistent",
2423            tenant,
2424            namespace,
2425            name,
2426            "replicatorDispatchRate",
2427        ])?;
2428        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2429        empty_ok(resp).await
2430    }
2431
2432    /// Get a topic's publish-rate policy (or `null` if no override).
2433    ///
2434    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate`. Returns
2435    /// the per-topic [`PublishRate`] producer-side throttle (msg/sec +
2436    /// byte/sec). `-1` on either dimension means unlimited.
2437    /// Java: `PersistentTopicsBase#getPublishRate`.
2438    pub async fn topic_get_publish_rate(
2439        &self,
2440        topic: &str,
2441    ) -> Result<Option<PublishRate>, AdminError> {
2442        let (tenant, namespace, name) = split_topic(topic)?;
2443        let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2444        let resp = self.send(self.http.request(Method::GET, url)).await?;
2445        json_ok_optional(resp).await
2446    }
2447
2448    /// Set a topic's publish-rate policy (overrides namespace default).
2449    ///
2450    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate` with a
2451    /// JSON `PublishRate` body. Java: `PersistentTopicsBase#setPublishRate`.
2452    pub async fn topic_set_publish_rate(
2453        &self,
2454        topic: &str,
2455        rate: PublishRate,
2456    ) -> Result<(), AdminError> {
2457        let (tenant, namespace, name) = split_topic(topic)?;
2458        let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2459        let resp = self
2460            .send(self.http.request(Method::POST, url).json(&rate))
2461            .await?;
2462        empty_ok(resp).await
2463    }
2464
2465    /// Remove a topic's publish-rate policy.
2466    ///
2467    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate`.
2468    /// Java: `PersistentTopicsBase#removePublishRate`.
2469    pub async fn topic_remove_publish_rate(&self, topic: &str) -> Result<(), AdminError> {
2470        let (tenant, namespace, name) = split_topic(topic)?;
2471        let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2472        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2473        empty_ok(resp).await
2474    }
2475
2476    /// Get a topic's max-producers cap (or `null` if no override).
2477    ///
2478    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers`. Returns
2479    /// a bare integer when the override is set, `null` (decoded as
2480    /// `Option::None`) when no topic-level cap is in place.
2481    /// Java: `PersistentTopicsBase#getMaxProducers`.
2482    pub async fn topic_get_max_producers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2483        let (tenant, namespace, name) = split_topic(topic)?;
2484        let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2485        let resp = self.send(self.http.request(Method::GET, url)).await?;
2486        json_ok_optional(resp).await
2487    }
2488
2489    /// Set a topic's max-producers cap.
2490    ///
2491    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers` with
2492    /// a bare integer body. `0` disables (broker treats as unlimited).
2493    /// Java: `PersistentTopicsBase#setMaxProducers`.
2494    pub async fn topic_set_max_producers(
2495        &self,
2496        topic: &str,
2497        max_producers: i32,
2498    ) -> Result<(), AdminError> {
2499        let (tenant, namespace, name) = split_topic(topic)?;
2500        let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2501        let resp = self
2502            .send(self.http.request(Method::POST, url).json(&max_producers))
2503            .await?;
2504        empty_ok(resp).await
2505    }
2506
2507    /// Remove a topic's max-producers cap (fall back to namespace / broker default).
2508    ///
2509    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers`.
2510    /// Java: `PersistentTopicsBase#removeMaxProducers`.
2511    pub async fn topic_remove_max_producers(&self, topic: &str) -> Result<(), AdminError> {
2512        let (tenant, namespace, name) = split_topic(topic)?;
2513        let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2514        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2515        empty_ok(resp).await
2516    }
2517
2518    /// Get a topic's max-consumers cap (or `null` if no override).
2519    ///
2520    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers`. Returns
2521    /// a bare integer when the override is set, `null` (decoded as
2522    /// `Option::None`) when no topic-level cap is in place.
2523    /// Java: `PersistentTopicsBase#getMaxConsumers`.
2524    pub async fn topic_get_max_consumers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2525        let (tenant, namespace, name) = split_topic(topic)?;
2526        let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2527        let resp = self.send(self.http.request(Method::GET, url)).await?;
2528        json_ok_optional(resp).await
2529    }
2530
2531    /// Set a topic's max-consumers cap.
2532    ///
2533    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers` with
2534    /// a bare integer body. `0` disables (broker treats as unlimited).
2535    /// Java: `PersistentTopicsBase#setMaxConsumers`.
2536    pub async fn topic_set_max_consumers(
2537        &self,
2538        topic: &str,
2539        max_consumers: i32,
2540    ) -> Result<(), AdminError> {
2541        let (tenant, namespace, name) = split_topic(topic)?;
2542        let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2543        let resp = self
2544            .send(self.http.request(Method::POST, url).json(&max_consumers))
2545            .await?;
2546        empty_ok(resp).await
2547    }
2548
2549    /// Remove a topic's max-consumers cap (fall back to namespace / broker default).
2550    ///
2551    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers`.
2552    /// Java: `PersistentTopicsBase#removeMaxConsumers`.
2553    pub async fn topic_remove_max_consumers(&self, topic: &str) -> Result<(), AdminError> {
2554        let (tenant, namespace, name) = split_topic(topic)?;
2555        let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2556        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2557        empty_ok(resp).await
2558    }
2559
2560    // --- Subscriptions ---------------------------------------------------
2561
2562    /// List subscription names on a topic.
2563    ///
2564    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscriptions`.
2565    /// Java: `PersistentTopics#getSubscriptions`.
2566    pub async fn subscriptions_list(&self, topic: &str) -> Result<Vec<String>, AdminError> {
2567        let (tenant, namespace, name) = split_topic(topic)?;
2568        let url = self.url(&["persistent", tenant, namespace, name, "subscriptions"])?;
2569        let resp = self.send(self.http.request(Method::GET, url)).await?;
2570        json_ok(resp).await
2571    }
2572
2573    /// Reset a subscription's cursor to a specific message-id position.
2574    ///
2575    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor`
2576    /// with body `{ledgerId, entryId, partitionIndex, batchIndex, isExcluded}`.
2577    /// `is_excluded = true` skips the message at `message_id` itself; `false`
2578    /// leaves it eligible for redelivery. Java: `PersistentTopics#resetCursorOnPosition`.
2579    pub async fn subscription_reset_cursor_to_position(
2580        &self,
2581        topic: &str,
2582        subscription: &str,
2583        message_id: MessageId,
2584        is_excluded: bool,
2585    ) -> Result<(), AdminError> {
2586        let (tenant, namespace, name) = split_topic(topic)?;
2587        validate_segment(subscription)?;
2588        let url = self.url(&[
2589            "persistent",
2590            tenant,
2591            namespace,
2592            name,
2593            "subscription",
2594            subscription,
2595            "resetcursor",
2596        ])?;
2597        let body = ResetCursorData {
2598            ledger_id: message_id_field_for_wire(message_id.ledger_id),
2599            entry_id: message_id_field_for_wire(message_id.entry_id),
2600            partition_index: message_id.partition,
2601            batch_index: message_id.batch_index,
2602            is_excluded,
2603        };
2604        let resp = self
2605            .send(self.http.request(Method::POST, url).json(&body))
2606            .await?;
2607        empty_ok(resp).await
2608    }
2609
2610    /// Reset a subscription's cursor to a wall-clock timestamp (millis since epoch).
2611    ///
2612    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor/
2613    /// {timestamp}`. Java: `PersistentTopics#resetCursor(topic, sub, timestamp)`.
2614    pub async fn subscription_reset_cursor_to_timestamp(
2615        &self,
2616        topic: &str,
2617        subscription: &str,
2618        timestamp_millis: u64,
2619    ) -> Result<(), AdminError> {
2620        let (tenant, namespace, name) = split_topic(topic)?;
2621        validate_segment(subscription)?;
2622        let timestamp = timestamp_millis.to_string();
2623        let url = self.url(&[
2624            "persistent",
2625            tenant,
2626            namespace,
2627            name,
2628            "subscription",
2629            subscription,
2630            "resetcursor",
2631            &timestamp,
2632        ])?;
2633        let resp = self.send(self.http.request(Method::POST, url)).await?;
2634        empty_ok(resp).await
2635    }
2636
2637    /// Advance a subscription's cursor past N undelivered messages.
2638    ///
2639    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip/
2640    /// {numMessages}`. Java: `PersistentTopics#skipMessages`.
2641    pub async fn subscription_skip_messages(
2642        &self,
2643        topic: &str,
2644        subscription: &str,
2645        num_messages: u64,
2646    ) -> Result<(), AdminError> {
2647        let (tenant, namespace, name) = split_topic(topic)?;
2648        validate_segment(subscription)?;
2649        let n = num_messages.to_string();
2650        let url = self.url(&[
2651            "persistent",
2652            tenant,
2653            namespace,
2654            name,
2655            "subscription",
2656            subscription,
2657            "skip",
2658            &n,
2659        ])?;
2660        let resp = self.send(self.http.request(Method::POST, url)).await?;
2661        empty_ok(resp).await
2662    }
2663
2664    /// Drain the entire backlog of a subscription (clear-backlog).
2665    ///
2666    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip_all`.
2667    /// Java: `PersistentTopics#skipAllMessages`.
2668    pub async fn subscription_skip_all_messages(
2669        &self,
2670        topic: &str,
2671        subscription: &str,
2672    ) -> Result<(), AdminError> {
2673        let (tenant, namespace, name) = split_topic(topic)?;
2674        validate_segment(subscription)?;
2675        let url = self.url(&[
2676            "persistent",
2677            tenant,
2678            namespace,
2679            name,
2680            "subscription",
2681            subscription,
2682            "skip_all",
2683        ])?;
2684        let resp = self.send(self.http.request(Method::POST, url)).await?;
2685        empty_ok(resp).await
2686    }
2687
2688    /// Expire all messages older than `expire_time_seconds` for a subscription.
2689    ///
2690    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/expireMessages/
2691    /// {seconds}`. Java: `PersistentTopics#expireMessagesForSubscription`.
2692    pub async fn subscription_expire_messages(
2693        &self,
2694        topic: &str,
2695        subscription: &str,
2696        expire_time_seconds: u64,
2697    ) -> Result<(), AdminError> {
2698        let (tenant, namespace, name) = split_topic(topic)?;
2699        validate_segment(subscription)?;
2700        let s = expire_time_seconds.to_string();
2701        let url = self.url(&[
2702            "persistent",
2703            tenant,
2704            namespace,
2705            name,
2706            "subscription",
2707            subscription,
2708            "expireMessages",
2709            &s,
2710        ])?;
2711        let resp = self.send(self.http.request(Method::POST, url)).await?;
2712        empty_ok(resp).await
2713    }
2714
2715    /// Delete (unsubscribe) a subscription.
2716    ///
2717    /// `DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}?force={force}`.
2718    /// `force = true` disconnects active consumers before deletion. Java:
2719    /// `PersistentTopics#deleteSubscription`.
2720    pub async fn subscription_delete(
2721        &self,
2722        topic: &str,
2723        subscription: &str,
2724        force: bool,
2725    ) -> Result<(), AdminError> {
2726        let (tenant, namespace, name) = split_topic(topic)?;
2727        validate_segment(subscription)?;
2728        let mut url = self.url(&[
2729            "persistent",
2730            tenant,
2731            namespace,
2732            name,
2733            "subscription",
2734            subscription,
2735        ])?;
2736        if force {
2737            url.query_pairs_mut().append_pair("force", "true");
2738        }
2739        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2740        empty_ok(resp).await
2741    }
2742
2743    // --- Shadow topics (PIP-180 / ADR-0033) ------------------------------
2744
2745    /// Create a shadow topic ([PIP-180](https://github.com/apache/pulsar/blob/master/pip/pip-180.md)).
2746    ///
2747    /// Creates the shadow as a regular non-partitioned topic with the broker-reserved
2748    /// `PULSAR.SHADOW_SOURCE` topic property pointing at the source topic:
2749    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{shadow}` with body
2750    /// `{ "PULSAR.SHADOW_SOURCE": "persistent://tenant/ns/source" }`.
2751    /// Then links the created shadow in the source topic policy list via
2752    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{source}/shadowTopics`.
2753    ///
2754    /// Java:
2755    /// `pulsar-client-admin/.../TopicsImpl.java#createShadowTopicAsync` builds the same property
2756    /// map before calling `createNonPartitionedTopicAsync` for non-partitioned sources.
2757    ///
2758    /// Errors mirror the existing `AdminError` taxonomy: 404 → `Status { code:
2759    /// 404, .. }` (the source topic does not exist), 409 → `Status { code:
2760    /// 409, .. }` (the shadow topic already exists on this source),
2761    /// 401/403 → `Status { code: 401|403, .. }` (auth).
2762    pub async fn create_shadow_topic(&self, source: &str, shadow: &str) -> Result<(), AdminError> {
2763        let (source_tenant, source_namespace, source_name) = split_topic(source)?;
2764        // Validate the shadow name eagerly so a misformatted argument errors
2765        // out with `InvalidName` rather than as a broker 4xx after we've
2766        // already crossed the wire.
2767        let (shadow_tenant, shadow_namespace, shadow_name) = split_topic(shadow)?;
2768        let source = format!("persistent://{source_tenant}/{source_namespace}/{source_name}");
2769        let shadow = format!("persistent://{shadow_tenant}/{shadow_namespace}/{shadow_name}");
2770        let mut properties = HashMap::new();
2771        properties.insert("PULSAR.SHADOW_SOURCE".to_owned(), source.clone());
2772        self.topic_create_non_partitioned_with_properties(&shadow, &properties)
2773            .await?;
2774        self.set_shadow_topics(&source, &[shadow]).await
2775    }
2776
2777    async fn set_shadow_topics(&self, source: &str, shadows: &[String]) -> Result<(), AdminError> {
2778        let (tenant, namespace, name) = split_topic(source)?;
2779        let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
2780        let resp = self
2781            .send(self.http.request(Method::PUT, url).json(shadows))
2782            .await?;
2783        empty_ok(resp).await
2784    }
2785
2786    /// Delete a shadow topic (PIP-180).
2787    ///
2788    /// `DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}` where
2789    /// `{topic}` is the **shadow** topic name. PIP-180's deletion contract
2790    /// goes through the regular topic-delete path on the shadow itself —
2791    /// the broker recognises the topic as a shadow and detaches it from
2792    /// the source ledger atomically with the metadata delete.
2793    ///
2794    /// `force` controls whether active subscribers are kicked off before
2795    /// the delete (`?force=true`) or whether the broker rejects the
2796    /// request when subscribers exist (`?force=false`, the default).
2797    ///
2798    /// Java: `org.apache.pulsar.client.admin.Topics#deleteShadowTopic` calls
2799    /// the same `@DELETE @Path("/{tenant}/{namespace}/{topic}")` endpoint.
2800    pub async fn delete_shadow_topic(&self, shadow: &str, force: bool) -> Result<(), AdminError> {
2801        let (tenant, namespace, name) = split_topic(shadow)?;
2802        let mut url = self.url(&["persistent", tenant, namespace, name])?;
2803        url.query_pairs_mut()
2804            .append_pair("force", if force { "true" } else { "false" });
2805        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2806        empty_ok(resp).await
2807    }
2808
2809    /// List the shadow topics created on a source topic (PIP-180).
2810    ///
2811    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/shadowTopics`
2812    /// where `{topic}` is the **source** topic name. The broker returns a
2813    /// JSON array of fully-qualified shadow topic names.
2814    ///
2815    /// Java:
2816    /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
2817    /// (`@GET @Path("/{tenant}/{namespace}/{topic}/shadowTopics")`).
2818    ///
2819    /// Used by the runtime engine at consumer subscribe time: when the user
2820    /// subscribes to a topic the runtime cannot yet classify, a single
2821    /// `get_shadow_topics` lookup on every other topic in the namespace is
2822    /// expensive; instead the runtime calls `get_shadow_topics(subscribed)`
2823    /// on the topic itself — a non-shadow topic returns an empty array, a
2824    /// shadow topic surfaces nothing but the broker has already populated
2825    /// the consumer's `shadow_metadata` via the topic's policy.
2826    /// (See `crates/magnetar-runtime-tokio/src/client.rs::subscribe`.)
2827    pub async fn get_shadow_topics(&self, source: &str) -> Result<Vec<String>, AdminError> {
2828        let (tenant, namespace, name) = split_topic(source)?;
2829        let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
2830        let resp = self.send(self.http.request(Method::GET, url)).await?;
2831        json_ok_or_default(resp).await
2832    }
2833
2834    /// Resolve the **source** topic of a shadow topic (PIP-180).
2835    ///
2836    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/properties`.
2837    /// Returns the `PULSAR.SHADOW_SOURCE` property when the queried topic is a
2838    /// shadow; returns `None` when it is a regular topic. Used by the runtime
2839    /// at subscribe time to populate
2840    /// [`magnetar_proto::ShadowTopicMetadata::source_topic`] on the new
2841    /// consumer (so the receive path can emit
2842    /// [`magnetar_proto::ConnectionEvent::MessageReceivedFromShadow`]
2843    /// without an out-of-band lookup per message).
2844    ///
2845    /// Java: `org.apache.pulsar.client.admin.Topics#getShadowSource` —
2846    /// `TopicsImpl#getShadowSourceAsync` delegates to `getPropertiesAsync`.
2847    pub async fn get_shadow_source(&self, shadow: &str) -> Result<Option<String>, AdminError> {
2848        let (tenant, namespace, name) = split_topic(shadow)?;
2849        let url = self.url(&["persistent", tenant, namespace, name, "properties"])?;
2850        let resp = self.send(self.http.request(Method::GET, url)).await?;
2851        let mut properties: HashMap<String, String> = json_ok_or_default(resp).await?;
2852        Ok(properties
2853            .remove("PULSAR.SHADOW_SOURCE")
2854            .filter(|source| !source.is_empty()))
2855    }
2856
2857    // --- Pulsar IO Sources (/admin/v3/sources/...) ----------------------
2858
2859    /// List sources configured under a namespace.
2860    ///
2861    /// `GET /admin/v3/sources/{tenant}/{namespace}`. Returns the list of
2862    /// source names (the broker emits a JSON array of strings — one
2863    /// entry per declared source, regardless of running state).
2864    /// Java:
2865    /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java`
2866    /// (`@Path("/sources")`) +
2867    /// `pulsar-broker/.../admin/impl/SourcesBase.java#listSources`.
2868    pub async fn sources_list_by_namespace(
2869        &self,
2870        tenant: &str,
2871        namespace: &str,
2872    ) -> Result<Vec<String>, AdminError> {
2873        validate_segment(tenant)?;
2874        validate_segment(namespace)?;
2875        let url = self.url_v3(&["sources", tenant, namespace])?;
2876        let resp = self.send(self.http.request(Method::GET, url)).await?;
2877        json_ok(resp).await
2878    }
2879
2880    /// Get one source's configuration.
2881    ///
2882    /// `GET /admin/v3/sources/{tenant}/{namespace}/{name}`. Returns the
2883    /// stored `SourceConfig` envelope as raw JSON — minor broker
2884    /// versions extend the shape (new connector knobs, secret refs)
2885    /// faster than a typed Rust DTO can keep up.
2886    /// Java: `SourcesBase#getSourceInfo`.
2887    pub async fn source_get(
2888        &self,
2889        tenant: &str,
2890        namespace: &str,
2891        name: &str,
2892    ) -> Result<serde_json::Value, AdminError> {
2893        validate_segment(tenant)?;
2894        validate_segment(namespace)?;
2895        validate_segment(name)?;
2896        let url = self.url_v3(&["sources", tenant, namespace, name])?;
2897        let resp = self.send(self.http.request(Method::GET, url)).await?;
2898        json_ok(resp).await
2899    }
2900
2901    /// Get a source's running status (per-instance worker telemetry).
2902    ///
2903    /// `GET /admin/v3/sources/{tenant}/{namespace}/{name}/status`.
2904    /// Returns the broker's `SourceStatus` envelope (`numInstances`,
2905    /// `numRunning`, per-instance `workerId` + `running` + last
2906    /// received timestamp). Exposed as raw JSON for forward-compat.
2907    /// Java: `SourcesBase#getSourceStatus`.
2908    pub async fn source_status(
2909        &self,
2910        tenant: &str,
2911        namespace: &str,
2912        name: &str,
2913    ) -> Result<serde_json::Value, AdminError> {
2914        validate_segment(tenant)?;
2915        validate_segment(namespace)?;
2916        validate_segment(name)?;
2917        let url = self.url_v3(&["sources", tenant, namespace, name, "status"])?;
2918        let resp = self.send(self.http.request(Method::GET, url)).await?;
2919        json_ok(resp).await
2920    }
2921
2922    /// Register a source from a remote package URL.
2923    ///
2924    /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}` with
2925    /// `multipart/form-data` carrying two parts: a `url` text part (the
2926    /// package URL — `http(s)://`, `file://`, or `function://` per the
2927    /// broker's `WorkerUtils#downloadFileFromPackageUrl`) and a
2928    /// `sourceConfig` JSON part with the [`SourceConfig`] body.
2929    /// A sibling `source_create` (binary upload) is intentionally not
2930    /// yet exposed — this URL-based variant covers every
2931    /// CI/operator scenario that does not need to ship a JAR through
2932    /// the admin client itself.
2933    /// Java: `SourcesBase#registerSource`.
2934    pub async fn source_create_with_url(
2935        &self,
2936        tenant: &str,
2937        namespace: &str,
2938        name: &str,
2939        url: &str,
2940        config: SourceConfig,
2941    ) -> Result<(), AdminError> {
2942        validate_segment(tenant)?;
2943        validate_segment(namespace)?;
2944        validate_segment(name)?;
2945        let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
2946        let form = build_url_config_multipart(url, "sourceConfig", &config)?;
2947        let resp = self
2948            .send(
2949                self.http
2950                    .request(Method::POST, endpoint)
2951                    .multipart(form)
2952                    .timeout(PACKAGE_REGISTER_TIMEOUT),
2953            )
2954            .await?;
2955        empty_ok(resp).await
2956    }
2957
2958    /// Update a source from a remote package URL.
2959    ///
2960    /// `PUT /admin/v3/sources/{tenant}/{namespace}/{name}` with the
2961    /// same multipart shape as [`Self::source_create_with_url`].
2962    /// Java: `SourcesBase#updateSource`.
2963    pub async fn source_update_with_url(
2964        &self,
2965        tenant: &str,
2966        namespace: &str,
2967        name: &str,
2968        url: &str,
2969        config: SourceConfig,
2970    ) -> Result<(), AdminError> {
2971        validate_segment(tenant)?;
2972        validate_segment(namespace)?;
2973        validate_segment(name)?;
2974        let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
2975        let form = build_url_config_multipart(url, "sourceConfig", &config)?;
2976        let resp = self
2977            .send(
2978                self.http
2979                    .request(Method::PUT, endpoint)
2980                    .multipart(form)
2981                    .timeout(PACKAGE_REGISTER_TIMEOUT),
2982            )
2983            .await?;
2984        empty_ok(resp).await
2985    }
2986
2987    /// Delete a source.
2988    ///
2989    /// `DELETE /admin/v3/sources/{tenant}/{namespace}/{name}`. Removes
2990    /// the source declaration and tears the running instances down.
2991    /// Java: `SourcesBase#deregisterSource`.
2992    pub async fn source_delete(
2993        &self,
2994        tenant: &str,
2995        namespace: &str,
2996        name: &str,
2997    ) -> Result<(), AdminError> {
2998        validate_segment(tenant)?;
2999        validate_segment(namespace)?;
3000        validate_segment(name)?;
3001        let url = self.url_v3(&["sources", tenant, namespace, name])?;
3002        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3003        empty_ok(resp).await
3004    }
3005
3006    /// Start every instance of a source.
3007    ///
3008    /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/start`.
3009    /// Java: `SourcesBase#startSource`.
3010    pub async fn source_start(
3011        &self,
3012        tenant: &str,
3013        namespace: &str,
3014        name: &str,
3015    ) -> Result<(), AdminError> {
3016        validate_segment(tenant)?;
3017        validate_segment(namespace)?;
3018        validate_segment(name)?;
3019        let url = self.url_v3(&["sources", tenant, namespace, name, "start"])?;
3020        let resp = self.send(self.http.request(Method::POST, url)).await?;
3021        empty_ok(resp).await
3022    }
3023
3024    /// Stop every instance of a source.
3025    ///
3026    /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/stop`.
3027    /// Java: `SourcesBase#stopSource`.
3028    pub async fn source_stop(
3029        &self,
3030        tenant: &str,
3031        namespace: &str,
3032        name: &str,
3033    ) -> Result<(), AdminError> {
3034        validate_segment(tenant)?;
3035        validate_segment(namespace)?;
3036        validate_segment(name)?;
3037        let url = self.url_v3(&["sources", tenant, namespace, name, "stop"])?;
3038        let resp = self.send(self.http.request(Method::POST, url)).await?;
3039        empty_ok(resp).await
3040    }
3041
3042    /// Restart every instance of a source.
3043    ///
3044    /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/restart`.
3045    /// Java: `SourcesBase#restartSource`.
3046    pub async fn source_restart(
3047        &self,
3048        tenant: &str,
3049        namespace: &str,
3050        name: &str,
3051    ) -> Result<(), AdminError> {
3052        validate_segment(tenant)?;
3053        validate_segment(namespace)?;
3054        validate_segment(name)?;
3055        let url = self.url_v3(&["sources", tenant, namespace, name, "restart"])?;
3056        let resp = self.send(self.http.request(Method::POST, url)).await?;
3057        empty_ok(resp).await
3058    }
3059
3060    // --- Pulsar IO Sinks (/admin/v3/sinks/...) --------------------------
3061
3062    /// List sinks configured under a namespace.
3063    ///
3064    /// `GET /admin/v3/sinks/{tenant}/{namespace}`. Returns the list of
3065    /// sink names. Mirrors [`Self::sources_list_by_namespace`] —
3066    /// Pulsar's Sources / Sinks REST surfaces are intentionally
3067    /// symmetric.
3068    /// Java:
3069    /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java`
3070    /// (`@Path("/sinks")`) +
3071    /// `pulsar-broker/.../admin/impl/SinksBase.java#listSinks`.
3072    pub async fn sinks_list_by_namespace(
3073        &self,
3074        tenant: &str,
3075        namespace: &str,
3076    ) -> Result<Vec<String>, AdminError> {
3077        validate_segment(tenant)?;
3078        validate_segment(namespace)?;
3079        let url = self.url_v3(&["sinks", tenant, namespace])?;
3080        let resp = self.send(self.http.request(Method::GET, url)).await?;
3081        json_ok(resp).await
3082    }
3083
3084    /// Get one sink's configuration.
3085    ///
3086    /// `GET /admin/v3/sinks/{tenant}/{namespace}/{name}`. Returns the
3087    /// stored `SinkConfig` as raw JSON for the same forward-compat
3088    /// reason as [`Self::source_get`]. Java: `SinksBase#getSinkInfo`.
3089    pub async fn sink_get(
3090        &self,
3091        tenant: &str,
3092        namespace: &str,
3093        name: &str,
3094    ) -> Result<serde_json::Value, AdminError> {
3095        validate_segment(tenant)?;
3096        validate_segment(namespace)?;
3097        validate_segment(name)?;
3098        let url = self.url_v3(&["sinks", tenant, namespace, name])?;
3099        let resp = self.send(self.http.request(Method::GET, url)).await?;
3100        json_ok(resp).await
3101    }
3102
3103    /// Get a sink's running status (per-instance worker telemetry).
3104    ///
3105    /// `GET /admin/v3/sinks/{tenant}/{namespace}/{name}/status`.
3106    /// Same envelope shape as the Sources status. Raw JSON.
3107    /// Java: `SinksBase#getSinkStatus`.
3108    pub async fn sink_status(
3109        &self,
3110        tenant: &str,
3111        namespace: &str,
3112        name: &str,
3113    ) -> Result<serde_json::Value, AdminError> {
3114        validate_segment(tenant)?;
3115        validate_segment(namespace)?;
3116        validate_segment(name)?;
3117        let url = self.url_v3(&["sinks", tenant, namespace, name, "status"])?;
3118        let resp = self.send(self.http.request(Method::GET, url)).await?;
3119        json_ok(resp).await
3120    }
3121
3122    /// Register a sink from a remote package URL.
3123    ///
3124    /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}` with
3125    /// `multipart/form-data` carrying a `url` text part and a
3126    /// `sinkConfig` JSON part. Mirrors
3127    /// [`Self::source_create_with_url`]; the only wire-level
3128    /// difference is the JSON-part field name (`sinkConfig` vs
3129    /// `sourceConfig`). Java: `SinksBase#registerSink`.
3130    pub async fn sink_create_with_url(
3131        &self,
3132        tenant: &str,
3133        namespace: &str,
3134        name: &str,
3135        url: &str,
3136        config: SinkConfig,
3137    ) -> Result<(), AdminError> {
3138        validate_segment(tenant)?;
3139        validate_segment(namespace)?;
3140        validate_segment(name)?;
3141        let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
3142        let form = build_url_config_multipart(url, "sinkConfig", &config)?;
3143        let resp = self
3144            .send(
3145                self.http
3146                    .request(Method::POST, endpoint)
3147                    .multipart(form)
3148                    .timeout(PACKAGE_REGISTER_TIMEOUT),
3149            )
3150            .await?;
3151        empty_ok(resp).await
3152    }
3153
3154    /// Update a sink from a remote package URL.
3155    ///
3156    /// `PUT /admin/v3/sinks/{tenant}/{namespace}/{name}` with the same
3157    /// multipart shape as [`Self::sink_create_with_url`].
3158    /// Java: `SinksBase#updateSink`.
3159    pub async fn sink_update_with_url(
3160        &self,
3161        tenant: &str,
3162        namespace: &str,
3163        name: &str,
3164        url: &str,
3165        config: SinkConfig,
3166    ) -> Result<(), AdminError> {
3167        validate_segment(tenant)?;
3168        validate_segment(namespace)?;
3169        validate_segment(name)?;
3170        let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
3171        let form = build_url_config_multipart(url, "sinkConfig", &config)?;
3172        let resp = self
3173            .send(
3174                self.http
3175                    .request(Method::PUT, endpoint)
3176                    .multipart(form)
3177                    .timeout(PACKAGE_REGISTER_TIMEOUT),
3178            )
3179            .await?;
3180        empty_ok(resp).await
3181    }
3182
3183    /// Delete a sink.
3184    ///
3185    /// `DELETE /admin/v3/sinks/{tenant}/{namespace}/{name}`.
3186    /// Java: `SinksBase#deregisterSink`.
3187    pub async fn sink_delete(
3188        &self,
3189        tenant: &str,
3190        namespace: &str,
3191        name: &str,
3192    ) -> Result<(), AdminError> {
3193        validate_segment(tenant)?;
3194        validate_segment(namespace)?;
3195        validate_segment(name)?;
3196        let url = self.url_v3(&["sinks", tenant, namespace, name])?;
3197        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3198        empty_ok(resp).await
3199    }
3200
3201    /// Start every instance of a sink.
3202    ///
3203    /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/start`.
3204    /// Java: `SinksBase#startSink`.
3205    pub async fn sink_start(
3206        &self,
3207        tenant: &str,
3208        namespace: &str,
3209        name: &str,
3210    ) -> Result<(), AdminError> {
3211        validate_segment(tenant)?;
3212        validate_segment(namespace)?;
3213        validate_segment(name)?;
3214        let url = self.url_v3(&["sinks", tenant, namespace, name, "start"])?;
3215        let resp = self.send(self.http.request(Method::POST, url)).await?;
3216        empty_ok(resp).await
3217    }
3218
3219    /// Stop every instance of a sink.
3220    ///
3221    /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/stop`.
3222    /// Java: `SinksBase#stopSink`.
3223    pub async fn sink_stop(
3224        &self,
3225        tenant: &str,
3226        namespace: &str,
3227        name: &str,
3228    ) -> Result<(), AdminError> {
3229        validate_segment(tenant)?;
3230        validate_segment(namespace)?;
3231        validate_segment(name)?;
3232        let url = self.url_v3(&["sinks", tenant, namespace, name, "stop"])?;
3233        let resp = self.send(self.http.request(Method::POST, url)).await?;
3234        empty_ok(resp).await
3235    }
3236
3237    /// Restart every instance of a sink.
3238    ///
3239    /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/restart`.
3240    /// Java: `SinksBase#restartSink`.
3241    pub async fn sink_restart(
3242        &self,
3243        tenant: &str,
3244        namespace: &str,
3245        name: &str,
3246    ) -> Result<(), AdminError> {
3247        validate_segment(tenant)?;
3248        validate_segment(namespace)?;
3249        validate_segment(name)?;
3250        let url = self.url_v3(&["sinks", tenant, namespace, name, "restart"])?;
3251        let resp = self.send(self.http.request(Method::POST, url)).await?;
3252        empty_ok(resp).await
3253    }
3254
3255    // --- Pulsar Packages (/admin/v3/packages/...) -----------------------
3256
3257    /// List package names declared under (`type`, `tenant`, `namespace`).
3258    ///
3259    /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}`. Returns the
3260    /// list of package names — *not* versions; one entry per declared
3261    /// package. Use [`Self::package_versions_list`] to enumerate the
3262    /// versions of one package.
3263    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/
3264    /// admin/v3/Packages.java#listPackages`.
3265    pub async fn packages_list(
3266        &self,
3267        pkg_type: PackageType,
3268        tenant: &str,
3269        namespace: &str,
3270    ) -> Result<Vec<String>, AdminError> {
3271        validate_segment(tenant)?;
3272        validate_segment(namespace)?;
3273        let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace])?;
3274        let resp = self.send(self.http.request(Method::GET, url)).await?;
3275        json_ok(resp).await
3276    }
3277
3278    /// List the versions declared for one package.
3279    ///
3280    /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}`.
3281    /// Returns the list of version strings (Pulsar treats versions as
3282    /// opaque strings — `1.0.0`, `latest`, build hashes — and only the
3283    /// metadata endpoints understand them).
3284    /// Java: `PackagesBase#listPackageVersions`.
3285    pub async fn package_versions_list(
3286        &self,
3287        pkg_type: PackageType,
3288        tenant: &str,
3289        namespace: &str,
3290        name: &str,
3291    ) -> Result<Vec<String>, AdminError> {
3292        validate_segment(tenant)?;
3293        validate_segment(namespace)?;
3294        validate_segment(name)?;
3295        let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace, name])?;
3296        let resp = self.send(self.http.request(Method::GET, url)).await?;
3297        json_ok(resp).await
3298    }
3299
3300    /// Get the metadata envelope for one package version.
3301    ///
3302    /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3303    /// {version}/metadata`. Returns the `PackageMetadata` envelope as
3304    /// raw JSON for forward-compat — broker minor versions extend the
3305    /// shape with `tags`, `documentationUrl`, etc.
3306    /// Java: `PackagesBase#getPackageMetadata`.
3307    pub async fn package_metadata_get(
3308        &self,
3309        pkg_type: PackageType,
3310        tenant: &str,
3311        namespace: &str,
3312        name: &str,
3313        version: &str,
3314    ) -> Result<serde_json::Value, AdminError> {
3315        validate_segment(tenant)?;
3316        validate_segment(namespace)?;
3317        validate_segment(name)?;
3318        validate_segment(version)?;
3319        let url = self.url_v3(&[
3320            "packages",
3321            pkg_type.as_str(),
3322            tenant,
3323            namespace,
3324            name,
3325            version,
3326            "metadata",
3327        ])?;
3328        let resp = self.send(self.http.request(Method::GET, url)).await?;
3329        json_ok(resp).await
3330    }
3331
3332    /// Replace the metadata envelope for one package version.
3333    ///
3334    /// `PUT /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3335    /// {version}/metadata` with a [`PackageMetadata`] JSON body.
3336    /// The broker rejects this verb with 404 if the package version
3337    /// does not exist — `package_metadata_set` is *update*, never
3338    /// *create*. Java: `PackagesBase#updatePackageMetadata`.
3339    pub async fn package_metadata_set(
3340        &self,
3341        pkg_type: PackageType,
3342        tenant: &str,
3343        namespace: &str,
3344        name: &str,
3345        version: &str,
3346        metadata: PackageMetadata,
3347    ) -> Result<(), AdminError> {
3348        validate_segment(tenant)?;
3349        validate_segment(namespace)?;
3350        validate_segment(name)?;
3351        validate_segment(version)?;
3352        let url = self.url_v3(&[
3353            "packages",
3354            pkg_type.as_str(),
3355            tenant,
3356            namespace,
3357            name,
3358            version,
3359            "metadata",
3360        ])?;
3361        let resp = self
3362            .send(self.http.request(Method::PUT, url).json(&metadata))
3363            .await?;
3364        empty_ok(resp).await
3365    }
3366
3367    /// Delete one package version.
3368    ///
3369    /// `DELETE /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3370    /// {version}`. The broker drops the version's metadata + storage
3371    /// atomically; other versions of the same package are untouched.
3372    /// Java: `PackagesBase#deletePackage`.
3373    pub async fn package_delete(
3374        &self,
3375        pkg_type: PackageType,
3376        tenant: &str,
3377        namespace: &str,
3378        name: &str,
3379        version: &str,
3380    ) -> Result<(), AdminError> {
3381        validate_segment(tenant)?;
3382        validate_segment(namespace)?;
3383        validate_segment(name)?;
3384        validate_segment(version)?;
3385        let url = self.url_v3(&[
3386            "packages",
3387            pkg_type.as_str(),
3388            tenant,
3389            namespace,
3390            name,
3391            version,
3392        ])?;
3393        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3394        empty_ok(resp).await
3395    }
3396
3397    // --- Internal --------------------------------------------------------
3398
3399    /// Build a request URL by joining `segments` onto `base_url`. Each segment
3400    /// is percent-encoded for the URL path.
3401    fn url(&self, segments: &[&str]) -> Result<Url, AdminError> {
3402        Self::url_for(&self.base_url, segments)
3403    }
3404
3405    /// Build a request URL by joining `segments` onto the `/admin/v3/`
3406    /// base. Used by Pulsar Functions / IO Sources / IO Sinks /
3407    /// Packages, which Pulsar moved off of `/admin/v2/` historically.
3408    fn url_v3(&self, segments: &[&str]) -> Result<Url, AdminError> {
3409        Self::url_for(&self.base_url_v3, segments)
3410    }
3411
3412    /// Shared URL-builder body for the v2 / v3 helpers.
3413    fn url_for(base: &Url, segments: &[&str]) -> Result<Url, AdminError> {
3414        let mut url = base.clone();
3415        {
3416            // `Url::path_segments_mut` only fails for cannot-be-a-base URLs;
3417            // builder already rejected those.
3418            let mut path = url
3419                .path_segments_mut()
3420                .map_err(|()| AdminError::Builder("base url is cannot-be-a-base".into()))?;
3421            // Both `base_url` (anchored at `/admin/v2/`) and `base_url_v3`
3422            // (`/admin/v3/`) carry a trailing slash, which produces a
3423            // sentinel empty trailing segment in `path_segments_mut`. Drop
3424            // it before appending API segments — otherwise pushes land
3425            // after the empty, producing `/admin/v2//persistent/...`. Real
3426            // brokers tolerate the double slash; strict mocks (wiremock)
3427            // do not, and Java's `PulsarAdmin` emits the single-slash
3428            // form.
3429            path.pop_if_empty();
3430            for segment in segments {
3431                path.push(segment);
3432            }
3433        }
3434        Ok(url)
3435    }
3436
3437    /// Apply auth headers and dispatch.
3438    ///
3439    /// Returns an [`ApiResponse`] carrying the response alongside the
3440    /// resolved method + URL, so the decode helpers can attribute a
3441    /// non-JSON body to the exact request that produced it
3442    /// (see [`AdminError::Decode`] / [`AdminError::Status`]).
3443    async fn send(&self, req: RequestBuilder) -> Result<ApiResponse, AdminError> {
3444        let req = match &self.auth {
3445            AdminAuth::None => req,
3446            AdminAuth::Token(tok) => bearer(req, tok)?,
3447            AdminAuth::OAuth2(flow) => {
3448                // Refresh the cached token if it is missing or near expiry,
3449                // then attach it. `ensure_fresh` is a no-op when the cached
3450                // token is still valid, so the steady-state path is a single
3451                // mutex read.
3452                flow.ensure_fresh()
3453                    .await
3454                    .map_err(|err| AdminError::Auth(format!("oauth2 token refresh: {err}")))?;
3455                // A cache miss (None) and an IDP that handed back an empty
3456                // `access_token` (Some(empty)) are both "no usable token" —
3457                // collapse them into one guard.
3458                let token = flow
3459                    .cached_access_token()
3460                    .filter(|t| !t.is_empty())
3461                    .ok_or_else(|| {
3462                        AdminError::Auth("oauth2 returned an empty access token".to_owned())
3463                    })?;
3464                // The access token is base64url JWT text — valid UTF-8 — but
3465                // guard the conversion rather than assume it.
3466                let tok = std::str::from_utf8(&token).map_err(|err| {
3467                    AdminError::Auth(format!("oauth2 access token is not valid utf-8: {err}"))
3468                })?;
3469                bearer(req, tok)?
3470            }
3471        };
3472        // Build the request so the resolved method + URL are captured for
3473        // diagnostics, then execute on the shared client. This is
3474        // behaviorally identical to `req.send()`.
3475        let request = req.build()?;
3476        let method = request.method().clone();
3477        let url = request.url().clone();
3478        let resp = self.http.execute(request).await?;
3479        Ok(ApiResponse { method, url, resp })
3480    }
3481}
3482
3483/// A dispatched response paired with the request's resolved method + URL.
3484///
3485/// Threaded from [`AdminClient::send`] into the decode helpers so a
3486/// decode / status failure can name the exact request that produced it
3487/// (see [`AdminError::Decode`] and [`AdminError::Status`]).
3488struct ApiResponse {
3489    method: Method,
3490    url: Url,
3491    resp: Response,
3492}
3493
3494/// Pulsar Functions configuration — the subset of Java's
3495/// `org.apache.pulsar.common.functions.FunctionConfig` that the URL-based
3496/// `register` / `update` calls actually require. The Java type carries
3497/// ~30 fields (process / k8s runtime tuning, secrets, deadletter
3498/// topics, …); we expose the load-bearing ones an operator running a
3499/// pre-compiled JAR needs. Unknown fields on the wire are tolerated by
3500/// the broker, so adding extra knobs is an additive change later.
3501///
3502/// `tenant` / `namespace` / `name` are duplicated in the body because
3503/// the broker re-validates them against the URL path (Java's
3504/// `WorkerUtils.validateFunctionName`). The CLI fills these in from the
3505/// fully qualified name the operator passes on the command line.
3506///
3507/// `runtime` is the string `"JAVA"`, `"PYTHON"`, or `"GO"` — matches
3508/// Java's `org.apache.pulsar.common.functions.FunctionConfig.Runtime`
3509/// enum serialised by name.
3510#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3511#[serde(default, rename_all = "camelCase")]
3512pub struct FunctionConfig {
3513    /// Owning tenant.
3514    pub tenant: String,
3515    /// Owning namespace (bare name, not `tenant/namespace`).
3516    pub namespace: String,
3517    /// Function name (unique within the namespace).
3518    pub name: String,
3519    /// Fully qualified entry-point class name (`com.acme.MyFunction`
3520    /// for Java; `module.fn` for Python; `main` for Go).
3521    #[serde(rename = "className")]
3522    pub class_name: String,
3523    /// Input topics the function subscribes to.
3524    pub inputs: Vec<String>,
3525    /// Output topic the function produces to. Empty when the function
3526    /// has no output sink.
3527    pub output: String,
3528    /// Runtime — `"JAVA"`, `"PYTHON"`, or `"GO"`.
3529    pub runtime: String,
3530    /// Number of parallel instances the worker schedules.
3531    pub parallelism: i32,
3532    /// Optional opaque user-config map passed to the function's
3533    /// `Context#getUserConfigValue`. JSON object on the wire.
3534    #[serde(rename = "userConfig", skip_serializing_if = "Option::is_none")]
3535    pub user_config: Option<serde_json::Value>,
3536}
3537
3538/// Tenant policy info — admin roles and allowed clusters.
3539///
3540/// Mirrors Java's `org.apache.pulsar.common.policies.data.TenantInfoImpl` —
3541/// the JSON keys (`adminRoles`, `allowedClusters`) match upstream verbatim.
3542#[derive(Debug, Clone, Default, Serialize, Deserialize)]
3543pub struct TenantInfo {
3544    /// Roles permitted to administrate the tenant.
3545    #[serde(rename = "adminRoles")]
3546    pub admin_roles: Vec<String>,
3547    /// Cluster names the tenant may use.
3548    #[serde(rename = "allowedClusters")]
3549    pub allowed_clusters: Vec<String>,
3550}
3551
3552/// Wire shape of the PIP-415 `getMessageIdByIndex` response.
3553///
3554/// Mirrors Java's `MessageIdImpl` JSON shape (Jackson default property-name
3555/// serialisation): `{ledgerId, entryId, partitionIndex}`. See
3556/// `pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java`.
3557///
3558/// Kept as a deserialise-only DTO and converted into
3559/// [`magnetar_proto::MessageId`] at the boundary so callers do not see this
3560/// wire detail. Not exposed publicly.
3561#[derive(Debug, Clone, Deserialize)]
3562#[serde(rename_all = "camelCase")]
3563struct MessageIdResponse {
3564    ledger_id: i64,
3565    entry_id: i64,
3566    #[serde(default = "default_partition_index")]
3567    partition_index: i32,
3568}
3569
3570fn default_partition_index() -> i32 {
3571    -1
3572}
3573
3574impl MessageIdResponse {
3575    /// Convert the REST response into the canonical [`MessageId`]. The broker
3576    /// resolves at entry granularity, so `batch_index` / `batch_size` are not
3577    /// part of the JSON — they default to `-1` (the same sentinel
3578    /// `MessageId::from_pb` uses for `MessageIdData` without batch fields).
3579    ///
3580    /// Returns `AdminError::Protocol` if the broker emits a negative
3581    /// `ledgerId` or `entryId` — both fields are `u64` in the canonical type
3582    /// (matching the proto wire format) and Java's `MessageIdImpl` cannot
3583    /// represent negative values either, so a negative wire value is a
3584    /// broker bug we must surface rather than silently wrap.
3585    fn try_into_message_id(self) -> Result<MessageId, AdminError> {
3586        let ledger_id = u64::try_from(self.ledger_id).map_err(|_| {
3587            AdminError::Protocol(format!("negative ledgerId from broker: {}", self.ledger_id))
3588        })?;
3589        let entry_id = u64::try_from(self.entry_id).map_err(|_| {
3590            AdminError::Protocol(format!("negative entryId from broker: {}", self.entry_id))
3591        })?;
3592        Ok(MessageId {
3593            ledger_id,
3594            entry_id,
3595            partition: self.partition_index,
3596            batch_index: -1,
3597            batch_size: -1,
3598            // PIP-460 (ADR-0031): admin REST never resolves a scalable
3599            // segment id; the field only exists under `scalable-topics`.
3600            #[cfg(feature = "scalable-topics")]
3601            segment_id: None,
3602        })
3603    }
3604}
3605
3606/// Topic stats. Intentionally permissive: the Java
3607/// `PersistentTopicStatsImpl` shape is large and shifts between releases;
3608/// we extract the high-signal rates, throughput, sizes, and counters and
3609/// pass the rest through as raw JSON.
3610///
3611/// All scalar fields default to `0` / `0.0` when the broker omits them, so a
3612/// release that drops or renames one decodes cleanly instead of failing the
3613/// whole stats call.
3614#[derive(Debug, Clone, Default, Deserialize)]
3615#[serde(default)]
3616pub struct TopicStats {
3617    /// Current publish rate into the topic, in messages/sec, averaged over
3618    /// the broker's most recent stats window. Java: `PersistentTopicStats#msgRateIn`.
3619    #[serde(rename = "msgRateIn")]
3620    pub msg_rate_in: f64,
3621    /// Current dispatch rate out of the topic, in messages/sec.
3622    /// Java: `PersistentTopicStats#msgRateOut`.
3623    #[serde(rename = "msgRateOut")]
3624    pub msg_rate_out: f64,
3625    /// Current publish throughput into the topic, in bytes/sec.
3626    /// Java: `PersistentTopicStats#msgThroughputIn`.
3627    #[serde(rename = "msgThroughputIn")]
3628    pub msg_throughput_in: f64,
3629    /// Current dispatch throughput out of the topic, in bytes/sec.
3630    /// Java: `PersistentTopicStats#msgThroughputOut`.
3631    #[serde(rename = "msgThroughputOut")]
3632    pub msg_throughput_out: f64,
3633    /// Average message size, in bytes, over the most recent stats window.
3634    /// Java: `PersistentTopicStats#averageMsgSize`.
3635    #[serde(rename = "averageMsgSize")]
3636    pub average_msg_size: f64,
3637    /// Total messages received.
3638    #[serde(rename = "msgInCounter")]
3639    pub msg_in_counter: i64,
3640    /// Total bytes received.
3641    #[serde(rename = "bytesInCounter")]
3642    pub bytes_in_counter: i64,
3643    /// Total storage used by the topic's managed ledger, in bytes (includes
3644    /// replicas). Java: `PersistentTopicStats#storageSize`.
3645    #[serde(rename = "storageSize")]
3646    pub storage_size: i64,
3647    /// Current backlog size across all subscriptions, in bytes.
3648    /// Java: `PersistentTopicStats#backlogSize`.
3649    #[serde(rename = "backlogSize")]
3650    pub backlog_size: i64,
3651    /// Publishers, raw JSON because the schema is large and version-dependent.
3652    pub publishers: Vec<serde_json::Value>,
3653    /// Subscriptions map (raw JSON).
3654    pub subscriptions: serde_json::Value,
3655}
3656
3657/// Partitioned-topic metadata, as returned by
3658/// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`.
3659/// Java: `org.apache.pulsar.common.partition.PartitionedTopicMetadata`.
3660/// Only the partition count is consumed; broker-side extensions are ignored.
3661#[derive(Debug, Clone, Default, Deserialize)]
3662#[serde(default)]
3663struct PartitionedTopicMetadata {
3664    partitions: u32,
3665}
3666
3667/// Java `RetentionPolicies` — namespace-level retention policy. `-1` for
3668/// either dimension means infinite. The broker applies whichever quota
3669/// becomes binding first (time OR size).
3670#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
3671#[serde(default, rename_all = "camelCase")]
3672pub struct RetentionPolicies {
3673    /// Maximum retention time in minutes. `-1` = infinite, `0` = none.
3674    pub retention_time_in_minutes: i32,
3675    /// Maximum retention size in megabytes. `-1` = infinite, `0` = none.
3676    #[serde(rename = "retentionSizeInMB")]
3677    pub retention_size_in_mb: i64,
3678}
3679
3680/// Java `PersistencePolicies` — namespace-level `BookKeeper` layout +
3681/// managed-ledger write-shaping knobs. Maps to the broker's
3682/// `org.apache.pulsar.common.policies.data.PersistencePolicies`.
3683///
3684/// `Default::default()` returns the broker's documented default for a
3685/// new namespace (`2/2/2/0.0`), NOT all zeros — the broker rejects
3686/// ensemble values < 1 on `set`, so an all-zero policy is unusable
3687/// for a round-trip. Missing fields on decode default the same way:
3688/// a partial body where the broker omits one field round-trips with
3689/// the legal default, never with the illegal `0`.
3690#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3691#[serde(rename_all = "camelCase")]
3692pub struct PersistencePolicies {
3693    /// `BookKeeper` ensemble size — the number of bookies the managed
3694    /// ledger striping is spread across. Default: 2.
3695    #[serde(default = "default_bookkeeper_quorum")]
3696    pub bookkeeper_ensemble: i32,
3697    /// `BookKeeper` write quorum — the number of bookies each entry is
3698    /// written to. Default: 2.
3699    #[serde(default = "default_bookkeeper_quorum")]
3700    pub bookkeeper_write_quorum: i32,
3701    /// `BookKeeper` ack quorum — the number of acks required before an
3702    /// add is considered durable. Default: 2.
3703    #[serde(default = "default_bookkeeper_quorum")]
3704    pub bookkeeper_ack_quorum: i32,
3705    /// Managed-ledger mark-delete-rate cap (ops/sec). `0.0` disables
3706    /// the throttle. Default: 0.0 (disabled).
3707    #[serde(default)]
3708    pub managed_ledger_max_mark_delete_rate: f64,
3709}
3710
3711impl Default for PersistencePolicies {
3712    fn default() -> Self {
3713        Self {
3714            bookkeeper_ensemble: 2,
3715            bookkeeper_write_quorum: 2,
3716            bookkeeper_ack_quorum: 2,
3717            managed_ledger_max_mark_delete_rate: 0.0,
3718        }
3719    }
3720}
3721
3722#[inline]
3723fn default_bookkeeper_quorum() -> i32 {
3724    2
3725}
3726
3727/// Java `DispatchRate` — a sliding-window throttle (msg/sec + byte/sec
3728/// over a `ratePeriodInSecond` window). Shared shape between the
3729/// per-namespace consumer dispatch rate, the per-subscription dispatch
3730/// rate, and the cross-cluster replicator dispatch rate.
3731///
3732/// `-1` on either rate dimension disables that axis of the throttle —
3733/// missing fields default to `-1` (not `0`) so a broker-omitted
3734/// dimension round-trips as "no throttle", never as "throttle to
3735/// zero" (which would block consumer dispatch on the namespace).
3736#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3737#[serde(rename_all = "camelCase")]
3738pub struct DispatchRate {
3739    /// Throttle in messages/sec. `-1` = unlimited.
3740    #[serde(default = "neg_one_i32")]
3741    pub dispatch_throttling_rate_in_msg: i32,
3742    /// Throttle in bytes/sec. `-1` = unlimited.
3743    #[serde(default = "neg_one_i64")]
3744    pub dispatch_throttling_rate_in_byte: i64,
3745    /// Window size in seconds the throttle averages over.
3746    #[serde(default = "default_rate_period_seconds")]
3747    pub rate_period_in_second: i32,
3748    /// If `true`, dispatch rate is interpreted as an addend on top of
3749    /// the namespace publish rate rather than an absolute cap.
3750    #[serde(default)]
3751    pub relative_to_publish_rate: bool,
3752}
3753
3754impl Default for DispatchRate {
3755    fn default() -> Self {
3756        Self {
3757            dispatch_throttling_rate_in_msg: -1,
3758            dispatch_throttling_rate_in_byte: -1,
3759            rate_period_in_second: 1,
3760            relative_to_publish_rate: false,
3761        }
3762    }
3763}
3764
3765/// Java `PublishRate` — producer-side throttle (msg/sec + byte/sec).
3766/// `-1` on either dimension disables that axis of the throttle.
3767/// Missing fields default to `-1` (not `0`) — same sentinel semantics
3768/// as [`DispatchRate`]. Unlike `DispatchRate`, there is no
3769/// rate-period field; the broker uses a fixed 1-second window.
3770#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3771#[serde(rename_all = "camelCase")]
3772pub struct PublishRate {
3773    /// Throttle in messages/sec. `-1` = unlimited.
3774    #[serde(default = "neg_one_i32")]
3775    pub publish_throttling_rate_in_msg: i32,
3776    /// Throttle in bytes/sec. `-1` = unlimited.
3777    #[serde(default = "neg_one_i64")]
3778    pub publish_throttling_rate_in_byte: i64,
3779}
3780
3781impl Default for PublishRate {
3782    fn default() -> Self {
3783        Self {
3784            publish_throttling_rate_in_msg: -1,
3785            publish_throttling_rate_in_byte: -1,
3786        }
3787    }
3788}
3789
3790#[inline]
3791fn neg_one_i32() -> i32 {
3792    -1
3793}
3794#[inline]
3795fn neg_one_i64() -> i64 {
3796    -1
3797}
3798#[inline]
3799fn default_rate_period_seconds() -> i32 {
3800    1
3801}
3802
3803/// Java `DelayedDeliveryPolicies` — namespace-level switch + index-tick
3804/// granularity for the broker's delayed-message delivery tracker.
3805/// Maps to `org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies`.
3806/// `tick_time` controls how often the broker's delay-index buckets are
3807/// re-evaluated; smaller values give tighter delivery accuracy at a
3808/// higher tracker cost.
3809///
3810/// The Java field name is `tickTime` (carrying a `@JsonProperty("tickTime")`
3811/// annotation), **not** `tickTimeMillis`. The unit is documented as
3812/// milliseconds (see the upstream class doc), but the wire key drops
3813/// the unit suffix — Jackson on the broker only binds the literal
3814/// `tickTime`.
3815#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
3816#[serde(default, rename_all = "camelCase")]
3817pub struct DelayedDeliveryPolicies {
3818    /// Whether delayed delivery is enabled for the namespace.
3819    pub active: bool,
3820    /// Index-tick granularity in milliseconds. Wire key `tickTime`.
3821    #[serde(rename = "tickTime")]
3822    pub tick_time_millis: i64,
3823}
3824
3825/// Java `BacklogQuota` — one entry in the namespace-level backlog quota
3826/// map. `policy` is a string (`producer_request_hold`,
3827/// `producer_exception`, `consumer_backlog_eviction`) rather than a
3828/// closed Rust enum so new broker enum values forward-decode cleanly.
3829///
3830/// `-1` on either limit dimension disables that axis — missing fields
3831/// default to `-1` (not `0`) so a broker-omitted dimension round-trips
3832/// as "no quota", never as "expire-everything" or "block-everything".
3833#[derive(Debug, Clone, Deserialize, Serialize)]
3834#[serde(rename_all = "camelCase")]
3835pub struct BacklogQuota {
3836    /// Maximum allowed backlog in bytes (when type=`destination_storage`).
3837    /// `-1` = unlimited.
3838    #[serde(default = "neg_one_i64")]
3839    pub limit_size: i64,
3840    /// Maximum allowed backlog age in seconds (when type=`message_age`).
3841    /// `-1` = unlimited.
3842    #[serde(default = "neg_one_i32")]
3843    pub limit_time: i32,
3844    /// Action when the quota is exceeded.
3845    #[serde(default)]
3846    pub policy: String,
3847}
3848
3849impl Default for BacklogQuota {
3850    fn default() -> Self {
3851        Self {
3852            limit_size: -1,
3853            limit_time: -1,
3854            policy: String::new(),
3855        }
3856    }
3857}
3858
3859/// Java `BookieInfo` — a single bookie's rack assignment, as stored in
3860/// the `racks-info` metadata path and shipped on
3861/// [`AdminClient::bookies_set_rack`]. Field names are camelCase on the
3862/// wire (matching `org.apache.pulsar.common.policies.data.BookieInfo`,
3863/// which carries only `rack` and `hostname`).
3864///
3865/// The placement group is **not** part of this body — Pulsar's
3866/// `BookiesBase#updateBookieRackInfo` exposes `group` as a
3867/// `@QueryParam`, and the JSON body Jackson-binds only to
3868/// `{rack, hostname}`. Treating it as a body field silently drops the
3869/// operator's group choice on the wire.
3870#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3871#[serde(default, rename_all = "camelCase")]
3872pub struct BookieInfo {
3873    /// Rack identifier within the group — opaque to the broker, only
3874    /// the placement policy cares about it.
3875    pub rack: String,
3876    /// Resolved hostname for the bookie. The broker uses it for
3877    /// log lines; it does not have to match DNS.
3878    pub hostname: String,
3879}
3880
3881/// Java `PostSchemaPayload` — the request body for
3882/// [`AdminClient::schema_post`] and
3883/// [`AdminClient::schema_compatibility_check`]. The Java DTO has
3884/// (`type`, `schema`, `properties`); both keys travel as-is on the wire.
3885/// `schema` is the canonical-form blob for AVRO / JSON / PROTOBUF and
3886/// the protobuf descriptor for `PROTOBUF_NATIVE`.
3887#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3888#[serde(default)]
3889pub struct PostSchemaPayload {
3890    /// Schema type (`AVRO` / `JSON` / `PROTOBUF` / `PROTOBUF_NATIVE` /
3891    /// `KEY_VALUE` / `STRING` / `BYTES` / ...).
3892    #[serde(rename = "type")]
3893    pub schema_type: String,
3894    /// Schema definition, encoded per the type axis.
3895    pub schema: String,
3896    /// User-defined per-schema properties.
3897    pub properties: std::collections::HashMap<String, String>,
3898}
3899
3900/// Java `SourceConfig` — declarative description of a Pulsar IO
3901/// Source. Mirrors `org.apache.pulsar.common.io.SourceConfig` (Jackson
3902/// camelCase on the wire). Only the fields the JAX-RS `create` /
3903/// `update` paths require are typed; per-connector knobs ride along in
3904/// the open-ended `configs` map so a forward broker can add fields
3905/// without a magnetar release.
3906#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3907#[serde(default, rename_all = "camelCase")]
3908pub struct SourceConfig {
3909    /// Tenant owning the source. Must match the URL path tenant.
3910    pub tenant: String,
3911    /// Namespace owning the source. Must match the URL path namespace.
3912    pub namespace: String,
3913    /// Source name. Must match the URL path name.
3914    pub name: String,
3915    /// Fully-qualified connector class (e.g.
3916    /// `org.apache.pulsar.io.kafka.KafkaSource`).
3917    pub class_name: String,
3918    /// Destination topic the source writes to.
3919    pub topic_name: String,
3920    /// Number of source instances to schedule.
3921    pub parallelism: i32,
3922    /// Connector-specific configuration map. Skipped from JSON when
3923    /// `None` so a `null` does not override the broker default.
3924    #[serde(skip_serializing_if = "Option::is_none")]
3925    pub configs: Option<serde_json::Value>,
3926}
3927
3928/// Java `SinkConfig` — declarative description of a Pulsar IO Sink.
3929/// Mirrors `org.apache.pulsar.common.io.SinkConfig` (Jackson camelCase
3930/// on the wire). The `inputs` slot is the list of source topics the
3931/// sink reads from — the broker accepts either fully-qualified topic
3932/// names or `tenant/namespace/topic` shorthand. Per-connector knobs
3933/// ride in `configs` for the same forward-compat reason as
3934/// [`SourceConfig`].
3935#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3936#[serde(default, rename_all = "camelCase")]
3937pub struct SinkConfig {
3938    /// Tenant owning the sink. Must match the URL path tenant.
3939    pub tenant: String,
3940    /// Namespace owning the sink. Must match the URL path namespace.
3941    pub namespace: String,
3942    /// Sink name. Must match the URL path name.
3943    pub name: String,
3944    /// Fully-qualified connector class (e.g.
3945    /// `org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink`).
3946    pub class_name: String,
3947    /// Source topics the sink subscribes to.
3948    pub inputs: Vec<String>,
3949    /// Number of sink instances to schedule.
3950    pub parallelism: i32,
3951    /// Connector-specific configuration map. Skipped from JSON when
3952    /// `None` so a `null` does not override the broker default.
3953    #[serde(skip_serializing_if = "Option::is_none")]
3954    pub configs: Option<serde_json::Value>,
3955}
3956
3957/// Pulsar Packages namespace dimension — the `{type}` segment of the
3958/// `/admin/v3/packages/{type}/...` URL. Maps to upstream's
3959/// `PackageType` enum
3960/// (`pulsar-packages-management/pulsar-packages-management-core/.../
3961/// PackageType.java`): the broker only accepts the three lowercase
3962/// tokens `function`, `source`, `sink` and rejects everything else
3963/// with 400. Modelled as a closed Rust enum so the URL builder
3964/// cannot emit a value the broker will refuse.
3965#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3966pub enum PackageType {
3967    /// `function` — Pulsar Functions JAR.
3968    Function,
3969    /// `source` — Pulsar IO Source NAR.
3970    Source,
3971    /// `sink` — Pulsar IO Sink NAR.
3972    Sink,
3973}
3974
3975impl PackageType {
3976    /// Render as the lowercase token the broker URL surface expects.
3977    #[must_use]
3978    pub fn as_str(self) -> &'static str {
3979        match self {
3980            Self::Function => "function",
3981            Self::Source => "source",
3982            Self::Sink => "sink",
3983        }
3984    }
3985}
3986
3987impl std::str::FromStr for PackageType {
3988    type Err = AdminError;
3989
3990    /// Parse from the lowercase tokens the broker emits (`function` /
3991    /// `source` / `sink`). Hyphenated aliases are accepted to make the
3992    /// CLI feel idiomatic — `package-type=source` vs the broker's
3993    /// `source` are equivalent.
3994    fn from_str(s: &str) -> Result<Self, AdminError> {
3995        match s.to_ascii_lowercase().as_str() {
3996            "function" | "functions" => Ok(Self::Function),
3997            "source" | "sources" => Ok(Self::Source),
3998            "sink" | "sinks" => Ok(Self::Sink),
3999            other => Err(AdminError::InvalidName(format!(
4000                "unknown package type {other:?} (expected: function | source | sink)"
4001            ))),
4002        }
4003    }
4004}
4005
4006/// Java `PackageMetadata` — the metadata envelope Pulsar Packages
4007/// attaches to each `(type, tenant, namespace, name, version)` tuple.
4008/// Mirrors `org.apache.pulsar.packages.management.core.common.PackageMetadata`
4009/// (Jackson camelCase on the wire). `modification_time` is a
4010/// broker-side timestamp in milliseconds-since-epoch — the broker
4011/// emits it on `GET` and ignores caller-supplied values on `PUT`
4012/// (overwriting them with the receive timestamp).
4013#[derive(Debug, Clone, Default, Deserialize, Serialize)]
4014#[serde(default, rename_all = "camelCase")]
4015pub struct PackageMetadata {
4016    /// Free-form package description.
4017    pub description: String,
4018    /// Maintainer contact (typically an email or team handle).
4019    pub contact: String,
4020    /// Last-modification timestamp in ms-since-epoch. Read-only for
4021    /// callers — the broker overwrites the value on `PUT`.
4022    pub modification_time: i64,
4023    /// Arbitrary key/value labels (release notes, CI ids, etc.).
4024    pub properties: std::collections::HashMap<String, String>,
4025}
4026
4027/// Java `BacklogQuotaType` — selects which dimension a `BacklogQuota`
4028/// entry limits.
4029#[derive(Debug, Clone, Copy, PartialEq, Eq)]
4030pub enum BacklogQuotaType {
4031    /// Bytes-on-disk dimension. Uses `BacklogQuota::limit_size`.
4032    DestinationStorage,
4033    /// Message-age dimension. Uses `BacklogQuota::limit_time`.
4034    MessageAge,
4035}
4036
4037impl BacklogQuotaType {
4038    /// Render as the lowercase `snake_case` value the broker REST surface
4039    /// expects in the `backlogQuotaType` query parameter.
4040    #[must_use]
4041    pub fn as_query_value(self) -> &'static str {
4042        match self {
4043            Self::DestinationStorage => "destination_storage",
4044            Self::MessageAge => "message_age",
4045        }
4046    }
4047}
4048
4049/// Java `LongRunningProcessStatus` — the polling shape for triggered
4050/// background jobs (compaction, offload). The broker returns one of four
4051/// `status` values: `NOT_RUN` (never triggered), `RUNNING`, `SUCCESS`,
4052/// `ERROR`. `last_error` is populated only on `ERROR`.
4053#[derive(Debug, Clone, Default, Deserialize, Serialize)]
4054#[serde(default, rename_all = "camelCase")]
4055pub struct LongRunningProcessStatus {
4056    /// Job state — `NOT_RUN`, `RUNNING`, `SUCCESS`, or `ERROR`.
4057    pub status: String,
4058    /// Human-readable error message, present on `ERROR`.
4059    pub last_error: String,
4060}
4061
4062/// Request body for `POST .../subscription/{sub}/resetcursor` (Java
4063/// `ResetCursorData`). The CLI exposes `message_id` and `is_excluded`;
4064/// Pulsar's `batchIndexes` / `properties` fields are not currently set
4065/// — they exist for transactional dedup metadata and would require
4066/// txn-aware callers anyway.
4067///
4068/// `ledger_id` / `entry_id` are `i64` on the wire because Pulsar
4069/// Jackson-binds them to Java `long`. `MessageId` on the Rust side uses
4070/// `u64` (matching the wire-protocol envelope), with `u64::MAX` as the
4071/// `EARLIEST` / `LATEST` sentinel — the conversion below maps those
4072/// sentinels to `-1` (the Java sentinel) so a reset-to-earliest /
4073/// reset-to-latest doesn't overflow Java's `long` parser.
4074#[derive(Debug, Serialize)]
4075#[serde(rename_all = "camelCase")]
4076struct ResetCursorData {
4077    ledger_id: i64,
4078    entry_id: i64,
4079    partition_index: i32,
4080    batch_index: i32,
4081    #[serde(rename = "isExcluded")]
4082    is_excluded: bool,
4083}
4084
4085/// Map a `MessageId` u64 ledger/entry id onto Pulsar's Java-`long`
4086/// wire field, translating the Rust-side `u64::MAX` sentinel (used by
4087/// `MessageId::EARLIEST` / `LATEST`) to Java's `-1` sentinel.
4088/// Non-sentinel values are passed through verbatim — Pulsar's
4089/// `LedgerHandle` / `EntryId` indices fit in `i64::MAX` long before
4090/// overflowing.
4091#[inline]
4092fn message_id_field_for_wire(value: u64) -> i64 {
4093    if value == u64::MAX {
4094        -1
4095    } else {
4096        // Pulsar entry indices fit in i63 — `as i64` cannot overflow
4097        // for any legitimate broker-emitted value.
4098        value as i64
4099    }
4100}
4101
4102/// Builder for [`AdminClient`].
4103#[derive(Debug, Default)]
4104pub struct AdminClientBuilder {
4105    base_url: Option<Url>,
4106    auth: AdminAuth,
4107    timeout: Option<Duration>,
4108    /// Extra CA root, PEM-encoded, added to reqwest's trust store. Mirrors
4109    /// pulsarctl's `tls_trust_certs_file_path`.
4110    tls_trust_cert_pem: Option<Vec<u8>>,
4111    /// Disable certificate verification. Mirrors pulsarctl's
4112    /// `tls_allow_insecure_connection`. **Insecure** — defeats MITM
4113    /// protection; only for self-signed dev brokers.
4114    tls_allow_insecure: bool,
4115}
4116
4117impl AdminClientBuilder {
4118    /// Set the service URL — the base for `/admin/v2/...`. Required.
4119    #[must_use]
4120    pub fn service_url(mut self, url: Url) -> Self {
4121        self.base_url = Some(url);
4122        self
4123    }
4124
4125    /// Configure bearer-token auth (`Authorization: Bearer <token>`).
4126    #[must_use]
4127    pub fn token(mut self, token: String) -> Self {
4128        self.auth = AdminAuth::Token(token);
4129        self
4130    }
4131
4132    /// Configure `OAuth2` `client_credentials` auth. The shared flow's token
4133    /// cache is refreshed on demand at request time (see [`AdminAuth::OAuth2`]).
4134    #[must_use]
4135    pub fn oauth2(mut self, flow: Arc<ClientCredentialsFlow>) -> Self {
4136        self.auth = AdminAuth::OAuth2(flow);
4137        self
4138    }
4139
4140    /// Add a custom CA root (PEM bytes) to the HTTPS trust store. Mirrors
4141    /// pulsarctl's `tls_trust_certs_file_path`. The CLI reads the file and
4142    /// passes its bytes here.
4143    #[must_use]
4144    pub fn tls_trust_cert_pem(mut self, pem: Vec<u8>) -> Self {
4145        self.tls_trust_cert_pem = Some(pem);
4146        self
4147    }
4148
4149    /// Disable TLS certificate verification (`danger_accept_invalid_certs`).
4150    /// Mirrors pulsarctl's `tls_allow_insecure_connection`. **Insecure** —
4151    /// only for self-signed dev brokers; never in production.
4152    #[must_use]
4153    pub fn tls_allow_insecure(mut self, allow: bool) -> Self {
4154        self.tls_allow_insecure = allow;
4155        self
4156    }
4157
4158    /// Override the request timeout. Defaults to [`DEFAULT_TIMEOUT`].
4159    #[must_use]
4160    pub fn timeout(mut self, dur: Duration) -> Self {
4161        self.timeout = Some(dur);
4162        self
4163    }
4164
4165    /// Build the client.
4166    pub fn build(self) -> Result<AdminClient, AdminError> {
4167        let base_url = self
4168            .base_url
4169            .ok_or_else(|| AdminError::Builder("service_url is required".into()))?;
4170        if base_url.cannot_be_a_base() {
4171            return Err(AdminError::Builder(format!(
4172                "service_url cannot be a base url: {base_url}"
4173            )));
4174        }
4175        // Anchor every V2 API call below `/admin/v2/` and every V3 call
4176        // below `/admin/v3/`. We append the suffix here so callers pass
4177        // plain `http://broker:8080` rather than baking either prefix in.
4178        //
4179        // `Url::join` follows WHATWG semantics: if `base_url` has no
4180        // trailing slash, its last path segment is replaceable. So
4181        // `http://broker/pulsar` + `admin/v2/` would yield
4182        // `http://broker/admin/v2/` — the `pulsar` prefix silently
4183        // dropped (common for path-prefixed K8s ingresses). Normalise
4184        // the base to end with `/` first so the join always appends.
4185        let base_url = {
4186            let mut b = base_url.clone();
4187            if !b.path().ends_with('/') {
4188                b.set_path(&format!("{}/", b.path()));
4189            }
4190            b
4191        };
4192        let base_url_v3 = base_url.join("admin/v3/")?;
4193        let base_url = base_url.join("admin/v2/")?;
4194
4195        // reqwest 0.13 panics in `Client::builder().build()` when the active
4196        // `rustls` flavor is `rustls-no-provider` and no global
4197        // `CryptoProvider` is installed. That happens whenever more than one
4198        // `crypto-*` feature is unified (e.g. default `crypto-aws-lc-rs`
4199        // plus an explicit `crypto-ring`), so install the default here —
4200        // the shim is idempotent and a no-op once a provider is set, which
4201        // covers parallel callers and processes that also boot the tokio
4202        // engine.
4203        tls_crypto::install_default_provider();
4204
4205        let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT);
4206        let http_builder = reqwest::Client::builder().timeout(timeout);
4207        // The custom-CA / allow-insecure reqwest knobs only exist when a
4208        // rustls TLS feature (`__tls`) is compiled in, which every `crypto-*`
4209        // feature enables. The binary crate always selects one provider, so
4210        // this branch is live in production; a no-crypto library build that
4211        // asks for TLS options gets a clear builder error rather than a
4212        // silently-ignored option.
4213        let http = apply_tls_options(
4214            http_builder,
4215            self.tls_trust_cert_pem,
4216            self.tls_allow_insecure,
4217        )?
4218        .build()
4219        .map_err(AdminError::Http)?;
4220
4221        Ok(AdminClient {
4222            base_url,
4223            base_url_v3,
4224            http,
4225            auth: self.auth,
4226        })
4227    }
4228}
4229
4230/// Errors returned by the admin client.
4231#[derive(Debug, thiserror::Error)]
4232pub enum AdminError {
4233    /// Transport-layer error from `reqwest`.
4234    #[error("http error: {0}")]
4235    Http(#[from] reqwest::Error),
4236    /// API returned a non-success HTTP status.
4237    ///
4238    /// `method` and `url` identify the exact request that failed, so an
4239    /// operator can tell at a glance whether they hit the wrong endpoint,
4240    /// cluster, or proxy without re-deriving the URL from the call site.
4241    #[error("api error {code} from {method} {url}: {body}")]
4242    Status {
4243        /// HTTP method of the failed request (`GET`, `POST`, …).
4244        method: String,
4245        /// Full request URL the client dispatched.
4246        url: String,
4247        /// HTTP status code.
4248        code: u16,
4249        /// Response body (or a placeholder if reading the body failed).
4250        body: String,
4251    },
4252    /// A 2xx response body could not be decoded as the expected JSON.
4253    ///
4254    /// Carries the request `method` + `url`, the HTTP `status`, the
4255    /// response `content_type`, and a truncated body `snippet` so the
4256    /// failure is self-diagnosing: a `text/html` body or an empty
4257    /// `<none>` content-type on a 200 almost always means the request
4258    /// was answered by the wrong endpoint, a reverse proxy, or an auth
4259    /// redirect rather than the broker's admin API. Replaces the bare
4260    /// `serde_json` "expected value at line 1 column 1" message.
4261    #[error(
4262        "unexpected response from {method} {url}: HTTP {status}, content-type {content_type}, body: {snippet}"
4263    )]
4264    Decode {
4265        /// HTTP method of the request whose response failed to decode.
4266        method: String,
4267        /// Full request URL the client dispatched.
4268        url: String,
4269        /// HTTP status code of the response (always 2xx here).
4270        status: u16,
4271        /// Response `Content-Type` header, or `<none>` when absent.
4272        content_type: String,
4273        /// First ~256 bytes of the body (UTF-8 lossy), truncated with an
4274        /// ellipsis marker when longer.
4275        snippet: String,
4276        /// The underlying serde decode error.
4277        #[source]
4278        source: serde_json::Error,
4279    },
4280    /// JSON encode error — building a request body from a Rust value.
4281    ///
4282    /// Response-decode failures surface as [`AdminError::Decode`]; this
4283    /// variant covers the rare serialization-side failure (e.g.
4284    /// `build_url_config_multipart`).
4285    #[error("json encode: {0}")]
4286    Json(#[from] serde_json::Error),
4287    /// URL parse / construction error.
4288    #[error("invalid url: {0}")]
4289    Url(#[from] url::ParseError),
4290    /// Builder configuration error (missing service URL, invalid argument...).
4291    #[error("invalid builder: {0}")]
4292    Builder(String),
4293    /// Authentication failure — e.g. the `OAuth2` `client_credentials` exchange
4294    /// failed or returned an empty access token.
4295    #[error("auth error: {0}")]
4296    Auth(String),
4297    /// Caller passed a namespace or topic name that the client could not parse.
4298    #[error("invalid name: {0}")]
4299    InvalidName(String),
4300    /// Broker returned a response that violates the documented wire contract
4301    /// (e.g. negative `ledgerId` from `getMessageIdByIndex`, which Java
4302    /// `MessageIdImpl` cannot represent either).
4303    #[error("broker protocol violation: {0}")]
4304    Protocol(String),
4305}
4306
4307/// Apply the pulsarctl-derived TLS options (custom CA root + allow-insecure)
4308/// to a reqwest [`ClientBuilder`].
4309///
4310/// `add_root_certificate` / `danger_accept_invalid_certs` / `Certificate`
4311/// live behind reqwest's `__tls` cfg, enabled by every `crypto-*` feature.
4312/// The two-variant cfg split keeps a no-crypto library build compiling: with
4313/// no TLS feature, asking for either option is a hard builder error rather
4314/// than a silently-dropped knob.
4315#[cfg(any(
4316    feature = "crypto-aws-lc-rs",
4317    feature = "crypto-ring",
4318    feature = "crypto-openssl",
4319    feature = "crypto-fips",
4320))]
4321fn apply_tls_options(
4322    mut builder: reqwest::ClientBuilder,
4323    trust_cert_pem: Option<Vec<u8>>,
4324    allow_insecure: bool,
4325) -> Result<reqwest::ClientBuilder, AdminError> {
4326    // Custom CA trust (pulsarctl `tls_trust_certs_file_path`). reqwest's
4327    // `add_root_certificate` adds the cert *alongside* the platform roots,
4328    // matching Pulsar's "extra trust anchor" semantics.
4329    if let Some(pem) = trust_cert_pem {
4330        let cert = reqwest::Certificate::from_pem(&pem)
4331            .map_err(|err| AdminError::Builder(format!("invalid tls trust cert PEM: {err}")))?;
4332        builder = builder.add_root_certificate(cert);
4333    }
4334    // Allow-insecure (pulsarctl `tls_allow_insecure_connection`). Defeats
4335    // certificate verification — only meaningful against self-signed dev
4336    // brokers, hence gated behind the explicit opt-in flag.
4337    if allow_insecure {
4338        builder = builder.danger_accept_invalid_certs(true);
4339    }
4340    Ok(builder)
4341}
4342
4343/// No-TLS fallback: a library build with no `crypto-*` feature cannot honour
4344/// TLS options. Accept the builder unchanged when no option is requested;
4345/// error clearly when one is.
4346#[cfg(not(any(
4347    feature = "crypto-aws-lc-rs",
4348    feature = "crypto-ring",
4349    feature = "crypto-openssl",
4350    feature = "crypto-fips",
4351)))]
4352fn apply_tls_options(
4353    builder: reqwest::ClientBuilder,
4354    trust_cert_pem: Option<Vec<u8>>,
4355    allow_insecure: bool,
4356) -> Result<reqwest::ClientBuilder, AdminError> {
4357    if trust_cert_pem.is_some() || allow_insecure {
4358        return Err(AdminError::Builder(
4359            "TLS options (trust cert / allow-insecure) require a crypto-* feature".to_owned(),
4360        ));
4361    }
4362    Ok(builder)
4363}
4364
4365/// Attach `Authorization: Bearer <tok>` to a request builder.
4366///
4367/// Shared by the `Token` and `OAuth2` auth arms — both ultimately set a
4368/// bearer header; only the source of the token bytes differs.
4369fn bearer(req: RequestBuilder, tok: &str) -> Result<RequestBuilder, AdminError> {
4370    let value = format!("Bearer {tok}");
4371    let mut headers = HeaderMap::new();
4372    let header_value = HeaderValue::from_str(&value)
4373        .map_err(|err| AdminError::Builder(format!("invalid bearer token: {err}")))?;
4374    headers.insert(AUTHORIZATION, header_value);
4375    Ok(req.headers(headers))
4376}
4377
4378/// Maximum number of body bytes echoed back in [`AdminError::Decode`].
4379/// Enough to recognise an HTML error page or a plain-text proxy banner
4380/// without dumping a whole payload into the error message.
4381const DECODE_SNIPPET_LIMIT: usize = 256;
4382
4383/// Build an [`AdminError::Decode`] from a failed-to-decode response's
4384/// parts. Centralises content-type extraction and body-snippet
4385/// truncation so every JSON decoder reports the same enriched context.
4386fn decode_error(
4387    method: &Method,
4388    url: &Url,
4389    status: u16,
4390    headers: &HeaderMap,
4391    body: &[u8],
4392    source: serde_json::Error,
4393) -> AdminError {
4394    let content_type = headers
4395        .get(reqwest::header::CONTENT_TYPE)
4396        .and_then(|v| v.to_str().ok())
4397        .map_or_else(|| "<none>".to_owned(), ToOwned::to_owned);
4398    let snippet = if body.len() > DECODE_SNIPPET_LIMIT {
4399        format!(
4400            "{}… (truncated)",
4401            String::from_utf8_lossy(&body[..DECODE_SNIPPET_LIMIT])
4402        )
4403    } else {
4404        String::from_utf8_lossy(body).into_owned()
4405    };
4406    AdminError::Decode {
4407        method: method.to_string(),
4408        url: url.to_string(),
4409        status,
4410        content_type,
4411        snippet,
4412        source,
4413    }
4414}
4415
4416/// Decode a non-error JSON response body.
4417async fn json_ok<T>(api: ApiResponse) -> Result<T, AdminError>
4418where
4419    T: for<'de> Deserialize<'de>,
4420{
4421    let api = ensure_status(api).await?;
4422    let ApiResponse { method, url, resp } = api;
4423    let status = resp.status().as_u16();
4424    let headers = resp.headers().clone();
4425    let bytes = resp.bytes().await?;
4426    serde_json::from_slice(&bytes)
4427        .map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
4428}
4429
4430/// Decode a JSON response body, defaulting to `T::default()` when the
4431/// broker returns 204 / empty body / literal `null`. Pulsar's
4432/// "policy-unset" pattern collapses to one of those three encodings:
4433///
4434/// - `getRetention` / `getPersistence` / `getDispatchRate` / `getPublishRate` on a fresh namespace
4435///   surface `null` through Jersey's `resume(null)`, which travels as 204 No Content;
4436/// - `getTopicPolicies` on a topic with no explicit policy returns either 204 or the literal JSON
4437///   `null`.
4438///
4439/// `json_ok::<RetentionPolicies>` errors with `EOF while parsing a
4440/// value` for either case; this helper maps them to
4441/// `RetentionPolicies::default()` (broker semantic: missing fields ==
4442/// broker default). Callers asserting "post-remove returns the
4443/// default" stay correct without changing the public signature.
4444async fn json_ok_or_default<T>(api: ApiResponse) -> Result<T, AdminError>
4445where
4446    T: for<'de> Deserialize<'de> + Default,
4447{
4448    let api = ensure_status(api).await?;
4449    let ApiResponse { method, url, resp } = api;
4450    // Tolerant short-circuits MUST run before any serde attempt.
4451    if resp.status() == StatusCode::NO_CONTENT {
4452        return Ok(T::default());
4453    }
4454    let status = resp.status().as_u16();
4455    let headers = resp.headers().clone();
4456    let bytes = resp.bytes().await?;
4457    if bytes.is_empty() {
4458        return Ok(T::default());
4459    }
4460    // `null` body — broker says "policy unset" — also folds to default.
4461    if bytes.as_ref().trim_ascii() == b"null" {
4462        return Ok(T::default());
4463    }
4464    serde_json::from_slice::<T>(&bytes)
4465        .map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
4466}
4467
4468/// Decode a JSON response body that the broker may emit as an empty
4469/// payload to mean "no value here". Pulsar's pattern for "no override
4470/// set" on the policy GET endpoints is to return `204 No Content` (or
4471/// a 200 with an empty body) rather than the literal JSON `null`;
4472/// `serde_json::from_slice::<Option<T>>(b"")` errors with `EOF while
4473/// parsing a value`, so every `namespace_get_*` / `topic_get_*` that
4474/// returns `Option<T>` needs this tolerant decoder.
4475///
4476/// Decoding rules:
4477///   - 204 No Content                  → `Ok(None)`
4478///   - 2xx with empty body bytes       → `Ok(None)`
4479///   - 2xx with the literal `null`     → `Ok(None)`
4480///   - 2xx with a JSON value           → `Ok(Some(value))` via serde
4481async fn json_ok_optional<T>(api: ApiResponse) -> Result<Option<T>, AdminError>
4482where
4483    T: for<'de> Deserialize<'de>,
4484{
4485    let api = ensure_status(api).await?;
4486    let ApiResponse { method, url, resp } = api;
4487    // Tolerant short-circuits MUST run before any serde attempt.
4488    if resp.status() == StatusCode::NO_CONTENT {
4489        return Ok(None);
4490    }
4491    let status = resp.status().as_u16();
4492    let headers = resp.headers().clone();
4493    let bytes = resp.bytes().await?;
4494    if bytes.is_empty() {
4495        return Ok(None);
4496    }
4497    serde_json::from_slice::<Option<T>>(&bytes)
4498        .map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
4499}
4500
4501/// Discard a successful no-content response body.
4502async fn empty_ok(api: ApiResponse) -> Result<(), AdminError> {
4503    let _ = ensure_status(api).await?;
4504    Ok(())
4505}
4506
4507/// Convert a non-success response into [`AdminError::Status`]. Returns the
4508/// original [`ApiResponse`] on 2xx so the caller can decode the body.
4509///
4510/// The `Status` error carries the request method + URL so a failure
4511/// names the exact endpoint hit, not just the status code and body.
4512async fn ensure_status(api: ApiResponse) -> Result<ApiResponse, AdminError> {
4513    let status = api.resp.status();
4514    if status.is_success() || status == StatusCode::NO_CONTENT {
4515        return Ok(api);
4516    }
4517    let code = status.as_u16();
4518    let ApiResponse { method, url, resp } = api;
4519    let body = resp
4520        .text()
4521        .await
4522        .unwrap_or_else(|err| format!("<failed to read body: {err}>"));
4523    Err(AdminError::Status {
4524        method: method.to_string(),
4525        url: url.to_string(),
4526        code,
4527        body,
4528    })
4529}
4530
4531/// Split a `tenant/namespace` string into its two segments.
4532/// Reject path segments the `url` crate would silently rewrite. `.` and `..`
4533/// disappear under RFC 3986 dot-segment normalisation; percent-encoded slash
4534/// (`%2F` / `%2f`) lets a hostile name escape its segment; NUL / ASCII
4535/// control bytes have no place in an admin path. Refusing all of these at
4536/// the input boundary keeps the URL the client builds in lock-step with the
4537/// path the broker eventually parses.
4538fn validate_segment(segment: &str) -> Result<(), AdminError> {
4539    if segment.is_empty() {
4540        return Err(AdminError::InvalidName("empty path segment".into()));
4541    }
4542    if segment == "." || segment == ".." {
4543        return Err(AdminError::InvalidName(format!(
4544            "dot segment is not a valid name: {segment:?}",
4545        )));
4546    }
4547    if segment.contains("%2F") || segment.contains("%2f") {
4548        return Err(AdminError::InvalidName(format!(
4549            "percent-encoded slash in segment: {segment:?}",
4550        )));
4551    }
4552    if segment.bytes().any(|b| b < 0x20 || b == 0x7f) {
4553        return Err(AdminError::InvalidName(format!(
4554            "control byte in segment: {segment:?}",
4555        )));
4556    }
4557    Ok(())
4558}
4559
4560/// Split a `tenant/namespace` string into its two segments.
4561///
4562/// Exposed for the CLI (and any other admin-client wrapper) so the
4563/// `tenant/namespace` shape used by the namespace-scoped list verbs
4564/// (`functions list`, `sources list`, `sinks list`, `packages list`)
4565/// validates against the same `validate_segment` rules every admin
4566/// method enforces internally — no parallel parsers, no divergent
4567/// error categories.
4568pub fn split_namespace(ns: &str) -> Result<(&str, &str), AdminError> {
4569    let (tenant, namespace) = ns.split_once('/').ok_or_else(|| {
4570        AdminError::InvalidName(format!("expected tenant/namespace, got {ns:?} (no '/')"))
4571    })?;
4572    if tenant.is_empty() || namespace.is_empty() || namespace.contains('/') {
4573        return Err(AdminError::InvalidName(format!(
4574            "expected tenant/namespace, got {ns:?}"
4575        )));
4576    }
4577    validate_segment(tenant)?;
4578    validate_segment(namespace)?;
4579    Ok((tenant, namespace))
4580}
4581
4582/// Split a `persistent://tenant/namespace/topic` (or `tenant/namespace/topic`)
4583/// into its three path segments. The scheme is optional; if present it must
4584/// be `persistent://`.
4585fn split_topic(topic: &str) -> Result<(&str, &str, &str), AdminError> {
4586    let rest = topic.strip_prefix("persistent://").unwrap_or(topic);
4587    let mut parts = rest.splitn(3, '/');
4588    let tenant = parts.next().unwrap_or("");
4589    let namespace = parts.next().unwrap_or("");
4590    let name = parts.next().unwrap_or("");
4591    if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
4592        return Err(AdminError::InvalidName(format!(
4593            "expected [persistent://]tenant/namespace/topic, got {topic:?}"
4594        )));
4595    }
4596    validate_segment(tenant)?;
4597    validate_segment(namespace)?;
4598    validate_segment(name)?;
4599    Ok((tenant, namespace, name))
4600}
4601
4602/// Build the two-part `multipart/form-data` envelope the broker expects
4603/// on URL-based function register / update calls. Order is fixed (`url`
4604/// then `functionConfig`) so wire-level tests can pin the body shape.
4605/// The `functionConfig` part carries an explicit `application/json`
4606/// content type; without it the broker's Jersey
4607/// `FormDataMultiPartFeature` falls back to `text/plain` and refuses
4608/// the JSON.
4609fn function_pkg_form(
4610    pkg_url: &str,
4611    config: &FunctionConfig,
4612) -> Result<reqwest::multipart::Form, AdminError> {
4613    build_url_config_multipart(pkg_url, "functionConfig", config)
4614}
4615
4616/// Split a `tenant/namespace/name` Functions / IO identifier into its
4617/// three segments. Pulsar Functions never carry a `persistent://`
4618/// scheme prefix (functions are not topics), so the parser is stricter
4619/// than the internal `split_topic` (rustdoc cannot resolve the bare
4620/// identifier because `split_topic` is module-private).
4621///
4622/// Exposed for the CLI, which parses the fully qualified name out of a
4623/// single positional argument before calling the admin methods (which
4624/// take `tenant`, `namespace`, `name` separately so the broker's
4625/// per-segment validation maps 1:1 to the URL path).
4626pub fn split_function_id(id: &str) -> Result<(&str, &str, &str), AdminError> {
4627    let mut parts = id.splitn(3, '/');
4628    let tenant = parts.next().unwrap_or("");
4629    let namespace = parts.next().unwrap_or("");
4630    let name = parts.next().unwrap_or("");
4631    if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
4632        return Err(AdminError::InvalidName(format!(
4633            "expected tenant/namespace/name, got {id:?}"
4634        )));
4635    }
4636    validate_segment(tenant)?;
4637    validate_segment(namespace)?;
4638    validate_segment(name)?;
4639    Ok((tenant, namespace, name))
4640}
4641
4642/// Build the two-part `multipart/form-data` body Pulsar IO's
4643/// `register*` / `update*` endpoints expect when the package is
4644/// referenced by URL (`http(s)://`, `file://`, `function://`): a `url`
4645/// text part and a `<config_field>` JSON part. The broker enforces
4646/// both parts at the dispatcher boundary
4647/// (`SourcesBase#registerSource`, `SinksBase#registerSink`) — missing
4648/// either yields 400. Generic over the config type so Functions,
4649/// Sources, and Sinks share one helper (`functionConfig` /
4650/// `sourceConfig` / `sinkConfig` via the `config_field` argument).
4651fn build_url_config_multipart<T: Serialize>(
4652    pkg_url: &str,
4653    config_field: &str,
4654    config: &T,
4655) -> Result<reqwest::multipart::Form, AdminError> {
4656    let body = serde_json::to_string(config)?;
4657    // `mime_str` only fails on a malformed string; the literal we pass
4658    // is well-formed so the `expect` is on a never-taken branch.
4659    let config_part = reqwest::multipart::Part::text(body)
4660        .mime_str("application/json")
4661        .expect("application/json is a well-formed media type");
4662    let form = reqwest::multipart::Form::new()
4663        .text("url", pkg_url.to_owned())
4664        .part(config_field.to_owned(), config_part);
4665    Ok(form)
4666}
4667
4668#[cfg(test)]
4669mod tests {
4670    use super::*;
4671
4672    #[test]
4673    fn builder_requires_service_url() {
4674        let err = AdminClient::builder().build().unwrap_err();
4675        assert!(matches!(err, AdminError::Builder(_)));
4676    }
4677
4678    #[test]
4679    fn builder_appends_admin_v2_prefix() {
4680        let client = AdminClient::builder()
4681            .service_url("http://localhost:8080".parse().unwrap())
4682            .build()
4683            .unwrap();
4684        assert_eq!(
4685            client.base_url().as_str(),
4686            "http://localhost:8080/admin/v2/"
4687        );
4688    }
4689
4690    #[test]
4691    fn builder_carries_token() {
4692        let client = AdminClient::builder()
4693            .service_url("http://localhost:8080".parse().unwrap())
4694            .token("abc".into())
4695            .build()
4696            .unwrap();
4697        assert!(matches!(client.auth(), AdminAuth::Token(t) if t == "abc"));
4698    }
4699
4700    #[test]
4701    fn admin_auth_token_debug_redacts_secret() {
4702        let auth = AdminAuth::Token("super-secret-jwt".to_owned());
4703        let rendered = format!("{auth:?}");
4704        assert!(
4705            !rendered.contains("super-secret-jwt"),
4706            "raw token leaked through Debug: {rendered}",
4707        );
4708        assert!(
4709            rendered.contains("<redacted>"),
4710            "expected redaction sentinel in {rendered}"
4711        );
4712        assert!(
4713            rendered.contains("Token"),
4714            "expected variant name in {rendered}"
4715        );
4716
4717        let none_rendered = format!("{:?}", AdminAuth::None);
4718        assert_eq!(none_rendered, "None");
4719    }
4720
4721    #[test]
4722    fn admin_client_debug_does_not_leak_token() {
4723        let client = AdminClient::builder()
4724            .service_url("http://localhost:8080".parse().unwrap())
4725            .token("leaky-token".into())
4726            .build()
4727            .unwrap();
4728        let rendered = format!("{client:?}");
4729        assert!(
4730            !rendered.contains("leaky-token"),
4731            "raw token leaked through AdminClient Debug: {rendered}",
4732        );
4733    }
4734
4735    #[test]
4736    fn split_namespace_ok() {
4737        assert_eq!(
4738            split_namespace("public/default").unwrap(),
4739            ("public", "default")
4740        );
4741    }
4742
4743    #[test]
4744    fn split_namespace_rejects_missing_slash() {
4745        assert!(matches!(
4746            split_namespace("public"),
4747            Err(AdminError::InvalidName(_))
4748        ));
4749    }
4750
4751    #[test]
4752    fn split_namespace_rejects_extra_segment() {
4753        assert!(matches!(
4754            split_namespace("public/default/extra"),
4755            Err(AdminError::InvalidName(_))
4756        ));
4757    }
4758
4759    #[test]
4760    fn split_topic_with_scheme() {
4761        let (t, n, name) = split_topic("persistent://acme/svc/orders").unwrap();
4762        assert_eq!((t, n, name), ("acme", "svc", "orders"));
4763    }
4764
4765    #[test]
4766    fn split_topic_without_scheme() {
4767        let (t, n, name) = split_topic("acme/svc/orders").unwrap();
4768        assert_eq!((t, n, name), ("acme", "svc", "orders"));
4769    }
4770
4771    #[test]
4772    fn split_topic_rejects_short_name() {
4773        assert!(matches!(
4774            split_topic("acme/svc"),
4775            Err(AdminError::InvalidName(_))
4776        ));
4777    }
4778
4779    #[test]
4780    fn split_function_id_ok() {
4781        let (t, n, name) = split_function_id("public/default/my-fn").unwrap();
4782        assert_eq!((t, n, name), ("public", "default", "my-fn"));
4783    }
4784
4785    #[test]
4786    fn split_function_id_rejects_short_name() {
4787        assert!(matches!(
4788            split_function_id("public/default"),
4789            Err(AdminError::InvalidName(_))
4790        ));
4791    }
4792
4793    #[test]
4794    fn split_function_id_rejects_persistent_scheme() {
4795        // Functions never carry a `persistent://` prefix — the parser
4796        // refuses one rather than silently treating the scheme as the
4797        // tenant name (which `validate_segment` would later catch via
4798        // the percent-encoded slash rule, but we want a clearer error
4799        // up front).
4800        assert!(matches!(
4801            split_function_id("persistent://acme/svc/fn"),
4802            Err(AdminError::InvalidName(_))
4803        ));
4804    }
4805
4806    #[test]
4807    fn message_id_response_deserialises_java_camelcase() {
4808        // The exact body shape upstream PIP-415 §"Success Response" advertises.
4809        let json = r#"{"ledgerId":12345,"entryId":67890,"partitionIndex":0}"#;
4810        let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
4811        let msg = dto.try_into_message_id().unwrap();
4812        assert_eq!(msg.ledger_id, 12345);
4813        assert_eq!(msg.entry_id, 67890);
4814        assert_eq!(msg.partition, 0);
4815        // The broker resolves at entry granularity — batch fields are absent
4816        // from the JSON and must default to -1 to match the canonical sentinel.
4817        assert_eq!(msg.batch_index, -1);
4818        assert_eq!(msg.batch_size, -1);
4819    }
4820
4821    #[test]
4822    fn message_id_response_defaults_partition_for_non_partitioned_topic() {
4823        // PIP-415 §"Success Response": `partitionIndex: -1` for non-partitioned
4824        // topics. Some broker versions omit the field entirely on
4825        // non-partitioned topics; serde default keeps us correct in either case.
4826        let json = r#"{"ledgerId":1,"entryId":2}"#;
4827        let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
4828        assert_eq!(dto.try_into_message_id().unwrap().partition, -1);
4829    }
4830
4831    #[test]
4832    fn url_helper_emits_single_slash_after_admin_v2() {
4833        // Regression guard: the previous url() helper appended segments after
4834        // the trailing-slash sentinel of /admin/v2/, producing
4835        // /admin/v2//persistent/... — real brokers tolerated it but strict
4836        // mocks (and Java's PulsarAdmin) emit the single-slash form. Pin the
4837        // current behaviour so we notice any future regression.
4838        let client = AdminClient::builder()
4839            .service_url("http://broker.example:8080".parse().unwrap())
4840            .build()
4841            .unwrap();
4842        let url = client.url(&["clusters"]).unwrap();
4843        assert_eq!(url.as_str(), "http://broker.example:8080/admin/v2/clusters");
4844        let url2 = client
4845            .url(&["persistent", "public", "default", "topic", "stats"])
4846            .unwrap();
4847        assert_eq!(
4848            url2.as_str(),
4849            "http://broker.example:8080/admin/v2/persistent/public/default/topic/stats"
4850        );
4851    }
4852
4853    #[test]
4854    fn url_helper_preserves_path_prefix_without_trailing_slash() {
4855        // Regression guard: per WHATWG URL semantics, `Url::join("admin/v2/")`
4856        // on a base whose path has no trailing slash REPLACES the last segment
4857        // — `http://broker/pulsar` + `admin/v2/` becomes
4858        // `http://broker/admin/v2/`, silently dropping `/pulsar`. The builder
4859        // normalises the base to end in `/` before joining so any operator
4860        // running behind a path-prefixed K8s ingress (`--admin-url
4861        // http://gw/pulsar`) gets the right URLs on both V2 and V3 endpoints.
4862        let client = AdminClient::builder()
4863            .service_url("http://broker.example:8080/pulsar".parse().unwrap())
4864            .build()
4865            .unwrap();
4866        let url = client.url(&["clusters"]).unwrap();
4867        assert_eq!(
4868            url.as_str(),
4869            "http://broker.example:8080/pulsar/admin/v2/clusters"
4870        );
4871        let url_v3 = client.url_v3(&["functions", "a", "b"]).unwrap();
4872        assert_eq!(
4873            url_v3.as_str(),
4874            "http://broker.example:8080/pulsar/admin/v3/functions/a/b"
4875        );
4876    }
4877
4878    #[test]
4879    fn split_topic_rejects_dot_segments() {
4880        // LISA-001: `..` / `.` in any segment would silently normalise out via
4881        // url::Url::path_segments_mut, producing a client/server URL parser
4882        // differential. Refuse them at the input boundary.
4883        assert!(matches!(
4884            split_topic("persistent://../foo/bar"),
4885            Err(AdminError::InvalidName(_))
4886        ));
4887        assert!(matches!(
4888            split_topic("./foo/bar"),
4889            Err(AdminError::InvalidName(_))
4890        ));
4891        assert!(matches!(
4892            split_topic("tenant/./topic"),
4893            Err(AdminError::InvalidName(_))
4894        ));
4895    }
4896
4897    #[test]
4898    fn split_topic_rejects_control_bytes_and_percent_encoded_slash() {
4899        assert!(matches!(
4900            split_topic("tenant/ns/topic%2Fevil"),
4901            Err(AdminError::InvalidName(_))
4902        ));
4903        assert!(matches!(
4904            split_topic("tenant/ns/top\0ic"),
4905            Err(AdminError::InvalidName(_))
4906        ));
4907    }
4908}