Skip to main content

magnetar_admin/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Apache Pulsar admin REST client (`/admin/v2/...`).
4//!
5//! Thin async wrapper around [`reqwest`] for the broker's JAX-RS admin API.
6//! TLS is via `rustls-tls`. There are no channels and no background tasks: every
7//! call is a one-shot `await` that resolves to a [`Result`].
8//!
9//! Endpoint paths mirror the broker. Each method's rustdoc cites the Java
10//! endpoint class (file + relevant `@Path` annotation) in `apache/pulsar` so a
11//! reader can confirm the URL and HTTP verb against upstream.
12//!
13//! ## Quick start
14//!
15//! ```no_run
16//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
17//! use magnetar_admin::{AdminClient, TenantInfo};
18//!
19//! let admin = AdminClient::builder()
20//!     .service_url("http://localhost:8080".parse()?)
21//!     .build()?;
22//!
23//! let tenants = admin.tenants_list().await?;
24//! println!("{tenants:?}");
25//!
26//! admin
27//!     .tenant_create(
28//!         "acme",
29//!         TenantInfo {
30//!             admin_roles: vec!["admin".into()],
31//!             allowed_clusters: vec!["standalone".into()],
32//!         },
33//!     )
34//!     .await?;
35//! # Ok(()) }
36//! ```
37
38#![warn(unreachable_pub)]
39#![forbid(unsafe_code)]
40
41mod tls_crypto;
42
43use std::collections::HashMap;
44use std::time::Duration;
45
46use magnetar_proto::MessageId;
47use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
48use reqwest::{Method, RequestBuilder, Response, StatusCode};
49use serde::{Deserialize, Serialize};
50use url::Url;
51
/// Default per-request timeout: 60 seconds.
///
/// Matches the 60 s default of Java's `PulsarAdminBuilder` (see
/// `pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java`).
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
56
/// Extended per-call timeout (5 minutes) for the `*_create_with_url` /
/// `*_update_with_url` endpoints.
///
/// On those endpoints the broker downloads the package from the supplied
/// URL before it answers, so a slow internal registry (S3 / GCS /
/// `function://`) or a large `.nar` / `.jar` artifact can push the
/// round-trip well past 60 s. The value replaces the client-level timeout
/// for those calls only; every other admin verb stays on the 60 s budget
/// of [`DEFAULT_TIMEOUT`].
///
/// Five minutes matches the Java admin client's read-timeout for register
/// paths.
pub const PACKAGE_REGISTER_TIMEOUT: Duration = Duration::from_secs(5 * 60);
68
/// How the admin client authenticates against the broker.
///
/// With `Token(...)` every request carries an
/// `Authorization: Bearer <token>` header, mirroring Java's
/// `AuthenticationToken` provider.
#[derive(Clone, Default)]
pub enum AdminAuth {
    /// Send requests without credentials.
    #[default]
    None,
    /// Bearer-token authentication. Holds the raw token only; the
    /// `Bearer ` prefix is prepended at request time.
    Token(String),
}

impl std::fmt::Debug for AdminAuth {
    /// Formats the variant name but never the token body, so `Debug`-printing
    /// the admin client cannot leak the bearer credential to tracing or
    /// stdout. Same redaction approach as the `Credentials` /
    /// `ClientCredentialsFlow` `Debug` impls in `magnetar-auth-oauth2`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Token(_) => {
                // A fixed placeholder stands in for the secret.
                let mut tuple = f.debug_tuple("Token");
                tuple.field(&"<redacted>");
                tuple.finish()
            }
            Self::None => f.write_str("None"),
        }
    }
}
95
96/// Apache Pulsar admin REST client.
97///
98/// Holds two pre-computed base URLs: `base_url` anchored at
99/// `/admin/v2/` (clusters / tenants / namespaces / topics /
100/// subscriptions / brokers / bookies / schemas) and `base_url_v3`
101/// anchored at `/admin/v3/` (Pulsar Functions / IO Sources / IO Sinks
102/// / Packages). The split mirrors Pulsar's own routing — Java's
103/// `PulsarAdmin` keeps them separate too. Both URLs are derived from
104/// the same broker root at builder time, so a caller never has to
105/// know which version family an endpoint belongs to.
106#[derive(Debug, Clone)]
107pub struct AdminClient {
108    base_url: Url,
109    base_url_v3: Url,
110    http: reqwest::Client,
111    auth: AdminAuth,
112}
113
114impl AdminClient {
115    /// Start building an admin client.
116    #[must_use]
117    pub fn builder() -> AdminClientBuilder {
118        AdminClientBuilder::default()
119    }
120
121    /// Return the base URL the client targets (with the trailing `/admin/v2/`
122    /// component already appended). Exposed for tests and diagnostics.
123    #[must_use]
124    pub fn base_url(&self) -> &Url {
125        &self.base_url
126    }
127
128    /// Return the configured auth strategy. Exposed for tests and diagnostics.
129    #[must_use]
130    pub fn auth(&self) -> &AdminAuth {
131        &self.auth
132    }
133
134    // --- Cluster ---------------------------------------------------------
135
136    /// List clusters.
137    ///
138    /// `GET /admin/v2/clusters`.
139    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Clusters.java`
140    /// (`@Path("/clusters")`) + `admin/impl/ClustersBase.java#getClusters`.
141    pub async fn cluster_list(&self) -> Result<Vec<String>, AdminError> {
142        let url = self.url(&["clusters"])?;
143        let resp = self.send(self.http.request(Method::GET, url)).await?;
144        json_ok(resp).await
145    }
146
147    /// List failure-domains configured on a cluster.
148    ///
149    /// `GET /admin/v2/clusters/{cluster}/failureDomains`. The broker returns
150    /// a `Map<String, FailureDomain>` keyed by domain name; each value
151    /// carries a `brokers: Set<String>` member. The map is exposed as a
152    /// raw `serde_json::Value` for forward-compat — broker minor versions
153    /// add fields.
154    /// Java: `ClustersBase#getFailureDomains`.
155    pub async fn cluster_failure_domains_list(
156        &self,
157        cluster: &str,
158    ) -> Result<serde_json::Value, AdminError> {
159        validate_segment(cluster)?;
160        let url = self.url(&["clusters", cluster, "failureDomains"])?;
161        let resp = self.send(self.http.request(Method::GET, url)).await?;
162        json_ok(resp).await
163    }
164
165    /// Get one failure-domain by name.
166    ///
167    /// `GET /admin/v2/clusters/{cluster}/failureDomains/{domain}`.
168    /// Java: `ClustersBase#getDomain`.
169    pub async fn cluster_failure_domain_get(
170        &self,
171        cluster: &str,
172        domain: &str,
173    ) -> Result<serde_json::Value, AdminError> {
174        validate_segment(cluster)?;
175        validate_segment(domain)?;
176        let url = self.url(&["clusters", cluster, "failureDomains", domain])?;
177        let resp = self.send(self.http.request(Method::GET, url)).await?;
178        json_ok(resp).await
179    }
180
181    /// List namespace-isolation policies configured on a cluster.
182    ///
183    /// `GET /admin/v2/clusters/{cluster}/namespaceIsolationPolicies`. The
184    /// broker returns a `Map<String, NamespaceIsolationData>` carrying
185    /// the namespace regex, primary/secondary broker lists, and the
186    /// auto-failover policy. Exposed as raw JSON for forward-compat.
187    ///
188    /// A cluster with no isolation policies configured surfaces as
189    /// `404 NamespaceIsolationPolicies for cluster ... does not exist`
190    /// (Pulsar 4.0.x) rather than an empty map; we mirror the Java
191    /// client's `Map<String, _>` surface by mapping that specific 404 to
192    /// an empty `{}` object. Other 404s (auth, wrong cluster) still
193    /// surface as `AdminError::Status`.
194    /// Java: `ClustersBase#getNamespaceIsolationPolicies`.
195    pub async fn namespace_isolation_policies_list(
196        &self,
197        cluster: &str,
198    ) -> Result<serde_json::Value, AdminError> {
199        validate_segment(cluster)?;
200        let url = self.url(&["clusters", cluster, "namespaceIsolationPolicies"])?;
201        let resp = self.send(self.http.request(Method::GET, url)).await?;
202        match json_ok::<serde_json::Value>(resp).await {
203            Ok(v) => Ok(v),
204            Err(AdminError::Status { code: 404, body })
205                if body.contains("NamespaceIsolationPolicies") =>
206            {
207                Ok(serde_json::Value::Object(serde_json::Map::new()))
208            }
209            Err(e) => Err(e),
210        }
211    }
212
213    // --- Brokers ---------------------------------------------------------
214
215    /// List active brokers in a cluster.
216    ///
217    /// `GET /admin/v2/brokers/{cluster}`. Returns a list of `host:port`
218    /// strings — one entry per broker that's currently registered with
219    /// the cluster's metadata store. Java: `BrokersBase#getActiveBrokers`.
220    pub async fn brokers_list(&self, cluster: &str) -> Result<Vec<String>, AdminError> {
221        validate_segment(cluster)?;
222        let url = self.url(&["brokers", cluster])?;
223        let resp = self.send(self.http.request(Method::GET, url)).await?;
224        json_ok(resp).await
225    }
226
227    /// Get the current leader broker for the cluster.
228    ///
229    /// `GET /admin/v2/brokers/leaderBroker`. Returns `{ serviceUrl,
230    /// brokerId }`. Exposed as raw JSON for forward-compat — newer
231    /// brokers add `clusterName` and similar fields.
232    /// Java: `BrokersBase#getLeaderBroker`.
233    pub async fn brokers_leader(&self) -> Result<serde_json::Value, AdminError> {
234        let url = self.url(&["brokers", "leaderBroker"])?;
235        let resp = self.send(self.http.request(Method::GET, url)).await?;
236        json_ok(resp).await
237    }
238
239    /// List the names of all dynamic-config keys the broker exposes.
240    ///
241    /// `GET /admin/v2/brokers/configuration`. Returns the bare list of
242    /// `ServiceConfiguration` fields tagged `@FieldContext(dynamic = true)`
243    /// — the set of keys that `brokers_set_dynamic_config` accepts. Use
244    /// [`Self::brokers_dynamic_config_overrides`] for the current values.
245    ///
246    /// Pulsar 4 normally returns a JSON array (`List<String>` from
247    /// `BrokerService#getDynamicConfiguration`), but some packaging /
248    /// proxy paths surface the underlying `Map<String, ConfigField>`
249    /// shape instead. We accept both — array → values, object → keys —
250    /// to stay version-tolerant. Java: `BrokersBase#getDynamicConfigurationName`.
251    pub async fn brokers_dynamic_config_keys(&self) -> Result<Vec<String>, AdminError> {
252        let url = self.url(&["brokers", "configuration"])?;
253        let resp = self.send(self.http.request(Method::GET, url)).await?;
254        let v: serde_json::Value = json_ok(resp).await?;
255        match v {
256            serde_json::Value::Array(items) => Ok(items
257                .into_iter()
258                .filter_map(|x| x.as_str().map(str::to_owned))
259                .collect()),
260            serde_json::Value::Object(map) => Ok(map.into_iter().map(|(k, _)| k).collect()),
261            other => Err(AdminError::Protocol(format!(
262                "brokers/configuration returned unexpected shape: {other}"
263            ))),
264        }
265    }
266
267    /// Get the currently-overridden dynamic configuration values.
268    ///
269    /// `GET /admin/v2/brokers/configuration/values`. Returns a
270    /// `Map<String, String>` of every dynamic key the operator has set
271    /// (the broker omits keys still on their static / default value).
272    /// Exposed as raw JSON because broker minor versions add keys.
273    /// Java: `BrokersBase#getAllDynamicConfigurations`.
274    pub async fn brokers_dynamic_config_overrides(&self) -> Result<serde_json::Value, AdminError> {
275        let url = self.url(&["brokers", "configuration", "values"])?;
276        let resp = self.send(self.http.request(Method::GET, url)).await?;
277        json_ok(resp).await
278    }
279
280    /// Get the broker's runtime (merged static + dynamic) configuration.
281    ///
282    /// `GET /admin/v2/brokers/configuration/runtime`. Returns the full
283    /// `Map<String, String>` of `ServiceConfiguration` values as they
284    /// currently apply on the broker process — static defaults
285    /// overlaid with any `brokers_set_dynamic_config` overrides. Raw
286    /// JSON for forward-compat. Java: `BrokersBase#getRuntimeConfiguration`.
287    pub async fn brokers_runtime_config(&self) -> Result<serde_json::Value, AdminError> {
288        let url = self.url(&["brokers", "configuration", "runtime"])?;
289        let resp = self.send(self.http.request(Method::GET, url)).await?;
290        json_ok(resp).await
291    }
292
293    /// Get the broker's internal-stack endpoints.
294    ///
295    /// `GET /admin/v2/brokers/internal-configuration`. Returns the
296    /// `InternalConfigurationData` envelope — metadata-store URLs
297    /// (`zookeeperServers`, `configurationMetadataStoreUrl`),
298    /// `BookKeeper` metadata service URI, ledger root paths. Raw JSON
299    /// for forward-compat; the shape rolls between releases as the
300    /// metadata layer evolves.
301    /// Java: `BrokersBase#getInternalConfigurationData`.
302    pub async fn brokers_internal_config(&self) -> Result<serde_json::Value, AdminError> {
303        let url = self.url(&["brokers", "internal-configuration"])?;
304        let resp = self.send(self.http.request(Method::GET, url)).await?;
305        json_ok(resp).await
306    }
307
308    /// Probe broker health — produces and consumes one heartbeat message
309    /// on an internal topic.
310    ///
311    /// `GET /admin/v2/brokers/health`. The broker returns the plain-text
312    /// string `"ok"` on success; non-200 surfaces as `AdminError::Status`.
313    /// Java: `BrokersBase#healthCheck`.
314    pub async fn brokers_health_check(&self) -> Result<String, AdminError> {
315        let url = self.url(&["brokers", "health"])?;
316        let resp = self.send(self.http.request(Method::GET, url)).await?;
317        let resp = ensure_status(resp).await?;
318        Ok(resp.text().await?)
319    }
320
321    /// List the namespaces a specific broker currently owns.
322    ///
323    /// `GET /admin/v2/brokers/{cluster}/{broker}/ownedNamespaces`. The
324    /// `broker` argument must be the broker's `host:port` (matching the
325    /// strings [`Self::brokers_list`] returns). Returns a
326    /// `Map<String, NamespaceOwnershipStatus>` keyed by namespace name —
327    /// raw JSON for forward-compat.
328    /// Java: `BrokersBase#getOwnedNamespaces`.
329    pub async fn brokers_owned_namespaces(
330        &self,
331        cluster: &str,
332        broker: &str,
333    ) -> Result<serde_json::Value, AdminError> {
334        validate_segment(cluster)?;
335        validate_segment(broker)?;
336        let url = self.url(&["brokers", cluster, broker, "ownedNamespaces"])?;
337        let resp = self.send(self.http.request(Method::GET, url)).await?;
338        json_ok(resp).await
339    }
340
341    /// Override a dynamic broker configuration value.
342    ///
343    /// `POST /admin/v2/brokers/configuration/{name}/{value}`. Both the
344    /// key and the value travel in the URL path — there is no request
345    /// body — matching the broker's `updateDynamicConfiguration(@PathParam
346    /// String configName, @PathParam String configValue)` signature.
347    /// The key must be one of those returned by
348    /// [`Self::brokers_dynamic_config_keys`]; unknown keys yield 412.
349    /// Java: `BrokersBase#updateDynamicConfiguration`.
350    pub async fn brokers_set_dynamic_config(
351        &self,
352        name: &str,
353        value: &str,
354    ) -> Result<(), AdminError> {
355        validate_segment(name)?;
356        validate_segment(value)?;
357        let url = self.url(&["brokers", "configuration", name, value])?;
358        let resp = self.send(self.http.request(Method::POST, url)).await?;
359        empty_ok(resp).await
360    }
361
362    /// Drop a dynamic configuration override, reverting to the static value.
363    ///
364    /// `DELETE /admin/v2/brokers/configuration/{name}`. After the call
365    /// the key disappears from [`Self::brokers_dynamic_config_overrides`]
366    /// and [`Self::brokers_runtime_config`] reflects the underlying
367    /// static / default value again.
368    /// Java: `BrokersBase#deleteDynamicConfiguration`.
369    pub async fn brokers_delete_dynamic_config(&self, name: &str) -> Result<(), AdminError> {
370        validate_segment(name)?;
371        let url = self.url(&["brokers", "configuration", name])?;
372        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
373        empty_ok(resp).await
374    }
375
376    // --- Bookies ---------------------------------------------------------
377
378    /// List every bookie the broker knows about — both writable and
379    /// read-only — as registered in `BookKeeper` metadata.
380    ///
381    /// `GET /admin/v2/bookies/all`. Returns the broker's
382    /// `BookiesClusterInfo` envelope — a `bookies: [{ address: "host:port" }]`
383    /// array. Raw JSON for forward-compat.
384    /// Java: `BookiesBase#getAllAvailableBookies`.
385    pub async fn bookies_list_all(&self) -> Result<serde_json::Value, AdminError> {
386        let url = self.url(&["bookies", "all"])?;
387        let resp = self.send(self.http.request(Method::GET, url)).await?;
388        json_ok(resp).await
389    }
390
391    /// Get every bookie's group + rack assignment, as configured for the
392    /// rack-aware placement policy.
393    ///
394    /// `GET /admin/v2/bookies/racks-info`. Returns the nested
395    /// `Map<group, Map<bookieAddress, BookieInfo>>` shape Pulsar
396    /// persists in metadata. Raw JSON because the wire shape exposes
397    /// nested maps that change between releases (the `default` group
398    /// is implicit on older brokers).
399    /// Java: `BookiesBase#getBookieRackInfo`.
400    pub async fn bookies_racks_info(&self) -> Result<serde_json::Value, AdminError> {
401        let url = self.url(&["bookies", "racks-info"])?;
402        let resp = self.send(self.http.request(Method::GET, url)).await?;
403        json_ok(resp).await
404    }
405
406    /// Set (or update) a bookie's rack assignment.
407    ///
408    /// `POST /admin/v2/bookies/racks-info/{bookie}?group={group}` with
409    /// a JSON [`BookieInfo`] body carrying only `{rack, hostname}`.
410    /// `bookie` is the `host:port` registered in `BookKeeper` metadata.
411    /// The placement policy picks up the new rack on its next
412    /// reconciliation tick.
413    ///
414    /// `group` is **a query parameter**, not a body field — Pulsar's
415    /// `BookiesBase#updateBookieRackInfo(@PathParam("bookie") String,
416    /// @QueryParam("group") String, BookieInfo)` Jackson-binds the body
417    /// to `{rack, hostname}` only; an unknown `group` body field is
418    /// silently ignored and the query param defaults to `null`,
419    /// dropping the operator's group choice on the wire.
420    /// Java: `BookiesBase#updateBookieRackInfo`.
421    pub async fn bookies_set_rack(
422        &self,
423        bookie: &str,
424        group: &str,
425        info: BookieInfo,
426    ) -> Result<(), AdminError> {
427        validate_segment(bookie)?;
428        let mut url = self.url(&["bookies", "racks-info", bookie])?;
429        url.query_pairs_mut().append_pair("group", group);
430        let resp = self
431            .send(self.http.request(Method::POST, url).json(&info))
432            .await?;
433        empty_ok(resp).await
434    }
435
436    /// Remove a bookie's rack assignment.
437    ///
438    /// `DELETE /admin/v2/bookies/racks-info/{bookie}`. The bookie falls
439    /// back to the placement policy's default group / rack until
440    /// [`Self::bookies_set_rack`] is called again.
441    /// Java: `BookiesBase#deleteBookieRackInfo`.
442    pub async fn bookies_delete_rack(&self, bookie: &str) -> Result<(), AdminError> {
443        validate_segment(bookie)?;
444        let url = self.url(&["bookies", "racks-info", bookie])?;
445        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
446        empty_ok(resp).await
447    }
448
449    // --- Schemas ---------------------------------------------------------
450
451    /// Get the latest schema attached to a topic.
452    ///
453    /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema`. Returns
454    /// `{ version, type, schema, properties, timestamp }`; raw JSON
455    /// because the `type` axis (`AVRO` / `JSON` / `PROTOBUF` /
456    /// `PROTOBUF_NATIVE` / `KEY_VALUE` / `STRING` / `BYTES` / …) is
457    /// open-ended and broker minor versions add keys (deletion
458    /// tombstones surface as `type: "DELETE"` on the GET, for
459    /// instance). Java: `SchemasResourceBase#getSchema`.
460    pub async fn schema_get_latest(&self, topic: &str) -> Result<serde_json::Value, AdminError> {
461        let (tenant, namespace, name) = split_topic(topic)?;
462        let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
463        let resp = self.send(self.http.request(Method::GET, url)).await?;
464        json_ok(resp).await
465    }
466
467    /// Get a specific schema version attached to a topic.
468    ///
469    /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema/{version}`.
470    /// `version` is the monotonically-increasing integer the broker
471    /// assigns at registration. Same wire shape as
472    /// [`Self::schema_get_latest`].
473    /// Java: `SchemasResourceBase#getSchema` (with version path param).
474    pub async fn schema_get_version(
475        &self,
476        topic: &str,
477        version: i64,
478    ) -> Result<serde_json::Value, AdminError> {
479        let (tenant, namespace, name) = split_topic(topic)?;
480        let v = version.to_string();
481        let url = self.url(&["schemas", tenant, namespace, name, "schema", &v])?;
482        let resp = self.send(self.http.request(Method::GET, url)).await?;
483        json_ok(resp).await
484    }
485
486    /// List every schema version registered for a topic.
487    ///
488    /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schemas`. Pulsar 4
489    /// wraps the per-version entries in a
490    /// `GetAllVersionsSchemaResponse { getSchemaResponses: [...] }`
491    /// envelope (verified against `apache/pulsar@v4.0.4`
492    /// `SchemasResourceBase#convertToAllVersionsSchemaResponse`). We
493    /// unwrap that envelope at the boundary so callers see the flat
494    /// `Vec<Value>` they expect; a bare-array shape (older or
495    /// alternative serialisations) is still accepted. Raw JSON
496    /// per-entry for forward-compat. Java:
497    /// `SchemasResourceBase#getAllSchemas`.
498    pub async fn schema_list_versions(
499        &self,
500        topic: &str,
501    ) -> Result<Vec<serde_json::Value>, AdminError> {
502        let (tenant, namespace, name) = split_topic(topic)?;
503        let url = self.url(&["schemas", tenant, namespace, name, "schemas"])?;
504        let resp = self.send(self.http.request(Method::GET, url)).await?;
505        let v: serde_json::Value = json_ok(resp).await?;
506        match v {
507            serde_json::Value::Array(items) => Ok(items),
508            serde_json::Value::Object(mut envelope) => {
509                if let Some(serde_json::Value::Array(items)) = envelope.remove("getSchemaResponses")
510                {
511                    return Ok(items);
512                }
513                Err(AdminError::Protocol(format!(
514                    "schemas/.../schemas envelope missing `getSchemaResponses` array: {}",
515                    serde_json::Value::Object(envelope)
516                )))
517            }
518            other => Err(AdminError::Protocol(format!(
519                "schemas/.../schemas returned unexpected shape: {other}"
520            ))),
521        }
522    }
523
524    /// Register a new schema version on a topic.
525    ///
526    /// `POST /admin/v2/schemas/{tenant}/{ns}/{topic}/schema` with a JSON
527    /// [`PostSchemaPayload`] body. The broker returns `{ version: N }`;
528    /// raw JSON because the upstream response envelope wraps the
529    /// version under `data` on some 4.x point releases. Compatibility
530    /// is enforced server-side per the namespace's
531    /// `schemaCompatibilityStrategy` — incompatible posts fail with
532    /// 409. Java: `SchemasResourceBase#postSchema`.
533    pub async fn schema_post(
534        &self,
535        topic: &str,
536        payload: PostSchemaPayload,
537    ) -> Result<serde_json::Value, AdminError> {
538        let (tenant, namespace, name) = split_topic(topic)?;
539        let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
540        let resp = self
541            .send(self.http.request(Method::POST, url).json(&payload))
542            .await?;
543        json_ok(resp).await
544    }
545
546    /// Delete a topic's schema.
547    ///
548    /// `DELETE /admin/v2/schemas/{tenant}/{ns}/{topic}/schema?force={force}`.
549    /// `force = true` skips the broker's "is the schema in use"
550    /// guard — equivalent to `pulsar-admin schemas delete --force`.
551    /// Java: `SchemasResourceBase#deleteSchema`.
552    pub async fn schema_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
553        let (tenant, namespace, name) = split_topic(topic)?;
554        let mut url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
555        url.query_pairs_mut()
556            .append_pair("force", if force { "true" } else { "false" });
557        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
558        empty_ok(resp).await
559    }
560
561    /// Check whether a candidate schema would be compatible with the
562    /// topic's current schema.
563    ///
564    /// `POST /admin/v2/schemas/{tenant}/{ns}/{topic}/compatibility` with
565    /// a JSON [`PostSchemaPayload`] body — the same shape
566    /// [`Self::schema_post`] sends, but the broker only evaluates
567    /// compatibility and never persists. Returns `{ isCompatible:
568    /// bool, schemaCompatibilityStrategy: "..." }`; raw JSON for
569    /// forward-compat.
570    /// Java: `SchemasResourceBase#testCompatibility`.
571    pub async fn schema_compatibility_check(
572        &self,
573        topic: &str,
574        payload: PostSchemaPayload,
575    ) -> Result<serde_json::Value, AdminError> {
576        let (tenant, namespace, name) = split_topic(topic)?;
577        let url = self.url(&["schemas", tenant, namespace, name, "compatibility"])?;
578        let resp = self
579            .send(self.http.request(Method::POST, url).json(&payload))
580            .await?;
581        json_ok(resp).await
582    }
583
584    // --- Pulsar Functions (read) ----------------------------------------
585
586    /// List every function registered under a namespace.
587    ///
588    /// `GET /admin/v3/functions/{tenant}/{namespace}`. Returns a JSON
589    /// array of bare function names (no tenant / namespace prefix) —
590    /// matches Java's `FunctionsBase#listFunctions` body shape.
591    pub async fn functions_list_by_namespace(
592        &self,
593        tenant: &str,
594        namespace: &str,
595    ) -> Result<Vec<String>, AdminError> {
596        validate_segment(tenant)?;
597        validate_segment(namespace)?;
598        let url = self.url_v3(&["functions", tenant, namespace])?;
599        let resp = self.send(self.http.request(Method::GET, url)).await?;
600        json_ok(resp).await
601    }
602
603    /// Get a function's registered `FunctionConfig`.
604    ///
605    /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}`. The
606    /// `FunctionConfig` Java type has ~30 fields and grows on every
607    /// minor release — return raw JSON for forward-compat. Java:
608    /// `FunctionsBase#getFunctionInfo`.
609    pub async fn function_get(
610        &self,
611        tenant: &str,
612        namespace: &str,
613        name: &str,
614    ) -> Result<serde_json::Value, AdminError> {
615        validate_segment(tenant)?;
616        validate_segment(namespace)?;
617        validate_segment(name)?;
618        let url = self.url_v3(&["functions", tenant, namespace, name])?;
619        let resp = self.send(self.http.request(Method::GET, url)).await?;
620        json_ok(resp).await
621    }
622
623    /// Get a function's aggregate status (all instances).
624    ///
625    /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/status`.
626    /// Returns Java's `FunctionStatus` envelope:
627    /// `{numInstances, numRunning, instances: [...]}`. Raw JSON because
628    /// the per-instance shape carries broker-version-dependent fields.
629    /// Java: `FunctionsBase#getFunctionStatus`.
630    pub async fn function_status(
631        &self,
632        tenant: &str,
633        namespace: &str,
634        name: &str,
635    ) -> Result<serde_json::Value, AdminError> {
636        validate_segment(tenant)?;
637        validate_segment(namespace)?;
638        validate_segment(name)?;
639        let url = self.url_v3(&["functions", tenant, namespace, name, "status"])?;
640        let resp = self.send(self.http.request(Method::GET, url)).await?;
641        json_ok(resp).await
642    }
643
644    /// Get a function's aggregate runtime statistics (all instances).
645    ///
646    /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/stats`.
647    /// Returns Java's `FunctionStats` envelope — message rates,
648    /// processed counts, average latency, per-instance breakdown. Raw
649    /// JSON for forward-compat. Java: `FunctionsBase#getFunctionStats`.
650    pub async fn function_stats(
651        &self,
652        tenant: &str,
653        namespace: &str,
654        name: &str,
655    ) -> Result<serde_json::Value, AdminError> {
656        validate_segment(tenant)?;
657        validate_segment(namespace)?;
658        validate_segment(name)?;
659        let url = self.url_v3(&["functions", tenant, namespace, name, "stats"])?;
660        let resp = self.send(self.http.request(Method::GET, url)).await?;
661        json_ok(resp).await
662    }
663
664    /// Get one instance's status.
665    ///
666    /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/status`.
667    /// `instance_id` is the integer index the broker assigns at
668    /// schedule time (`0..parallelism`). Java:
669    /// `FunctionsBase#getFunctionInstanceStatus`.
670    pub async fn function_instance_status(
671        &self,
672        tenant: &str,
673        namespace: &str,
674        name: &str,
675        instance_id: i32,
676    ) -> Result<serde_json::Value, AdminError> {
677        validate_segment(tenant)?;
678        validate_segment(namespace)?;
679        validate_segment(name)?;
680        let id = instance_id.to_string();
681        let url = self.url_v3(&["functions", tenant, namespace, name, &id, "status"])?;
682        let resp = self.send(self.http.request(Method::GET, url)).await?;
683        json_ok(resp).await
684    }
685
686    /// Get one instance's runtime statistics.
687    ///
688    /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stats`.
689    /// Java: `FunctionsBase#getFunctionInstanceStats`.
690    pub async fn function_instance_stats(
691        &self,
692        tenant: &str,
693        namespace: &str,
694        name: &str,
695        instance_id: i32,
696    ) -> Result<serde_json::Value, AdminError> {
697        validate_segment(tenant)?;
698        validate_segment(namespace)?;
699        validate_segment(name)?;
700        let id = instance_id.to_string();
701        let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stats"])?;
702        let resp = self.send(self.http.request(Method::GET, url)).await?;
703        json_ok(resp).await
704    }
705
706    // --- Pulsar Functions (URL-based register / update) ----------------
707
708    /// Register a function from a remote package URL.
709    ///
710    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}` with a
711    /// `multipart/form-data` body carrying two parts: `url=<pkg-url>`
712    /// (the broker-resolvable HTTP / `function://` / `file://` URL of
713    /// the compiled package) and `functionConfig=<json>` (the
714    /// serialised [`FunctionConfig`]). The local-file upload path
715    /// (Java's `FunctionsBase#registerFunction` with a
716    /// `FormDataMultiPart` `data` part) is intentionally out of scope
717    /// for this method — operators with a pre-built JAR served over
718    /// HTTP / S3 / GCS use the URL path.
719    ///
720    /// The two-part envelope matches Java's
721    /// `FunctionsBase#registerFunction(@PathParam tenant, ...,
722    /// @FormDataParam("url") String functionPkgUrl,
723    /// @FormDataParam("functionConfig") FunctionConfig functionConfig)`
724    /// — when `data` is null and `url` is non-null the broker takes the
725    /// URL fast-path and skips the upload step.
726    pub async fn function_create_with_url(
727        &self,
728        tenant: &str,
729        namespace: &str,
730        name: &str,
731        url: &str,
732        config: FunctionConfig,
733    ) -> Result<(), AdminError> {
734        validate_segment(tenant)?;
735        validate_segment(namespace)?;
736        validate_segment(name)?;
737        let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
738        let form = function_pkg_form(url, &config)?;
739        let resp = self
740            .send(
741                self.http
742                    .request(Method::POST, endpoint)
743                    .multipart(form)
744                    .timeout(PACKAGE_REGISTER_TIMEOUT),
745            )
746            .await?;
747        empty_ok(resp).await
748    }
749
750    /// Update an existing function from a remote package URL.
751    ///
752    /// `PUT /admin/v3/functions/{tenant}/{namespace}/{name}` with the
753    /// same two-part `multipart/form-data` shape as
754    /// [`Self::function_create_with_url`]. Java:
755    /// `FunctionsBase#updateFunction` with non-null `pkgUrl`.
756    pub async fn function_update_with_url(
757        &self,
758        tenant: &str,
759        namespace: &str,
760        name: &str,
761        url: &str,
762        config: FunctionConfig,
763    ) -> Result<(), AdminError> {
764        validate_segment(tenant)?;
765        validate_segment(namespace)?;
766        validate_segment(name)?;
767        let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
768        let form = function_pkg_form(url, &config)?;
769        let resp = self
770            .send(
771                self.http
772                    .request(Method::PUT, endpoint)
773                    .multipart(form)
774                    .timeout(PACKAGE_REGISTER_TIMEOUT),
775            )
776            .await?;
777        empty_ok(resp).await
778    }
779
780    // --- Pulsar Functions (lifecycle) -----------------------------------
781
782    /// Deregister (delete) a function.
783    ///
784    /// `DELETE /admin/v3/functions/{tenant}/{namespace}/{name}`. The
785    /// broker stops every running instance and drops the
786    /// `FunctionConfig` from metadata. Java:
787    /// `FunctionsBase#deregisterFunction`.
788    pub async fn function_delete(
789        &self,
790        tenant: &str,
791        namespace: &str,
792        name: &str,
793    ) -> Result<(), AdminError> {
794        validate_segment(tenant)?;
795        validate_segment(namespace)?;
796        validate_segment(name)?;
797        let url = self.url_v3(&["functions", tenant, namespace, name])?;
798        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
799        empty_ok(resp).await
800    }
801
802    /// Start every instance of a function (idempotent).
803    ///
804    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/start`. No
805    /// body. Java: `FunctionsBase#startFunction`.
806    pub async fn function_start(
807        &self,
808        tenant: &str,
809        namespace: &str,
810        name: &str,
811    ) -> Result<(), AdminError> {
812        validate_segment(tenant)?;
813        validate_segment(namespace)?;
814        validate_segment(name)?;
815        let url = self.url_v3(&["functions", tenant, namespace, name, "start"])?;
816        let resp = self.send(self.http.request(Method::POST, url)).await?;
817        empty_ok(resp).await
818    }
819
820    /// Stop every instance of a function.
821    ///
822    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/stop`. The
823    /// broker leaves the `FunctionConfig` in metadata; a subsequent
824    /// `function_start` brings it back. Java:
825    /// `FunctionsBase#stopFunction`.
826    pub async fn function_stop(
827        &self,
828        tenant: &str,
829        namespace: &str,
830        name: &str,
831    ) -> Result<(), AdminError> {
832        validate_segment(tenant)?;
833        validate_segment(namespace)?;
834        validate_segment(name)?;
835        let url = self.url_v3(&["functions", tenant, namespace, name, "stop"])?;
836        let resp = self.send(self.http.request(Method::POST, url)).await?;
837        empty_ok(resp).await
838    }
839
840    /// Restart every instance of a function.
841    ///
842    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/restart`.
843    /// Java: `FunctionsBase#restartFunction`.
844    pub async fn function_restart(
845        &self,
846        tenant: &str,
847        namespace: &str,
848        name: &str,
849    ) -> Result<(), AdminError> {
850        validate_segment(tenant)?;
851        validate_segment(namespace)?;
852        validate_segment(name)?;
853        let url = self.url_v3(&["functions", tenant, namespace, name, "restart"])?;
854        let resp = self.send(self.http.request(Method::POST, url)).await?;
855        empty_ok(resp).await
856    }
857
858    /// Start one specific instance.
859    ///
860    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/start`.
861    /// Java: `FunctionsBase#startFunctionInstance`.
862    pub async fn function_start_instance(
863        &self,
864        tenant: &str,
865        namespace: &str,
866        name: &str,
867        instance_id: i32,
868    ) -> Result<(), AdminError> {
869        validate_segment(tenant)?;
870        validate_segment(namespace)?;
871        validate_segment(name)?;
872        let id = instance_id.to_string();
873        let url = self.url_v3(&["functions", tenant, namespace, name, &id, "start"])?;
874        let resp = self.send(self.http.request(Method::POST, url)).await?;
875        empty_ok(resp).await
876    }
877
878    /// Stop one specific instance.
879    ///
880    /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stop`.
881    /// Java: `FunctionsBase#stopFunctionInstance`.
882    pub async fn function_stop_instance(
883        &self,
884        tenant: &str,
885        namespace: &str,
886        name: &str,
887        instance_id: i32,
888    ) -> Result<(), AdminError> {
889        validate_segment(tenant)?;
890        validate_segment(namespace)?;
891        validate_segment(name)?;
892        let id = instance_id.to_string();
893        let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stop"])?;
894        let resp = self.send(self.http.request(Method::POST, url)).await?;
895        empty_ok(resp).await
896    }
897
898    // --- Tenants ---------------------------------------------------------
899
900    /// List tenants.
901    ///
902    /// `GET /admin/v2/tenants`.
903    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Tenants.java`
904    /// (`@Path("/tenants")`) + `admin/impl/TenantsBase.java#getTenants`.
905    pub async fn tenants_list(&self) -> Result<Vec<String>, AdminError> {
906        let url = self.url(&["tenants"])?;
907        let resp = self.send(self.http.request(Method::GET, url)).await?;
908        json_ok(resp).await
909    }
910
911    /// Create a tenant.
912    ///
913    /// `PUT /admin/v2/tenants/{tenant}` with a JSON [`TenantInfo`] body.
914    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#
915    /// createTenant`.
916    pub async fn tenant_create(&self, name: &str, info: TenantInfo) -> Result<(), AdminError> {
917        let url = self.url(&["tenants", name])?;
918        let resp = self
919            .send(self.http.request(Method::PUT, url).json(&info))
920            .await?;
921        empty_ok(resp).await
922    }
923
924    /// Delete a tenant.
925    ///
926    /// `DELETE /admin/v2/tenants/{tenant}`.
927    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#
928    /// deleteTenant`.
929    pub async fn tenant_delete(&self, name: &str) -> Result<(), AdminError> {
930        let url = self.url(&["tenants", name])?;
931        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
932        empty_ok(resp).await
933    }
934
935    // --- Namespaces ------------------------------------------------------
936
937    /// List namespaces under a tenant.
938    ///
939    /// `GET /admin/v2/namespaces/{tenant}`.
940    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
941    /// (`@Path("/namespaces")` + `@Path("/{tenant}")`).
942    pub async fn namespaces_list(&self, tenant: &str) -> Result<Vec<String>, AdminError> {
943        let url = self.url(&["namespaces", tenant])?;
944        let resp = self.send(self.http.request(Method::GET, url)).await?;
945        json_ok(resp).await
946    }
947
948    /// Create a namespace.
949    ///
950    /// `PUT /admin/v2/namespaces/{tenant}/{namespace}`. The namespace argument
951    /// is `tenant/namespace`, matching how Pulsar expresses fully qualified
952    /// namespace names on the wire and CLI.
953    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
954    /// (`@PUT @Path("/{tenant}/{namespace}")`).
955    pub async fn namespace_create(&self, ns: &str) -> Result<(), AdminError> {
956        let (tenant, namespace) = split_namespace(ns)?;
957        let url = self.url(&["namespaces", tenant, namespace])?;
958        let resp = self.send(self.http.request(Method::PUT, url)).await?;
959        empty_ok(resp).await
960    }
961
962    /// Delete a namespace.
963    ///
964    /// `DELETE /admin/v2/namespaces/{tenant}/{namespace}`.
965    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
966    /// (`@DELETE @Path("/{tenant}/{namespace}")`).
967    pub async fn namespace_delete(&self, ns: &str) -> Result<(), AdminError> {
968        let (tenant, namespace) = split_namespace(ns)?;
969        let url = self.url(&["namespaces", tenant, namespace])?;
970        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
971        empty_ok(resp).await
972    }
973
974    /// Get a namespace's retention policy.
975    ///
976    /// `GET /admin/v2/namespaces/{tenant}/{ns}/retention`.
977    /// Returns `RetentionPolicies { retentionTimeInMinutes, retentionSizeInMB }`.
978    /// A fresh namespace, or a namespace whose retention was just
979    /// removed, surfaces as 204 / empty body / `null` — we fold those
980    /// to `RetentionPolicies::default()` (broker semantic).
981    /// Java: `NamespacesBase#getRetention`.
982    pub async fn namespace_get_retention(&self, ns: &str) -> Result<RetentionPolicies, AdminError> {
983        let (tenant, namespace) = split_namespace(ns)?;
984        let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
985        let resp = self.send(self.http.request(Method::GET, url)).await?;
986        json_ok_or_default(resp).await
987    }
988
989    /// Set a namespace's retention policy.
990    ///
991    /// `POST /admin/v2/namespaces/{tenant}/{ns}/retention` with a JSON
992    /// `RetentionPolicies` body. `-1` means infinite (size or time).
993    /// Java: `NamespacesBase#setRetention`.
994    pub async fn namespace_set_retention(
995        &self,
996        ns: &str,
997        policy: RetentionPolicies,
998    ) -> Result<(), AdminError> {
999        let (tenant, namespace) = split_namespace(ns)?;
1000        let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
1001        let resp = self
1002            .send(self.http.request(Method::POST, url).json(&policy))
1003            .await?;
1004        empty_ok(resp).await
1005    }
1006
1007    /// Remove a namespace's retention policy (fall back to broker default).
1008    ///
1009    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/retention`.
1010    /// Java: `NamespacesBase#removeRetention`.
1011    pub async fn namespace_remove_retention(&self, ns: &str) -> Result<(), AdminError> {
1012        let (tenant, namespace) = split_namespace(ns)?;
1013        let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
1014        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1015        empty_ok(resp).await
1016    }
1017
1018    /// Get all backlog-quota policies on a namespace.
1019    ///
1020    /// `GET /admin/v2/namespaces/{tenant}/{ns}/backlogQuotaMap`. Returns
1021    /// `Map<BacklogQuotaType, BacklogQuota>` — kept as raw JSON because
1022    /// broker versions add quota types (`message_age` since 2.10).
1023    /// Java: `NamespacesBase#getBacklogQuotaMap`.
1024    pub async fn namespace_get_backlog_quotas(
1025        &self,
1026        ns: &str,
1027    ) -> Result<serde_json::Value, AdminError> {
1028        let (tenant, namespace) = split_namespace(ns)?;
1029        let url = self.url(&["namespaces", tenant, namespace, "backlogQuotaMap"])?;
1030        let resp = self.send(self.http.request(Method::GET, url)).await?;
1031        json_ok(resp).await
1032    }
1033
1034    /// Set a backlog-quota policy on a namespace.
1035    ///
1036    /// `POST /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}`
1037    /// with a JSON `BacklogQuota` body. `backlog_quota_type` selects which
1038    /// dimension to limit (`destination_storage` for byte size, `message_age`
1039    /// for wall-clock TTL).
1040    /// Java: `NamespacesBase#setBacklogQuota`.
1041    pub async fn namespace_set_backlog_quota(
1042        &self,
1043        ns: &str,
1044        backlog_quota_type: BacklogQuotaType,
1045        quota: BacklogQuota,
1046    ) -> Result<(), AdminError> {
1047        let (tenant, namespace) = split_namespace(ns)?;
1048        let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
1049        url.query_pairs_mut()
1050            .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
1051        let resp = self
1052            .send(self.http.request(Method::POST, url).json(&quota))
1053            .await?;
1054        empty_ok(resp).await
1055    }
1056
1057    /// Remove a backlog-quota policy from a namespace.
1058    ///
1059    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}`.
1060    /// Java: `NamespacesBase#removeBacklogQuota`.
1061    pub async fn namespace_remove_backlog_quota(
1062        &self,
1063        ns: &str,
1064        backlog_quota_type: BacklogQuotaType,
1065    ) -> Result<(), AdminError> {
1066        let (tenant, namespace) = split_namespace(ns)?;
1067        let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
1068        url.query_pairs_mut()
1069            .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
1070        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1071        empty_ok(resp).await
1072    }
1073
1074    /// Get a namespace's message-TTL (seconds).
1075    ///
1076    /// `GET /admin/v2/namespaces/{tenant}/{ns}/messageTTL`. Returns a
1077    /// bare integer (or `null` if no TTL is set — which decodes as
1078    /// `Option::None`).
1079    /// Java: `NamespacesBase#getNamespaceMessageTTL`.
1080    pub async fn namespace_get_message_ttl(&self, ns: &str) -> Result<Option<i32>, AdminError> {
1081        let (tenant, namespace) = split_namespace(ns)?;
1082        let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1083        let resp = self.send(self.http.request(Method::GET, url)).await?;
1084        json_ok_optional(resp).await
1085    }
1086
1087    /// Set a namespace's message-TTL (seconds).
1088    ///
1089    /// `POST /admin/v2/namespaces/{tenant}/{ns}/messageTTL` with a bare
1090    /// integer body. `0` disables (broker treats as no TTL).
1091    /// Java: `NamespacesBase#setNamespaceMessageTTL`.
1092    pub async fn namespace_set_message_ttl(
1093        &self,
1094        ns: &str,
1095        ttl_seconds: i32,
1096    ) -> Result<(), AdminError> {
1097        let (tenant, namespace) = split_namespace(ns)?;
1098        let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1099        let resp = self
1100            .send(self.http.request(Method::POST, url).json(&ttl_seconds))
1101            .await?;
1102        empty_ok(resp).await
1103    }
1104
1105    /// Remove a namespace's message-TTL (fall back to broker default).
1106    ///
1107    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/messageTTL`.
1108    /// Java: `NamespacesBase#removeNamespaceMessageTTL`.
1109    pub async fn namespace_remove_message_ttl(&self, ns: &str) -> Result<(), AdminError> {
1110        let (tenant, namespace) = split_namespace(ns)?;
1111        let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1112        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1113        empty_ok(resp).await
1114    }
1115
1116    // --- Namespace policies — persistence + rates ----------------------
1117
1118    /// Get a namespace's persistence policy.
1119    ///
1120    /// `GET /admin/v2/namespaces/{tenant}/{ns}/persistence`. Returns the
1121    /// `BookKeeper` ensemble / write-quorum / ack-quorum triple plus the
1122    /// managed-ledger mark-delete rate cap. `null` body decodes to
1123    /// `PersistencePolicies::default()` via `#[serde(default)]`.
1124    /// Java: `NamespacesBase#getPersistence`.
1125    pub async fn namespace_get_persistence(
1126        &self,
1127        ns: &str,
1128    ) -> Result<PersistencePolicies, AdminError> {
1129        let (tenant, namespace) = split_namespace(ns)?;
1130        let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1131        let resp = self.send(self.http.request(Method::GET, url)).await?;
1132        json_ok_or_default(resp).await
1133    }
1134
1135    /// Set a namespace's persistence policy.
1136    ///
1137    /// `POST /admin/v2/namespaces/{tenant}/{ns}/persistence` with a JSON
1138    /// `PersistencePolicies` body.
1139    /// Java: `NamespacesBase#setPersistence`.
1140    pub async fn namespace_set_persistence(
1141        &self,
1142        ns: &str,
1143        policy: PersistencePolicies,
1144    ) -> Result<(), AdminError> {
1145        let (tenant, namespace) = split_namespace(ns)?;
1146        let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1147        let resp = self
1148            .send(self.http.request(Method::POST, url).json(&policy))
1149            .await?;
1150        empty_ok(resp).await
1151    }
1152
1153    /// Remove a namespace's persistence policy (fall back to broker default).
1154    ///
1155    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/persistence`.
1156    /// Java: `NamespacesBase#deletePersistence`.
1157    pub async fn namespace_remove_persistence(&self, ns: &str) -> Result<(), AdminError> {
1158        let (tenant, namespace) = split_namespace(ns)?;
1159        let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1160        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1161        empty_ok(resp).await
1162    }
1163
1164    /// Get a namespace's consumer dispatch-rate policy.
1165    ///
1166    /// `GET /admin/v2/namespaces/{tenant}/{ns}/dispatchRate`. Returns
1167    /// the per-namespace consumer-dispatch throttle (msg/sec, byte/sec,
1168    /// window in seconds). `-1` on either dimension means unlimited.
1169    /// Java: `NamespacesBase#getDispatchRate`.
1170    pub async fn namespace_get_dispatch_rate(&self, ns: &str) -> Result<DispatchRate, AdminError> {
1171        let (tenant, namespace) = split_namespace(ns)?;
1172        let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1173        let resp = self.send(self.http.request(Method::GET, url)).await?;
1174        json_ok_or_default(resp).await
1175    }
1176
1177    /// Set a namespace's consumer dispatch-rate policy.
1178    ///
1179    /// `POST /admin/v2/namespaces/{tenant}/{ns}/dispatchRate` with a
1180    /// JSON `DispatchRate` body.
1181    /// Java: `NamespacesBase#setDispatchRate`.
1182    pub async fn namespace_set_dispatch_rate(
1183        &self,
1184        ns: &str,
1185        rate: DispatchRate,
1186    ) -> Result<(), AdminError> {
1187        let (tenant, namespace) = split_namespace(ns)?;
1188        let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1189        let resp = self
1190            .send(self.http.request(Method::POST, url).json(&rate))
1191            .await?;
1192        empty_ok(resp).await
1193    }
1194
1195    /// Remove a namespace's consumer dispatch-rate policy.
1196    ///
1197    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/dispatchRate`.
1198    /// Java: `NamespacesBase#deleteDispatchRate`.
1199    pub async fn namespace_remove_dispatch_rate(&self, ns: &str) -> Result<(), AdminError> {
1200        let (tenant, namespace) = split_namespace(ns)?;
1201        let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1202        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1203        empty_ok(resp).await
1204    }
1205
1206    /// Get a namespace's per-subscription dispatch-rate policy.
1207    ///
1208    /// `GET /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`.
1209    /// Reuses the [`DispatchRate`] body shape — the policy applies per
1210    /// subscription rather than aggregated across all consumers.
1211    /// Java: `NamespacesBase#getSubscriptionDispatchRate`.
1212    pub async fn namespace_get_subscription_dispatch_rate(
1213        &self,
1214        ns: &str,
1215    ) -> Result<DispatchRate, AdminError> {
1216        let (tenant, namespace) = split_namespace(ns)?;
1217        let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1218        let resp = self.send(self.http.request(Method::GET, url)).await?;
1219        json_ok(resp).await
1220    }
1221
1222    /// Set a namespace's per-subscription dispatch-rate policy.
1223    ///
1224    /// `POST /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`
1225    /// with a JSON `DispatchRate` body.
1226    /// Java: `NamespacesBase#setSubscriptionDispatchRate`.
1227    pub async fn namespace_set_subscription_dispatch_rate(
1228        &self,
1229        ns: &str,
1230        rate: DispatchRate,
1231    ) -> Result<(), AdminError> {
1232        let (tenant, namespace) = split_namespace(ns)?;
1233        let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1234        let resp = self
1235            .send(self.http.request(Method::POST, url).json(&rate))
1236            .await?;
1237        empty_ok(resp).await
1238    }
1239
1240    /// Remove a namespace's per-subscription dispatch-rate policy.
1241    ///
1242    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`.
1243    /// Java: `NamespacesBase#deleteSubscriptionDispatchRate`.
1244    pub async fn namespace_remove_subscription_dispatch_rate(
1245        &self,
1246        ns: &str,
1247    ) -> Result<(), AdminError> {
1248        let (tenant, namespace) = split_namespace(ns)?;
1249        let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1250        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1251        empty_ok(resp).await
1252    }
1253
1254    /// Get a namespace's cross-cluster replicator dispatch-rate policy.
1255    ///
1256    /// `GET /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`.
1257    /// Reuses the [`DispatchRate`] body shape — the policy throttles
1258    /// outbound geo-replication traffic from this cluster.
1259    /// Java: `NamespacesBase#getReplicatorDispatchRate`.
1260    pub async fn namespace_get_replicator_dispatch_rate(
1261        &self,
1262        ns: &str,
1263    ) -> Result<DispatchRate, AdminError> {
1264        let (tenant, namespace) = split_namespace(ns)?;
1265        let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1266        let resp = self.send(self.http.request(Method::GET, url)).await?;
1267        json_ok(resp).await
1268    }
1269
1270    /// Set a namespace's cross-cluster replicator dispatch-rate policy.
1271    ///
1272    /// `POST /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`
1273    /// with a JSON `DispatchRate` body.
1274    /// Java: `NamespacesBase#setReplicatorDispatchRate`.
1275    pub async fn namespace_set_replicator_dispatch_rate(
1276        &self,
1277        ns: &str,
1278        rate: DispatchRate,
1279    ) -> Result<(), AdminError> {
1280        let (tenant, namespace) = split_namespace(ns)?;
1281        let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1282        let resp = self
1283            .send(self.http.request(Method::POST, url).json(&rate))
1284            .await?;
1285        empty_ok(resp).await
1286    }
1287
1288    /// Remove a namespace's cross-cluster replicator dispatch-rate policy.
1289    ///
1290    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`.
1291    /// Java: `NamespacesBase#removeReplicatorDispatchRate`.
1292    pub async fn namespace_remove_replicator_dispatch_rate(
1293        &self,
1294        ns: &str,
1295    ) -> Result<(), AdminError> {
1296        let (tenant, namespace) = split_namespace(ns)?;
1297        let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1298        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1299        empty_ok(resp).await
1300    }
1301
1302    /// Get a namespace's publish-rate policy.
1303    ///
1304    /// `GET /admin/v2/namespaces/{tenant}/{ns}/publishRate`. Returns
1305    /// the producer-side throttle (msg/sec + byte/sec). `-1` on either
1306    /// dimension means unlimited.
1307    /// Java: `NamespacesBase#getPublishRate`.
1308    pub async fn namespace_get_publish_rate(&self, ns: &str) -> Result<PublishRate, AdminError> {
1309        let (tenant, namespace) = split_namespace(ns)?;
1310        let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1311        let resp = self.send(self.http.request(Method::GET, url)).await?;
1312        json_ok_or_default(resp).await
1313    }
1314
1315    /// Set a namespace's publish-rate policy.
1316    ///
1317    /// `POST /admin/v2/namespaces/{tenant}/{ns}/publishRate` with a JSON
1318    /// `PublishRate` body.
1319    /// Java: `NamespacesBase#setPublishRate`.
1320    pub async fn namespace_set_publish_rate(
1321        &self,
1322        ns: &str,
1323        rate: PublishRate,
1324    ) -> Result<(), AdminError> {
1325        let (tenant, namespace) = split_namespace(ns)?;
1326        let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1327        let resp = self
1328            .send(self.http.request(Method::POST, url).json(&rate))
1329            .await?;
1330        empty_ok(resp).await
1331    }
1332
1333    /// Remove a namespace's publish-rate policy.
1334    ///
1335    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/publishRate`.
1336    /// Java: `NamespacesBase#removePublishRate`.
1337    pub async fn namespace_remove_publish_rate(&self, ns: &str) -> Result<(), AdminError> {
1338        let (tenant, namespace) = split_namespace(ns)?;
1339        let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1340        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1341        empty_ok(resp).await
1342    }
1343
1344    // --- Namespace policies — limits + dedup + delayed delivery -----
1345
1346    /// Get a namespace's broker-side message deduplication flag.
1347    ///
1348    /// `GET /admin/v2/namespaces/{tenant}/{ns}/deduplication`. Returns a
1349    /// bare JSON boolean, or `null` (decoded as `None`) when the policy
1350    /// is unset and the broker default applies.
1351    /// Java: `NamespacesBase#getDeduplication`.
1352    pub async fn namespace_get_deduplication(&self, ns: &str) -> Result<Option<bool>, AdminError> {
1353        let (tenant, namespace) = split_namespace(ns)?;
1354        let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1355        let resp = self.send(self.http.request(Method::GET, url)).await?;
1356        json_ok_optional(resp).await
1357    }
1358
1359    /// Set a namespace's broker-side message deduplication flag.
1360    ///
1361    /// `POST /admin/v2/namespaces/{tenant}/{ns}/deduplication` with a
1362    /// bare JSON boolean body.
1363    /// Java: `NamespacesBase#modifyDeduplication`.
1364    pub async fn namespace_set_deduplication(
1365        &self,
1366        ns: &str,
1367        enabled: bool,
1368    ) -> Result<(), AdminError> {
1369        let (tenant, namespace) = split_namespace(ns)?;
1370        let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1371        let resp = self
1372            .send(self.http.request(Method::POST, url).json(&enabled))
1373            .await?;
1374        empty_ok(resp).await
1375    }
1376
1377    /// Remove a namespace's deduplication flag (fall back to broker default).
1378    ///
1379    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplication`.
1380    /// Java: `NamespacesBase#removeDeduplication`.
1381    pub async fn namespace_remove_deduplication(&self, ns: &str) -> Result<(), AdminError> {
1382        let (tenant, namespace) = split_namespace(ns)?;
1383        let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1384        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1385        empty_ok(resp).await
1386    }
1387
1388    /// Get a namespace's deduplication-snapshot interval (entries).
1389    ///
1390    /// `GET /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`.
1391    /// Returns a bare integer (the entry count between dedup cursor
1392    /// snapshots), or `null` (decoded as `None`) when the broker default
1393    /// applies.
1394    /// Java: `NamespacesBase#getDeduplicationSnapshotInterval`.
1395    pub async fn namespace_get_deduplication_snapshot_interval(
1396        &self,
1397        ns: &str,
1398    ) -> Result<Option<i32>, AdminError> {
1399        let (tenant, namespace) = split_namespace(ns)?;
1400        let url = self.url(&[
1401            "namespaces",
1402            tenant,
1403            namespace,
1404            "deduplicationSnapshotInterval",
1405        ])?;
1406        let resp = self.send(self.http.request(Method::GET, url)).await?;
1407        json_ok_optional(resp).await
1408    }
1409
1410    /// Set a namespace's deduplication-snapshot interval (entries).
1411    ///
1412    /// `POST /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`
1413    /// with a bare JSON integer body.
1414    /// Java: `NamespacesBase#setDeduplicationSnapshotInterval`.
1415    pub async fn namespace_set_deduplication_snapshot_interval(
1416        &self,
1417        ns: &str,
1418        interval_entries: i32,
1419    ) -> Result<(), AdminError> {
1420        let (tenant, namespace) = split_namespace(ns)?;
1421        let url = self.url(&[
1422            "namespaces",
1423            tenant,
1424            namespace,
1425            "deduplicationSnapshotInterval",
1426        ])?;
1427        let resp = self
1428            .send(self.http.request(Method::POST, url).json(&interval_entries))
1429            .await?;
1430        empty_ok(resp).await
1431    }
1432
1433    /// Remove a namespace's deduplication-snapshot interval override.
1434    ///
1435    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`.
1436    /// Java: `NamespacesBase#deleteDeduplicationSnapshotInterval`.
1437    pub async fn namespace_remove_deduplication_snapshot_interval(
1438        &self,
1439        ns: &str,
1440    ) -> Result<(), AdminError> {
1441        let (tenant, namespace) = split_namespace(ns)?;
1442        let url = self.url(&[
1443            "namespaces",
1444            tenant,
1445            namespace,
1446            "deduplicationSnapshotInterval",
1447        ])?;
1448        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1449        empty_ok(resp).await
1450    }
1451
1452    /// Get a namespace's compaction threshold (bytes).
1453    ///
1454    /// `GET /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold`. Returns
1455    /// a bare integer (bytes of accumulated topic backlog above which the
1456    /// broker triggers automatic compaction), or `null` (decoded as `None`)
1457    /// when the broker default applies.
1458    /// Java: `NamespacesBase#getCompactionThreshold`.
1459    pub async fn namespace_get_compaction_threshold(
1460        &self,
1461        ns: &str,
1462    ) -> Result<Option<i64>, AdminError> {
1463        let (tenant, namespace) = split_namespace(ns)?;
1464        let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1465        let resp = self.send(self.http.request(Method::GET, url)).await?;
1466        json_ok_optional(resp).await
1467    }
1468
1469    /// Set a namespace's compaction threshold (bytes).
1470    ///
1471    /// `PUT /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold` with
1472    /// a bare JSON long body. `0` disables automatic compaction.
1473    ///
1474    /// Note: this endpoint is `@PUT` in Pulsar 4 (`Namespaces.java`
1475    /// declares `@PUT @Path("/{tenant}/{namespace}/compactionThreshold")`),
1476    /// not POST — using POST yields a `405 Method Not Allowed`.
1477    /// Java: `NamespacesBase#setCompactionThreshold`.
1478    pub async fn namespace_set_compaction_threshold(
1479        &self,
1480        ns: &str,
1481        threshold_bytes: i64,
1482    ) -> Result<(), AdminError> {
1483        let (tenant, namespace) = split_namespace(ns)?;
1484        let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1485        let resp = self
1486            .send(self.http.request(Method::PUT, url).json(&threshold_bytes))
1487            .await?;
1488        empty_ok(resp).await
1489    }
1490
1491    /// Remove a namespace's compaction threshold override.
1492    ///
1493    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold`.
1494    /// Java: `NamespacesBase#deleteCompactionThreshold`.
1495    pub async fn namespace_remove_compaction_threshold(&self, ns: &str) -> Result<(), AdminError> {
1496        let (tenant, namespace) = split_namespace(ns)?;
1497        let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1498        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1499        empty_ok(resp).await
1500    }
1501
1502    /// Get a namespace's delayed-delivery policy.
1503    ///
1504    /// `GET /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery`. Returns
1505    /// the active flag + tick time (the broker's index-tick granularity
1506    /// for delivering delayed messages). `null` decodes as `None`.
1507    /// Java: `NamespacesBase#getDelayedDeliveryPolicies`.
1508    pub async fn namespace_get_delayed_delivery(
1509        &self,
1510        ns: &str,
1511    ) -> Result<Option<DelayedDeliveryPolicies>, AdminError> {
1512        let (tenant, namespace) = split_namespace(ns)?;
1513        let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1514        let resp = self.send(self.http.request(Method::GET, url)).await?;
1515        json_ok_optional(resp).await
1516    }
1517
1518    /// Set a namespace's delayed-delivery policy.
1519    ///
1520    /// `POST /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery` with a
1521    /// JSON `DelayedDeliveryPolicies` body.
1522    /// Java: `NamespacesBase#setDelayedDeliveryPolicies`.
1523    pub async fn namespace_set_delayed_delivery(
1524        &self,
1525        ns: &str,
1526        policy: DelayedDeliveryPolicies,
1527    ) -> Result<(), AdminError> {
1528        let (tenant, namespace) = split_namespace(ns)?;
1529        let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1530        let resp = self
1531            .send(self.http.request(Method::POST, url).json(&policy))
1532            .await?;
1533        empty_ok(resp).await
1534    }
1535
1536    /// Remove a namespace's delayed-delivery policy override.
1537    ///
1538    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery`.
1539    /// Java: `NamespacesBase#removeDelayedDeliveryPolicies`.
1540    pub async fn namespace_remove_delayed_delivery(&self, ns: &str) -> Result<(), AdminError> {
1541        let (tenant, namespace) = split_namespace(ns)?;
1542        let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1543        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1544        empty_ok(resp).await
1545    }
1546
1547    /// Get a namespace's max-producers-per-topic limit.
1548    ///
1549    /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic`. Returns
1550    /// a bare integer (the per-topic ceiling on concurrent producer
1551    /// connections), or `null` (decoded as `None`) when the broker default
1552    /// applies.
1553    /// Java: `NamespacesBase#getMaxProducersPerTopic`.
1554    pub async fn namespace_get_max_producers_per_topic(
1555        &self,
1556        ns: &str,
1557    ) -> Result<Option<i32>, AdminError> {
1558        let (tenant, namespace) = split_namespace(ns)?;
1559        let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1560        let resp = self.send(self.http.request(Method::GET, url)).await?;
1561        json_ok_optional(resp).await
1562    }
1563
1564    /// Set a namespace's max-producers-per-topic limit.
1565    ///
1566    /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic` with
1567    /// a bare JSON integer body. `0` disables the limit.
1568    /// Java: `NamespacesBase#setMaxProducersPerTopic`.
1569    pub async fn namespace_set_max_producers_per_topic(
1570        &self,
1571        ns: &str,
1572        max_producers: i32,
1573    ) -> Result<(), AdminError> {
1574        let (tenant, namespace) = split_namespace(ns)?;
1575        let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1576        let resp = self
1577            .send(self.http.request(Method::POST, url).json(&max_producers))
1578            .await?;
1579        empty_ok(resp).await
1580    }
1581
1582    /// Remove a namespace's max-producers-per-topic limit override.
1583    ///
1584    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic`.
1585    /// Java: `NamespacesBase#removeMaxProducersPerTopic`.
1586    pub async fn namespace_remove_max_producers_per_topic(
1587        &self,
1588        ns: &str,
1589    ) -> Result<(), AdminError> {
1590        let (tenant, namespace) = split_namespace(ns)?;
1591        let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1592        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1593        empty_ok(resp).await
1594    }
1595
1596    /// Get a namespace's max-consumers-per-topic limit.
1597    ///
1598    /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic`. Returns
1599    /// a bare integer (the per-topic ceiling on concurrent consumer
1600    /// connections across all subscriptions), or `null` (decoded as `None`)
1601    /// when the broker default applies.
1602    /// Java: `NamespacesBase#getMaxConsumersPerTopic`.
1603    pub async fn namespace_get_max_consumers_per_topic(
1604        &self,
1605        ns: &str,
1606    ) -> Result<Option<i32>, AdminError> {
1607        let (tenant, namespace) = split_namespace(ns)?;
1608        let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1609        let resp = self.send(self.http.request(Method::GET, url)).await?;
1610        json_ok_optional(resp).await
1611    }
1612
1613    /// Set a namespace's max-consumers-per-topic limit.
1614    ///
1615    /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic` with
1616    /// a bare JSON integer body. `0` disables the limit.
1617    /// Java: `NamespacesBase#setMaxConsumersPerTopic`.
1618    pub async fn namespace_set_max_consumers_per_topic(
1619        &self,
1620        ns: &str,
1621        max_consumers: i32,
1622    ) -> Result<(), AdminError> {
1623        let (tenant, namespace) = split_namespace(ns)?;
1624        let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1625        let resp = self
1626            .send(self.http.request(Method::POST, url).json(&max_consumers))
1627            .await?;
1628        empty_ok(resp).await
1629    }
1630
1631    /// Remove a namespace's max-consumers-per-topic limit override.
1632    ///
1633    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic`.
1634    /// Java: `NamespacesBase#removeMaxConsumersPerTopic`.
1635    pub async fn namespace_remove_max_consumers_per_topic(
1636        &self,
1637        ns: &str,
1638    ) -> Result<(), AdminError> {
1639        let (tenant, namespace) = split_namespace(ns)?;
1640        let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1641        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1642        empty_ok(resp).await
1643    }
1644
1645    /// Get a namespace's max-unacked-messages-per-consumer limit.
1646    ///
1647    /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`.
1648    /// Returns a bare integer (the broker's per-consumer permit-pool cap
1649    /// before it stops dispatching), or `null` (decoded as `None`) when
1650    /// the broker default applies.
1651    /// Java: `NamespacesBase#getMaxUnackedMessagesPerConsumer`.
1652    pub async fn namespace_get_max_unacked_messages_per_consumer(
1653        &self,
1654        ns: &str,
1655    ) -> Result<Option<i32>, AdminError> {
1656        let (tenant, namespace) = split_namespace(ns)?;
1657        let url = self.url(&[
1658            "namespaces",
1659            tenant,
1660            namespace,
1661            "maxUnackedMessagesPerConsumer",
1662        ])?;
1663        let resp = self.send(self.http.request(Method::GET, url)).await?;
1664        json_ok_optional(resp).await
1665    }
1666
1667    /// Set a namespace's max-unacked-messages-per-consumer limit.
1668    ///
1669    /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`
1670    /// with a bare JSON integer body. `0` disables the limit.
1671    /// Java: `NamespacesBase#setMaxUnackedMessagesPerConsumer`.
1672    pub async fn namespace_set_max_unacked_messages_per_consumer(
1673        &self,
1674        ns: &str,
1675        max_unacked: i32,
1676    ) -> Result<(), AdminError> {
1677        let (tenant, namespace) = split_namespace(ns)?;
1678        let url = self.url(&[
1679            "namespaces",
1680            tenant,
1681            namespace,
1682            "maxUnackedMessagesPerConsumer",
1683        ])?;
1684        let resp = self
1685            .send(self.http.request(Method::POST, url).json(&max_unacked))
1686            .await?;
1687        empty_ok(resp).await
1688    }
1689
1690    /// Remove a namespace's max-unacked-messages-per-consumer override.
1691    ///
1692    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`.
1693    /// Java: `NamespacesBase#removeMaxUnackedMessagesPerConsumer`.
1694    pub async fn namespace_remove_max_unacked_messages_per_consumer(
1695        &self,
1696        ns: &str,
1697    ) -> Result<(), AdminError> {
1698        let (tenant, namespace) = split_namespace(ns)?;
1699        let url = self.url(&[
1700            "namespaces",
1701            tenant,
1702            namespace,
1703            "maxUnackedMessagesPerConsumer",
1704        ])?;
1705        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1706        empty_ok(resp).await
1707    }
1708
1709    /// Get a namespace's max-unacked-messages-per-subscription limit.
1710    ///
1711    /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`.
1712    /// Returns a bare integer (the broker's per-subscription unacked
1713    /// ceiling — once exceeded the broker stops dispatching to every
1714    /// consumer on that subscription), or `null` (decoded as `None`)
1715    /// when the broker default applies.
1716    /// Java: `NamespacesBase#getMaxUnackedMessagesPerSubscription`.
1717    pub async fn namespace_get_max_unacked_messages_per_subscription(
1718        &self,
1719        ns: &str,
1720    ) -> Result<Option<i32>, AdminError> {
1721        let (tenant, namespace) = split_namespace(ns)?;
1722        let url = self.url(&[
1723            "namespaces",
1724            tenant,
1725            namespace,
1726            "maxUnackedMessagesPerSubscription",
1727        ])?;
1728        let resp = self.send(self.http.request(Method::GET, url)).await?;
1729        json_ok_optional(resp).await
1730    }
1731
1732    /// Set a namespace's max-unacked-messages-per-subscription limit.
1733    ///
1734    /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`
1735    /// with a bare JSON integer body. `0` disables the limit.
1736    /// Java: `NamespacesBase#setMaxUnackedMessagesPerSubscription`.
1737    pub async fn namespace_set_max_unacked_messages_per_subscription(
1738        &self,
1739        ns: &str,
1740        max_unacked: i32,
1741    ) -> Result<(), AdminError> {
1742        let (tenant, namespace) = split_namespace(ns)?;
1743        let url = self.url(&[
1744            "namespaces",
1745            tenant,
1746            namespace,
1747            "maxUnackedMessagesPerSubscription",
1748        ])?;
1749        let resp = self
1750            .send(self.http.request(Method::POST, url).json(&max_unacked))
1751            .await?;
1752        empty_ok(resp).await
1753    }
1754
1755    /// Remove a namespace's max-unacked-messages-per-subscription override.
1756    ///
1757    /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`.
1758    /// Java: `NamespacesBase#removeMaxUnackedMessagesPerSubscription`.
1759    pub async fn namespace_remove_max_unacked_messages_per_subscription(
1760        &self,
1761        ns: &str,
1762    ) -> Result<(), AdminError> {
1763        let (tenant, namespace) = split_namespace(ns)?;
1764        let url = self.url(&[
1765            "namespaces",
1766            tenant,
1767            namespace,
1768            "maxUnackedMessagesPerSubscription",
1769        ])?;
1770        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1771        empty_ok(resp).await
1772    }
1773
1774    // --- Topics ----------------------------------------------------------
1775
1776    /// List persistent topics in a namespace.
1777    ///
1778    /// `GET /admin/v2/persistent/{tenant}/{namespace}`.
1779    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
1780    /// (`@Path("/persistent")` + `@GET @Path("/{tenant}/{namespace}")`).
1781    pub async fn topics_list(&self, namespace: &str) -> Result<Vec<String>, AdminError> {
1782        let (tenant, namespace) = split_namespace(namespace)?;
1783        let url = self.url(&["persistent", tenant, namespace])?;
1784        let resp = self.send(self.http.request(Method::GET, url)).await?;
1785        json_ok(resp).await
1786    }
1787
1788    /// Create a non-partitioned persistent topic.
1789    ///
1790    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}`.
1791    /// Java: `PersistentTopics.java#createNonPartitionedTopic`
1792    /// (`@PUT @Path("/{tenant}/{namespace}/{topic}")`).
1793    pub async fn topic_create_non_partitioned(&self, topic: &str) -> Result<(), AdminError> {
1794        self.topic_create_non_partitioned_with_properties(topic, &HashMap::new())
1795            .await
1796    }
1797
1798    async fn topic_create_non_partitioned_with_properties(
1799        &self,
1800        topic: &str,
1801        properties: &HashMap<String, String>,
1802    ) -> Result<(), AdminError> {
1803        let (tenant, namespace, name) = split_topic(topic)?;
1804        let url = self.url(&["persistent", tenant, namespace, name])?;
1805        let resp = self
1806            .send(self.http.request(Method::PUT, url).json(properties))
1807            .await?;
1808        empty_ok(resp).await
1809    }
1810
1811    /// Create a partitioned topic with `partitions` partitions.
1812    ///
1813    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`
1814    /// with the partition count as a JSON integer body.
1815    /// Java: `PersistentTopics.java#createPartitionedTopic`
1816    /// (`@PUT @Path("/{tenant}/{namespace}/{topic}/partitions")`).
1817    pub async fn topic_create_partitioned(
1818        &self,
1819        topic: &str,
1820        partitions: u32,
1821    ) -> Result<(), AdminError> {
1822        let (tenant, namespace, name) = split_topic(topic)?;
1823        let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
1824        let resp = self
1825            .send(self.http.request(Method::PUT, url).json(&partitions))
1826            .await?;
1827        empty_ok(resp).await
1828    }
1829
1830    /// Delete a topic, auto-detecting partitioned vs non-partitioned.
1831    ///
1832    /// Pulsar exposes two distinct delete endpoints — the partitioned
1833    /// parent uses `DELETE .../{topic}/partitions?force=…` and the
1834    /// non-partitioned topic uses `DELETE .../{topic}?force=…`. Hitting
1835    /// the partitioned endpoint on a non-partitioned topic returns 404
1836    /// ("Topic is not partitioned"), which used to surface as "the
1837    /// topic doesn't exist" to operators using `magnetar admin topics
1838    /// delete`.
1839    ///
1840    /// Probe via `topic_partitions_count` first (`GET .../partitions`
1841    /// returns `partitions: 0` for non-partitioned topics, `> 0` for a
1842    /// partitioned parent) and route to the matching endpoint. Same
1843    /// behaviour as pulsarctl's `topics delete`.
1844    ///
1845    /// Java: `PersistentTopics.java#deletePartitionedTopic` /
1846    /// `PersistentTopics.java#deleteTopic`.
1847    pub async fn topic_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
1848        let (tenant, namespace, name) = split_topic(topic)?;
1849        let partitions = self.topic_partitions_count(topic).await?;
1850        let force_str = if force { "true" } else { "false" };
1851        let mut url = if partitions > 0 {
1852            self.url(&["persistent", tenant, namespace, name, "partitions"])?
1853        } else {
1854            self.url(&["persistent", tenant, namespace, name])?
1855        };
1856        url.query_pairs_mut().append_pair("force", force_str);
1857        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1858        empty_ok(resp).await
1859    }
1860
1861    /// Get topic stats.
1862    ///
1863    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/stats`.
1864    /// Java: `PersistentTopics.java#getStats`
1865    /// (`@GET @Path("/{tenant}/{namespace}/{topic}/stats")`,
1866    /// response shape `PersistentTopicStats`).
1867    ///
1868    /// For a **partitioned** topic, the broker returns 404 on this endpoint
1869    /// because there is no ledger backing the parent name. Call
1870    /// [`Self::topic_partitioned_stats`] instead, or look up the count via
1871    /// [`Self::topic_partitions_count`] first.
1872    pub async fn topic_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
1873        let (tenant, namespace, name) = split_topic(topic)?;
1874        let url = self.url(&["persistent", tenant, namespace, name, "stats"])?;
1875        let resp = self.send(self.http.request(Method::GET, url)).await?;
1876        json_ok(resp).await
1877    }
1878
1879    /// Get aggregated stats for a partitioned topic.
1880    ///
1881    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitioned-stats?
1882    /// perPartition=false`. Java: `PersistentTopics.java#getPartitionedStats`
1883    /// (`@GET @Path("/{tenant}/{namespace}/{topic}/partitioned-stats")`,
1884    /// response shape `PartitionedTopicStats` which extends
1885    /// `PersistentTopicStats` with `partitions: Map<String, TopicStats>`
1886    /// and `metadata: PartitionedTopicMetadata`).
1887    ///
1888    /// magnetar exposes only the aggregated top-level counters through the
1889    /// same [`TopicStats`] shape — the broker populates `msgInCounter`,
1890    /// `bytesInCounter`, `publishers`, `subscriptions` at the response root
1891    /// summed across partitions. The `partitions` and `metadata` fields are
1892    /// dropped on deserialisation; for per-partition detail call
1893    /// [`Self::topic_stats`] on each `<topic>-partition-N` instead. We pass
1894    /// `perPartition=false` to keep the wire response small.
1895    pub async fn topic_partitioned_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
1896        let (tenant, namespace, name) = split_topic(topic)?;
1897        let mut url = self.url(&["persistent", tenant, namespace, name, "partitioned-stats"])?;
1898        url.query_pairs_mut().append_pair("perPartition", "false");
1899        let resp = self.send(self.http.request(Method::GET, url)).await?;
1900        json_ok(resp).await
1901    }
1902
1903    /// Resolve the partition count of a topic.
1904    ///
1905    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`.
1906    /// Java: `PersistentTopics.java#getPartitionedMetadata`
1907    /// (`@GET @Path("/{tenant}/{namespace}/{topic}/partitions")`,
1908    /// response shape `PartitionedTopicMetadata{ partitions: int }`).
1909    ///
1910    /// Returns `0` for non-partitioned topics; lets a caller disambiguate
1911    /// between [`Self::topic_stats`] and [`Self::topic_partitioned_stats`]
1912    /// when the topology is not known in advance.
1913    pub async fn topic_partitions_count(&self, topic: &str) -> Result<u32, AdminError> {
1914        let (tenant, namespace, name) = split_topic(topic)?;
1915        let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
1916        let resp = self.send(self.http.request(Method::GET, url)).await?;
1917        let meta: PartitionedTopicMetadata = json_ok(resp).await?;
1918        Ok(meta.partitions)
1919    }
1920
1921    /// Resolve a broker-entry-metadata `index` to a [`MessageId`] (PIP-415).
1922    ///
1923    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/getMessageIdByIndex?index={index}`.
1924    /// Per [PIP-415](https://github.com/apache/pulsar/blob/master/pip/pip-415.md)
1925    /// this is **REST-only** — the spec's "Binary protocol" section is
1926    /// intentionally empty and the canonical implementation PR
1927    /// [`apache/pulsar#24222`](https://github.com/apache/pulsar/pull/24222)
1928    /// (merged 2025-06-23) touches only admin / broker / CLI Java code.
1929    ///
1930    /// Java:
1931    /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
1932    /// (`@GET @Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex")`,
1933    /// `@QueryParam("index") long`); admin-client side is
1934    /// `pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/
1935    /// TopicsImpl.java#getMessageIdByIndexAsync` which deserialises the
1936    /// response into `MessageIdImpl` (i.e. `{ledgerId, entryId, partitionIndex}`).
1937    ///
1938    /// `topic` follows the same rule as every other topic-scoped method:
1939    /// either `persistent://tenant/ns/topic` or `tenant/ns/topic`. For a
1940    /// partitioned topic, pass the specific partition (`my-topic-partition-0`).
1941    ///
1942    /// The response carries only `(ledgerId, entryId, partitionIndex)`. The
1943    /// returned [`MessageId`] sets `batch_index = -1` and `batch_size = -1`
1944    /// because the broker resolves at entry granularity — see PIP-415 §"Why
1945    /// Precise Index Matching Isn't Implemented on the Broker Side".
1946    pub async fn topic_get_message_id_by_index(
1947        &self,
1948        topic: &str,
1949        index: i64,
1950    ) -> Result<MessageId, AdminError> {
1951        let (tenant, namespace, name) = split_topic(topic)?;
1952        let mut url = self.url(&["persistent", tenant, namespace, name, "getMessageIdByIndex"])?;
1953        url.query_pairs_mut()
1954            .append_pair("index", &index.to_string());
1955        let resp = self.send(self.http.request(Method::GET, url)).await?;
1956        let dto: MessageIdResponse = json_ok(resp).await?;
1957        dto.try_into_message_id()
1958    }
1959
1960    /// Trigger ledger compaction for a topic.
1961    ///
1962    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction`.
1963    /// Returns 204 on success; the broker queues the work asynchronously —
1964    /// poll [`Self::topic_compaction_status`] to observe progress.
1965    /// Java: `PersistentTopics#triggerCompaction`.
1966    pub async fn topic_compact(&self, topic: &str) -> Result<(), AdminError> {
1967        let (tenant, namespace, name) = split_topic(topic)?;
1968        let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
1969        let resp = self.send(self.http.request(Method::PUT, url)).await?;
1970        empty_ok(resp).await
1971    }
1972
1973    /// Get the current compaction status for a topic.
1974    ///
1975    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction`.
1976    /// Returns Java's `LongRunningProcessStatus`: `status` ∈ {`NOT_RUN`,
1977    /// `RUNNING`, `SUCCESS`, `ERROR`} plus an optional `lastError` string.
1978    /// Java: `PersistentTopics#compactionStatus`.
1979    pub async fn topic_compaction_status(
1980        &self,
1981        topic: &str,
1982    ) -> Result<LongRunningProcessStatus, AdminError> {
1983        let (tenant, namespace, name) = split_topic(topic)?;
1984        let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
1985        let resp = self.send(self.http.request(Method::GET, url)).await?;
1986        json_ok(resp).await
1987    }
1988
1989    /// Unload a topic from its current broker (forces rebalancing).
1990    ///
1991    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/unload`.
1992    /// Operators use this to drain a hot broker or to re-elect ownership
1993    /// after a configuration change. Java: `PersistentTopics#unloadTopic`.
1994    pub async fn topic_unload(&self, topic: &str) -> Result<(), AdminError> {
1995        let (tenant, namespace, name) = split_topic(topic)?;
1996        let url = self.url(&["persistent", tenant, namespace, name, "unload"])?;
1997        let resp = self.send(self.http.request(Method::PUT, url)).await?;
1998        empty_ok(resp).await
1999    }
2000
2001    /// Terminate (seal) a topic — no further produces succeed.
2002    ///
2003    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/terminate`.
2004    /// Returns the [`MessageId`] of the last message that landed before
2005    /// the seal, or `None` when the broker reports the
2006    /// `MessageIdImpl(-1, -1)` sentinel — meaning the topic was sealed
2007    /// before any confirmed entry was written (a freshly-created topic,
2008    /// or a topic whose owner was just re-elected). The Java client
2009    /// surfaces that case as `MessageId.earliest`; we use `Option` so
2010    /// callers don't have to special-case a magic value.
2011    /// Java: `PersistentTopics#terminate`.
2012    pub async fn topic_terminate(&self, topic: &str) -> Result<Option<MessageId>, AdminError> {
2013        let (tenant, namespace, name) = split_topic(topic)?;
2014        let url = self.url(&["persistent", tenant, namespace, name, "terminate"])?;
2015        let resp = self.send(self.http.request(Method::POST, url)).await?;
2016        let dto: MessageIdResponse = json_ok(resp).await?;
2017        if dto.ledger_id < 0 && dto.entry_id < 0 {
2018            return Ok(None);
2019        }
2020        dto.try_into_message_id().map(Some)
2021    }
2022
2023    /// Grow a partitioned topic's partition count.
2024    ///
2025    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`
2026    /// with a bare JSON integer body. Only forward (grow) is supported by
2027    /// the broker — shrinking returns 409. Java:
2028    /// `PersistentTopics#updatePartitionedTopic`.
2029    pub async fn topic_update_partitions(
2030        &self,
2031        topic: &str,
2032        new_partitions: u32,
2033    ) -> Result<(), AdminError> {
2034        let (tenant, namespace, name) = split_topic(topic)?;
2035        let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
2036        let resp = self
2037            .send(self.http.request(Method::POST, url).json(&new_partitions))
2038            .await?;
2039        empty_ok(resp).await
2040    }
2041
2042    // --- Topic policies — per-topic overrides ---------------------------
2043
2044    /// Get a topic's retention policy.
2045    ///
2046    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/retention`.
2047    /// Returns the per-topic [`RetentionPolicies`] override; the broker
2048    /// emits a `RetentionPolicies` JSON when the policy is set and a bare
2049    /// `null` (decoded as `RetentionPolicies::default()` via `#[serde(default)]`)
2050    /// when no override is in place — callers fall back to the namespace
2051    /// policy in that case. Java: `PersistentTopicsBase#getRetention`.
2052    pub async fn topic_get_retention(&self, topic: &str) -> Result<RetentionPolicies, AdminError> {
2053        let (tenant, namespace, name) = split_topic(topic)?;
2054        let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2055        let resp = self.send(self.http.request(Method::GET, url)).await?;
2056        json_ok_or_default(resp).await
2057    }
2058
2059    /// Set a topic's retention policy (overrides the namespace default).
2060    ///
2061    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/retention` with a
2062    /// JSON `RetentionPolicies` body. `-1` means infinite (size or time).
2063    /// Java: `PersistentTopicsBase#setRetention`.
2064    pub async fn topic_set_retention(
2065        &self,
2066        topic: &str,
2067        policy: RetentionPolicies,
2068    ) -> Result<(), AdminError> {
2069        let (tenant, namespace, name) = split_topic(topic)?;
2070        let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2071        let resp = self
2072            .send(self.http.request(Method::POST, url).json(&policy))
2073            .await?;
2074        empty_ok(resp).await
2075    }
2076
2077    /// Remove a topic's retention policy (fall back to namespace default).
2078    ///
2079    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/retention`.
2080    /// Java: `PersistentTopicsBase#removeRetention`.
2081    pub async fn topic_remove_retention(&self, topic: &str) -> Result<(), AdminError> {
2082        let (tenant, namespace, name) = split_topic(topic)?;
2083        let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2084        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2085        empty_ok(resp).await
2086    }
2087
2088    /// Get all backlog-quota policies on a topic.
2089    ///
2090    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuotaMap`.
2091    /// Returns `Map<BacklogQuotaType, BacklogQuota>` — kept as raw JSON
2092    /// for the same reason as [`Self::namespace_get_backlog_quotas`]:
2093    /// broker minor versions add quota types.
2094    /// Java: `PersistentTopicsBase#getBacklogQuotaMap`.
2095    pub async fn topic_get_backlog_quotas(
2096        &self,
2097        topic: &str,
2098    ) -> Result<serde_json::Value, AdminError> {
2099        let (tenant, namespace, name) = split_topic(topic)?;
2100        let url = self.url(&["persistent", tenant, namespace, name, "backlogQuotaMap"])?;
2101        let resp = self.send(self.http.request(Method::GET, url)).await?;
2102        json_ok(resp).await
2103    }
2104
2105    /// Set a backlog-quota policy on a topic (overrides the namespace
2106    /// default for the matching `backlogQuotaType`).
2107    ///
2108    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota
2109    /// ?backlogQuotaType={type}` with a JSON `BacklogQuota` body.
2110    /// Java: `PersistentTopicsBase#setBacklogQuota`.
2111    pub async fn topic_set_backlog_quota(
2112        &self,
2113        topic: &str,
2114        backlog_quota_type: BacklogQuotaType,
2115        quota: BacklogQuota,
2116    ) -> Result<(), AdminError> {
2117        let (tenant, namespace, name) = split_topic(topic)?;
2118        let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
2119        url.query_pairs_mut()
2120            .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
2121        let resp = self
2122            .send(self.http.request(Method::POST, url).json(&quota))
2123            .await?;
2124        empty_ok(resp).await
2125    }
2126
2127    /// Remove a backlog-quota policy from a topic.
2128    ///
2129    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota
2130    /// ?backlogQuotaType={type}`.
2131    /// Java: `PersistentTopicsBase#removeBacklogQuota`.
2132    pub async fn topic_remove_backlog_quota(
2133        &self,
2134        topic: &str,
2135        backlog_quota_type: BacklogQuotaType,
2136    ) -> Result<(), AdminError> {
2137        let (tenant, namespace, name) = split_topic(topic)?;
2138        let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
2139        url.query_pairs_mut()
2140            .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
2141        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2142        empty_ok(resp).await
2143    }
2144
2145    /// Get a topic's message-TTL (seconds, or `null` if unset).
2146    ///
2147    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL`. Returns
2148    /// a bare integer when the override is set, `null` (decoded as
2149    /// `Option::None`) when no topic-level override is in place.
2150    /// Java: `PersistentTopicsBase#getMessageTTL`.
2151    pub async fn topic_get_message_ttl(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2152        let (tenant, namespace, name) = split_topic(topic)?;
2153        let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2154        let resp = self.send(self.http.request(Method::GET, url)).await?;
2155        json_ok_optional(resp).await
2156    }
2157
2158    /// Set a topic's message-TTL (seconds).
2159    ///
2160    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL?messageTTL={n}`.
2161    /// `0` disables (broker treats as no TTL).
2162    ///
2163    /// Note: unlike the **namespace**-level `setNamespaceMessageTTL` (which
2164    /// takes the TTL as a JSON int body), the topic-level setter binds
2165    /// `@QueryParam("messageTTL") Integer messageTTL` on both Pulsar 4.0 and
2166    /// 4.2 (`pulsar-broker/.../v2/PersistentTopics.java#setMessageTTL`).
2167    /// Sending the value as a JSON body returns 204 No Content but the
2168    /// broker reads the query param as `null` and silently treats the call
2169    /// as "no override" — the topic policy never persists, and a subsequent
2170    /// `topic_get_message_ttl` surfaces `Ok(None)`. Older Pulsar releases
2171    /// tolerated both encodings; 4.2 enforces the query-param shape.
2172    /// Java: `PersistentTopicsBase#setMessageTTL`.
2173    pub async fn topic_set_message_ttl(
2174        &self,
2175        topic: &str,
2176        ttl_seconds: i32,
2177    ) -> Result<(), AdminError> {
2178        let (tenant, namespace, name) = split_topic(topic)?;
2179        let mut url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2180        url.query_pairs_mut()
2181            .append_pair("messageTTL", &ttl_seconds.to_string());
2182        let resp = self.send(self.http.request(Method::POST, url)).await?;
2183        empty_ok(resp).await
2184    }
2185
2186    /// Remove a topic's message-TTL (fall back to namespace default).
2187    ///
2188    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL`.
2189    /// Java: `PersistentTopicsBase#removeMessageTTL`.
2190    pub async fn topic_remove_message_ttl(&self, topic: &str) -> Result<(), AdminError> {
2191        let (tenant, namespace, name) = split_topic(topic)?;
2192        let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2193        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2194        empty_ok(resp).await
2195    }
2196
2197    /// Get a topic's persistence policy.
2198    ///
2199    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence`. The
2200    /// broker emits a `PersistencePolicies` JSON when the topic override
2201    /// is set and `null` (decoded as `Option::None`) when no override is
2202    /// in place — callers fall back to the namespace policy in that case.
2203    /// Java: `PersistentTopicsBase#getPersistence`.
2204    pub async fn topic_get_persistence(
2205        &self,
2206        topic: &str,
2207    ) -> Result<Option<PersistencePolicies>, AdminError> {
2208        let (tenant, namespace, name) = split_topic(topic)?;
2209        let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2210        let resp = self.send(self.http.request(Method::GET, url)).await?;
2211        json_ok_optional(resp).await
2212    }
2213
2214    /// Set a topic's persistence policy (overrides the namespace default).
2215    ///
2216    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence` with a
2217    /// JSON `PersistencePolicies` body.
2218    /// Java: `PersistentTopicsBase#setPersistence`.
2219    pub async fn topic_set_persistence(
2220        &self,
2221        topic: &str,
2222        policy: PersistencePolicies,
2223    ) -> Result<(), AdminError> {
2224        let (tenant, namespace, name) = split_topic(topic)?;
2225        let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2226        let resp = self
2227            .send(self.http.request(Method::POST, url).json(&policy))
2228            .await?;
2229        empty_ok(resp).await
2230    }
2231
2232    /// Remove a topic's persistence policy (fall back to namespace default).
2233    ///
2234    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence`.
2235    /// Java: `PersistentTopicsBase#removePersistence`.
2236    pub async fn topic_remove_persistence(&self, topic: &str) -> Result<(), AdminError> {
2237        let (tenant, namespace, name) = split_topic(topic)?;
2238        let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2239        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2240        empty_ok(resp).await
2241    }
2242
2243    /// Get a topic's consumer dispatch-rate policy (or `null` if no override).
2244    ///
2245    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate`. The
2246    /// broker emits the per-topic [`DispatchRate`] override or `null` when
2247    /// no override is set; callers fall back to the namespace policy in the
2248    /// `None` case. Java: `PersistentTopicsBase#getDispatchRate`.
2249    pub async fn topic_get_dispatch_rate(
2250        &self,
2251        topic: &str,
2252    ) -> Result<Option<DispatchRate>, AdminError> {
2253        let (tenant, namespace, name) = split_topic(topic)?;
2254        let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2255        let resp = self.send(self.http.request(Method::GET, url)).await?;
2256        json_ok_optional(resp).await
2257    }
2258
2259    /// Set a topic's consumer dispatch-rate policy (overrides namespace default).
2260    ///
2261    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate` with a
2262    /// JSON `DispatchRate` body. Java: `PersistentTopicsBase#setDispatchRate`.
2263    pub async fn topic_set_dispatch_rate(
2264        &self,
2265        topic: &str,
2266        rate: DispatchRate,
2267    ) -> Result<(), AdminError> {
2268        let (tenant, namespace, name) = split_topic(topic)?;
2269        let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2270        let resp = self
2271            .send(self.http.request(Method::POST, url).json(&rate))
2272            .await?;
2273        empty_ok(resp).await
2274    }
2275
2276    /// Remove a topic's consumer dispatch-rate policy.
2277    ///
2278    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate`.
2279    /// Java: `PersistentTopicsBase#removeDispatchRate`.
2280    pub async fn topic_remove_dispatch_rate(&self, topic: &str) -> Result<(), AdminError> {
2281        let (tenant, namespace, name) = split_topic(topic)?;
2282        let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2283        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2284        empty_ok(resp).await
2285    }
2286
2287    /// Get a topic's per-subscription dispatch-rate policy (or `null`).
2288    ///
2289    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`.
2290    /// Reuses the [`DispatchRate`] body shape — the policy applies per
2291    /// subscription rather than aggregated across all consumers.
2292    /// Java: `PersistentTopicsBase#getSubscriptionDispatchRate`.
2293    pub async fn topic_get_subscription_dispatch_rate(
2294        &self,
2295        topic: &str,
2296    ) -> Result<Option<DispatchRate>, AdminError> {
2297        let (tenant, namespace, name) = split_topic(topic)?;
2298        let url = self.url(&[
2299            "persistent",
2300            tenant,
2301            namespace,
2302            name,
2303            "subscriptionDispatchRate",
2304        ])?;
2305        let resp = self.send(self.http.request(Method::GET, url)).await?;
2306        json_ok_optional(resp).await
2307    }
2308
2309    /// Set a topic's per-subscription dispatch-rate policy (overrides namespace default).
2310    ///
2311    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`
2312    /// with a JSON `DispatchRate` body.
2313    /// Java: `PersistentTopicsBase#setSubscriptionDispatchRate`.
2314    pub async fn topic_set_subscription_dispatch_rate(
2315        &self,
2316        topic: &str,
2317        rate: DispatchRate,
2318    ) -> Result<(), AdminError> {
2319        let (tenant, namespace, name) = split_topic(topic)?;
2320        let url = self.url(&[
2321            "persistent",
2322            tenant,
2323            namespace,
2324            name,
2325            "subscriptionDispatchRate",
2326        ])?;
2327        let resp = self
2328            .send(self.http.request(Method::POST, url).json(&rate))
2329            .await?;
2330        empty_ok(resp).await
2331    }
2332
2333    /// Remove a topic's per-subscription dispatch-rate policy.
2334    ///
2335    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`.
2336    /// Java: `PersistentTopicsBase#removeSubscriptionDispatchRate`.
2337    pub async fn topic_remove_subscription_dispatch_rate(
2338        &self,
2339        topic: &str,
2340    ) -> Result<(), AdminError> {
2341        let (tenant, namespace, name) = split_topic(topic)?;
2342        let url = self.url(&[
2343            "persistent",
2344            tenant,
2345            namespace,
2346            name,
2347            "subscriptionDispatchRate",
2348        ])?;
2349        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2350        empty_ok(resp).await
2351    }
2352
2353    /// Get a topic's cross-cluster replicator dispatch-rate policy (or `null`).
2354    ///
2355    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`.
2356    /// Reuses the [`DispatchRate`] body shape — the policy throttles
2357    /// outbound geo-replication traffic from this cluster.
2358    /// Java: `PersistentTopicsBase#getReplicatorDispatchRate`.
2359    pub async fn topic_get_replicator_dispatch_rate(
2360        &self,
2361        topic: &str,
2362    ) -> Result<Option<DispatchRate>, AdminError> {
2363        let (tenant, namespace, name) = split_topic(topic)?;
2364        let url = self.url(&[
2365            "persistent",
2366            tenant,
2367            namespace,
2368            name,
2369            "replicatorDispatchRate",
2370        ])?;
2371        let resp = self.send(self.http.request(Method::GET, url)).await?;
2372        json_ok_optional(resp).await
2373    }
2374
2375    /// Set a topic's cross-cluster replicator dispatch-rate policy.
2376    ///
2377    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`
2378    /// with a JSON `DispatchRate` body.
2379    /// Java: `PersistentTopicsBase#setReplicatorDispatchRate`.
2380    pub async fn topic_set_replicator_dispatch_rate(
2381        &self,
2382        topic: &str,
2383        rate: DispatchRate,
2384    ) -> Result<(), AdminError> {
2385        let (tenant, namespace, name) = split_topic(topic)?;
2386        let url = self.url(&[
2387            "persistent",
2388            tenant,
2389            namespace,
2390            name,
2391            "replicatorDispatchRate",
2392        ])?;
2393        let resp = self
2394            .send(self.http.request(Method::POST, url).json(&rate))
2395            .await?;
2396        empty_ok(resp).await
2397    }
2398
2399    /// Remove a topic's cross-cluster replicator dispatch-rate policy.
2400    ///
2401    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`.
2402    /// Java: `PersistentTopicsBase#removeReplicatorDispatchRate`.
2403    pub async fn topic_remove_replicator_dispatch_rate(
2404        &self,
2405        topic: &str,
2406    ) -> Result<(), AdminError> {
2407        let (tenant, namespace, name) = split_topic(topic)?;
2408        let url = self.url(&[
2409            "persistent",
2410            tenant,
2411            namespace,
2412            name,
2413            "replicatorDispatchRate",
2414        ])?;
2415        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2416        empty_ok(resp).await
2417    }
2418
2419    /// Get a topic's publish-rate policy (or `null` if no override).
2420    ///
2421    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate`. Returns
2422    /// the per-topic [`PublishRate`] producer-side throttle (msg/sec +
2423    /// byte/sec). `-1` on either dimension means unlimited.
2424    /// Java: `PersistentTopicsBase#getPublishRate`.
2425    pub async fn topic_get_publish_rate(
2426        &self,
2427        topic: &str,
2428    ) -> Result<Option<PublishRate>, AdminError> {
2429        let (tenant, namespace, name) = split_topic(topic)?;
2430        let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2431        let resp = self.send(self.http.request(Method::GET, url)).await?;
2432        json_ok_optional(resp).await
2433    }
2434
2435    /// Set a topic's publish-rate policy (overrides namespace default).
2436    ///
2437    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate` with a
2438    /// JSON `PublishRate` body. Java: `PersistentTopicsBase#setPublishRate`.
2439    pub async fn topic_set_publish_rate(
2440        &self,
2441        topic: &str,
2442        rate: PublishRate,
2443    ) -> Result<(), AdminError> {
2444        let (tenant, namespace, name) = split_topic(topic)?;
2445        let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2446        let resp = self
2447            .send(self.http.request(Method::POST, url).json(&rate))
2448            .await?;
2449        empty_ok(resp).await
2450    }
2451
2452    /// Remove a topic's publish-rate policy.
2453    ///
2454    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate`.
2455    /// Java: `PersistentTopicsBase#removePublishRate`.
2456    pub async fn topic_remove_publish_rate(&self, topic: &str) -> Result<(), AdminError> {
2457        let (tenant, namespace, name) = split_topic(topic)?;
2458        let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2459        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2460        empty_ok(resp).await
2461    }
2462
2463    /// Get a topic's max-producers cap (or `null` if no override).
2464    ///
2465    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers`. Returns
2466    /// a bare integer when the override is set, `null` (decoded as
2467    /// `Option::None`) when no topic-level cap is in place.
2468    /// Java: `PersistentTopicsBase#getMaxProducers`.
2469    pub async fn topic_get_max_producers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2470        let (tenant, namespace, name) = split_topic(topic)?;
2471        let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2472        let resp = self.send(self.http.request(Method::GET, url)).await?;
2473        json_ok_optional(resp).await
2474    }
2475
2476    /// Set a topic's max-producers cap.
2477    ///
2478    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers` with
2479    /// a bare integer body. `0` disables (broker treats as unlimited).
2480    /// Java: `PersistentTopicsBase#setMaxProducers`.
2481    pub async fn topic_set_max_producers(
2482        &self,
2483        topic: &str,
2484        max_producers: i32,
2485    ) -> Result<(), AdminError> {
2486        let (tenant, namespace, name) = split_topic(topic)?;
2487        let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2488        let resp = self
2489            .send(self.http.request(Method::POST, url).json(&max_producers))
2490            .await?;
2491        empty_ok(resp).await
2492    }
2493
2494    /// Remove a topic's max-producers cap (fall back to namespace / broker default).
2495    ///
2496    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers`.
2497    /// Java: `PersistentTopicsBase#removeMaxProducers`.
2498    pub async fn topic_remove_max_producers(&self, topic: &str) -> Result<(), AdminError> {
2499        let (tenant, namespace, name) = split_topic(topic)?;
2500        let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2501        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2502        empty_ok(resp).await
2503    }
2504
2505    /// Get a topic's max-consumers cap (or `null` if no override).
2506    ///
2507    /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers`. Returns
2508    /// a bare integer when the override is set, `null` (decoded as
2509    /// `Option::None`) when no topic-level cap is in place.
2510    /// Java: `PersistentTopicsBase#getMaxConsumers`.
2511    pub async fn topic_get_max_consumers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2512        let (tenant, namespace, name) = split_topic(topic)?;
2513        let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2514        let resp = self.send(self.http.request(Method::GET, url)).await?;
2515        json_ok_optional(resp).await
2516    }
2517
2518    /// Set a topic's max-consumers cap.
2519    ///
2520    /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers` with
2521    /// a bare integer body. `0` disables (broker treats as unlimited).
2522    /// Java: `PersistentTopicsBase#setMaxConsumers`.
2523    pub async fn topic_set_max_consumers(
2524        &self,
2525        topic: &str,
2526        max_consumers: i32,
2527    ) -> Result<(), AdminError> {
2528        let (tenant, namespace, name) = split_topic(topic)?;
2529        let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2530        let resp = self
2531            .send(self.http.request(Method::POST, url).json(&max_consumers))
2532            .await?;
2533        empty_ok(resp).await
2534    }
2535
2536    /// Remove a topic's max-consumers cap (fall back to namespace / broker default).
2537    ///
2538    /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers`.
2539    /// Java: `PersistentTopicsBase#removeMaxConsumers`.
2540    pub async fn topic_remove_max_consumers(&self, topic: &str) -> Result<(), AdminError> {
2541        let (tenant, namespace, name) = split_topic(topic)?;
2542        let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2543        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2544        empty_ok(resp).await
2545    }
2546
2547    // --- Subscriptions ---------------------------------------------------
2548
2549    /// List subscription names on a topic.
2550    ///
2551    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscriptions`.
2552    /// Java: `PersistentTopics#getSubscriptions`.
2553    pub async fn subscriptions_list(&self, topic: &str) -> Result<Vec<String>, AdminError> {
2554        let (tenant, namespace, name) = split_topic(topic)?;
2555        let url = self.url(&["persistent", tenant, namespace, name, "subscriptions"])?;
2556        let resp = self.send(self.http.request(Method::GET, url)).await?;
2557        json_ok(resp).await
2558    }
2559
2560    /// Reset a subscription's cursor to a specific message-id position.
2561    ///
2562    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor`
2563    /// with body `{ledgerId, entryId, partitionIndex, batchIndex, isExcluded}`.
2564    /// `is_excluded = true` skips the message at `message_id` itself; `false`
2565    /// leaves it eligible for redelivery. Java: `PersistentTopics#resetCursorOnPosition`.
2566    pub async fn subscription_reset_cursor_to_position(
2567        &self,
2568        topic: &str,
2569        subscription: &str,
2570        message_id: MessageId,
2571        is_excluded: bool,
2572    ) -> Result<(), AdminError> {
2573        let (tenant, namespace, name) = split_topic(topic)?;
2574        validate_segment(subscription)?;
2575        let url = self.url(&[
2576            "persistent",
2577            tenant,
2578            namespace,
2579            name,
2580            "subscription",
2581            subscription,
2582            "resetcursor",
2583        ])?;
2584        let body = ResetCursorData {
2585            ledger_id: message_id_field_for_wire(message_id.ledger_id),
2586            entry_id: message_id_field_for_wire(message_id.entry_id),
2587            partition_index: message_id.partition,
2588            batch_index: message_id.batch_index,
2589            is_excluded,
2590        };
2591        let resp = self
2592            .send(self.http.request(Method::POST, url).json(&body))
2593            .await?;
2594        empty_ok(resp).await
2595    }
2596
2597    /// Reset a subscription's cursor to a wall-clock timestamp (millis since epoch).
2598    ///
2599    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor/
2600    /// {timestamp}`. Java: `PersistentTopics#resetCursor(topic, sub, timestamp)`.
2601    pub async fn subscription_reset_cursor_to_timestamp(
2602        &self,
2603        topic: &str,
2604        subscription: &str,
2605        timestamp_millis: u64,
2606    ) -> Result<(), AdminError> {
2607        let (tenant, namespace, name) = split_topic(topic)?;
2608        validate_segment(subscription)?;
2609        let timestamp = timestamp_millis.to_string();
2610        let url = self.url(&[
2611            "persistent",
2612            tenant,
2613            namespace,
2614            name,
2615            "subscription",
2616            subscription,
2617            "resetcursor",
2618            &timestamp,
2619        ])?;
2620        let resp = self.send(self.http.request(Method::POST, url)).await?;
2621        empty_ok(resp).await
2622    }
2623
2624    /// Advance a subscription's cursor past N undelivered messages.
2625    ///
2626    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip/
2627    /// {numMessages}`. Java: `PersistentTopics#skipMessages`.
2628    pub async fn subscription_skip_messages(
2629        &self,
2630        topic: &str,
2631        subscription: &str,
2632        num_messages: u64,
2633    ) -> Result<(), AdminError> {
2634        let (tenant, namespace, name) = split_topic(topic)?;
2635        validate_segment(subscription)?;
2636        let n = num_messages.to_string();
2637        let url = self.url(&[
2638            "persistent",
2639            tenant,
2640            namespace,
2641            name,
2642            "subscription",
2643            subscription,
2644            "skip",
2645            &n,
2646        ])?;
2647        let resp = self.send(self.http.request(Method::POST, url)).await?;
2648        empty_ok(resp).await
2649    }
2650
2651    /// Drain the entire backlog of a subscription (clear-backlog).
2652    ///
2653    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip_all`.
2654    /// Java: `PersistentTopics#skipAllMessages`.
2655    pub async fn subscription_skip_all_messages(
2656        &self,
2657        topic: &str,
2658        subscription: &str,
2659    ) -> Result<(), AdminError> {
2660        let (tenant, namespace, name) = split_topic(topic)?;
2661        validate_segment(subscription)?;
2662        let url = self.url(&[
2663            "persistent",
2664            tenant,
2665            namespace,
2666            name,
2667            "subscription",
2668            subscription,
2669            "skip_all",
2670        ])?;
2671        let resp = self.send(self.http.request(Method::POST, url)).await?;
2672        empty_ok(resp).await
2673    }
2674
2675    /// Expire all messages older than `expire_time_seconds` for a subscription.
2676    ///
2677    /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/expireMessages/
2678    /// {seconds}`. Java: `PersistentTopics#expireMessagesForSubscription`.
2679    pub async fn subscription_expire_messages(
2680        &self,
2681        topic: &str,
2682        subscription: &str,
2683        expire_time_seconds: u64,
2684    ) -> Result<(), AdminError> {
2685        let (tenant, namespace, name) = split_topic(topic)?;
2686        validate_segment(subscription)?;
2687        let s = expire_time_seconds.to_string();
2688        let url = self.url(&[
2689            "persistent",
2690            tenant,
2691            namespace,
2692            name,
2693            "subscription",
2694            subscription,
2695            "expireMessages",
2696            &s,
2697        ])?;
2698        let resp = self.send(self.http.request(Method::POST, url)).await?;
2699        empty_ok(resp).await
2700    }
2701
2702    /// Delete (unsubscribe) a subscription.
2703    ///
2704    /// `DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}?force={force}`.
2705    /// `force = true` disconnects active consumers before deletion. Java:
2706    /// `PersistentTopics#deleteSubscription`.
2707    pub async fn subscription_delete(
2708        &self,
2709        topic: &str,
2710        subscription: &str,
2711        force: bool,
2712    ) -> Result<(), AdminError> {
2713        let (tenant, namespace, name) = split_topic(topic)?;
2714        validate_segment(subscription)?;
2715        let mut url = self.url(&[
2716            "persistent",
2717            tenant,
2718            namespace,
2719            name,
2720            "subscription",
2721            subscription,
2722        ])?;
2723        if force {
2724            url.query_pairs_mut().append_pair("force", "true");
2725        }
2726        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2727        empty_ok(resp).await
2728    }
2729
2730    // --- Shadow topics (PIP-180 / ADR-0033) ------------------------------
2731
2732    /// Create a shadow topic ([PIP-180](https://github.com/apache/pulsar/blob/master/pip/pip-180.md)).
2733    ///
2734    /// Creates the shadow as a regular non-partitioned topic with the broker-reserved
2735    /// `PULSAR.SHADOW_SOURCE` topic property pointing at the source topic:
2736    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{shadow}` with body
2737    /// `{ "PULSAR.SHADOW_SOURCE": "persistent://tenant/ns/source" }`.
2738    /// Then links the created shadow in the source topic policy list via
2739    /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{source}/shadowTopics`.
2740    ///
2741    /// Java:
2742    /// `pulsar-client-admin/.../TopicsImpl.java#createShadowTopicAsync` builds the same property
2743    /// map before calling `createNonPartitionedTopicAsync` for non-partitioned sources.
2744    ///
2745    /// Errors mirror the existing `AdminError` taxonomy: 404 → `Status { code:
2746    /// 404, .. }` (the source topic does not exist), 409 → `Status { code:
2747    /// 409, .. }` (the shadow topic already exists on this source),
2748    /// 401/403 → `Status { code: 401|403, .. }` (auth).
2749    pub async fn create_shadow_topic(&self, source: &str, shadow: &str) -> Result<(), AdminError> {
2750        let (source_tenant, source_namespace, source_name) = split_topic(source)?;
2751        // Validate the shadow name eagerly so a misformatted argument errors
2752        // out with `InvalidName` rather than as a broker 4xx after we've
2753        // already crossed the wire.
2754        let (shadow_tenant, shadow_namespace, shadow_name) = split_topic(shadow)?;
2755        let source = format!("persistent://{source_tenant}/{source_namespace}/{source_name}");
2756        let shadow = format!("persistent://{shadow_tenant}/{shadow_namespace}/{shadow_name}");
2757        let mut properties = HashMap::new();
2758        properties.insert("PULSAR.SHADOW_SOURCE".to_owned(), source.clone());
2759        self.topic_create_non_partitioned_with_properties(&shadow, &properties)
2760            .await?;
2761        self.set_shadow_topics(&source, &[shadow]).await
2762    }
2763
2764    async fn set_shadow_topics(&self, source: &str, shadows: &[String]) -> Result<(), AdminError> {
2765        let (tenant, namespace, name) = split_topic(source)?;
2766        let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
2767        let resp = self
2768            .send(self.http.request(Method::PUT, url).json(shadows))
2769            .await?;
2770        empty_ok(resp).await
2771    }
2772
2773    /// Delete a shadow topic (PIP-180).
2774    ///
2775    /// `DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}` where
2776    /// `{topic}` is the **shadow** topic name. PIP-180's deletion contract
2777    /// goes through the regular topic-delete path on the shadow itself —
2778    /// the broker recognises the topic as a shadow and detaches it from
2779    /// the source ledger atomically with the metadata delete.
2780    ///
2781    /// `force` controls whether active subscribers are kicked off before
2782    /// the delete (`?force=true`) or whether the broker rejects the
2783    /// request when subscribers exist (`?force=false`, the default).
2784    ///
2785    /// Java: `org.apache.pulsar.client.admin.Topics#deleteShadowTopic` calls
2786    /// the same `@DELETE @Path("/{tenant}/{namespace}/{topic}")` endpoint.
2787    pub async fn delete_shadow_topic(&self, shadow: &str, force: bool) -> Result<(), AdminError> {
2788        let (tenant, namespace, name) = split_topic(shadow)?;
2789        let mut url = self.url(&["persistent", tenant, namespace, name])?;
2790        url.query_pairs_mut()
2791            .append_pair("force", if force { "true" } else { "false" });
2792        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2793        empty_ok(resp).await
2794    }
2795
2796    /// List the shadow topics created on a source topic (PIP-180).
2797    ///
2798    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/shadowTopics`
2799    /// where `{topic}` is the **source** topic name. The broker returns a
2800    /// JSON array of fully-qualified shadow topic names.
2801    ///
2802    /// Java:
2803    /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
2804    /// (`@GET @Path("/{tenant}/{namespace}/{topic}/shadowTopics")`).
2805    ///
2806    /// Used by the runtime engine at consumer subscribe time: when the user
2807    /// subscribes to a topic the runtime cannot yet classify, a single
2808    /// `get_shadow_topics` lookup on every other topic in the namespace is
2809    /// expensive; instead the runtime calls `get_shadow_topics(subscribed)`
2810    /// on the topic itself — a non-shadow topic returns an empty array, a
2811    /// shadow topic surfaces nothing but the broker has already populated
2812    /// the consumer's `shadow_metadata` via the topic's policy.
2813    /// (See `crates/magnetar-runtime-tokio/src/client.rs::subscribe`.)
2814    pub async fn get_shadow_topics(&self, source: &str) -> Result<Vec<String>, AdminError> {
2815        let (tenant, namespace, name) = split_topic(source)?;
2816        let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
2817        let resp = self.send(self.http.request(Method::GET, url)).await?;
2818        json_ok_or_default(resp).await
2819    }
2820
2821    /// Resolve the **source** topic of a shadow topic (PIP-180).
2822    ///
2823    /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/properties`.
2824    /// Returns the `PULSAR.SHADOW_SOURCE` property when the queried topic is a
2825    /// shadow; returns `None` when it is a regular topic. Used by the runtime
2826    /// at subscribe time to populate
2827    /// [`magnetar_proto::ShadowTopicMetadata::source_topic`] on the new
2828    /// consumer (so the receive path can emit
2829    /// [`magnetar_proto::ConnectionEvent::MessageReceivedFromShadow`]
2830    /// without an out-of-band lookup per message).
2831    ///
2832    /// Java: `org.apache.pulsar.client.admin.Topics#getShadowSource` —
2833    /// `TopicsImpl#getShadowSourceAsync` delegates to `getPropertiesAsync`.
2834    pub async fn get_shadow_source(&self, shadow: &str) -> Result<Option<String>, AdminError> {
2835        let (tenant, namespace, name) = split_topic(shadow)?;
2836        let url = self.url(&["persistent", tenant, namespace, name, "properties"])?;
2837        let resp = self.send(self.http.request(Method::GET, url)).await?;
2838        let mut properties: HashMap<String, String> = json_ok_or_default(resp).await?;
2839        Ok(properties
2840            .remove("PULSAR.SHADOW_SOURCE")
2841            .filter(|source| !source.is_empty()))
2842    }
2843
2844    // --- Pulsar IO Sources (/admin/v3/sources/...) ----------------------
2845
2846    /// List sources configured under a namespace.
2847    ///
2848    /// `GET /admin/v3/sources/{tenant}/{namespace}`. Returns the list of
2849    /// source names (the broker emits a JSON array of strings — one
2850    /// entry per declared source, regardless of running state).
2851    /// Java:
2852    /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java`
2853    /// (`@Path("/sources")`) +
2854    /// `pulsar-broker/.../admin/impl/SourcesBase.java#listSources`.
2855    pub async fn sources_list_by_namespace(
2856        &self,
2857        tenant: &str,
2858        namespace: &str,
2859    ) -> Result<Vec<String>, AdminError> {
2860        validate_segment(tenant)?;
2861        validate_segment(namespace)?;
2862        let url = self.url_v3(&["sources", tenant, namespace])?;
2863        let resp = self.send(self.http.request(Method::GET, url)).await?;
2864        json_ok(resp).await
2865    }
2866
2867    /// Get one source's configuration.
2868    ///
2869    /// `GET /admin/v3/sources/{tenant}/{namespace}/{name}`. Returns the
2870    /// stored `SourceConfig` envelope as raw JSON — minor broker
2871    /// versions extend the shape (new connector knobs, secret refs)
2872    /// faster than a typed Rust DTO can keep up.
2873    /// Java: `SourcesBase#getSourceInfo`.
2874    pub async fn source_get(
2875        &self,
2876        tenant: &str,
2877        namespace: &str,
2878        name: &str,
2879    ) -> Result<serde_json::Value, AdminError> {
2880        validate_segment(tenant)?;
2881        validate_segment(namespace)?;
2882        validate_segment(name)?;
2883        let url = self.url_v3(&["sources", tenant, namespace, name])?;
2884        let resp = self.send(self.http.request(Method::GET, url)).await?;
2885        json_ok(resp).await
2886    }
2887
2888    /// Get a source's running status (per-instance worker telemetry).
2889    ///
2890    /// `GET /admin/v3/sources/{tenant}/{namespace}/{name}/status`.
2891    /// Returns the broker's `SourceStatus` envelope (`numInstances`,
2892    /// `numRunning`, per-instance `workerId` + `running` + last
2893    /// received timestamp). Exposed as raw JSON for forward-compat.
2894    /// Java: `SourcesBase#getSourceStatus`.
2895    pub async fn source_status(
2896        &self,
2897        tenant: &str,
2898        namespace: &str,
2899        name: &str,
2900    ) -> Result<serde_json::Value, AdminError> {
2901        validate_segment(tenant)?;
2902        validate_segment(namespace)?;
2903        validate_segment(name)?;
2904        let url = self.url_v3(&["sources", tenant, namespace, name, "status"])?;
2905        let resp = self.send(self.http.request(Method::GET, url)).await?;
2906        json_ok(resp).await
2907    }
2908
2909    /// Register a source from a remote package URL.
2910    ///
2911    /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}` with
2912    /// `multipart/form-data` carrying two parts: a `url` text part (the
2913    /// package URL — `http(s)://`, `file://`, or `function://` per the
2914    /// broker's `WorkerUtils#downloadFileFromPackageUrl`) and a
2915    /// `sourceConfig` JSON part with the [`SourceConfig`] body.
2916    /// A sibling `source_create` (binary upload) is intentionally not
2917    /// yet exposed — this URL-based variant covers every
2918    /// CI/operator scenario that does not need to ship a JAR through
2919    /// the admin client itself.
2920    /// Java: `SourcesBase#registerSource`.
2921    pub async fn source_create_with_url(
2922        &self,
2923        tenant: &str,
2924        namespace: &str,
2925        name: &str,
2926        url: &str,
2927        config: SourceConfig,
2928    ) -> Result<(), AdminError> {
2929        validate_segment(tenant)?;
2930        validate_segment(namespace)?;
2931        validate_segment(name)?;
2932        let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
2933        let form = build_url_config_multipart(url, "sourceConfig", &config)?;
2934        let resp = self
2935            .send(
2936                self.http
2937                    .request(Method::POST, endpoint)
2938                    .multipart(form)
2939                    .timeout(PACKAGE_REGISTER_TIMEOUT),
2940            )
2941            .await?;
2942        empty_ok(resp).await
2943    }
2944
2945    /// Update a source from a remote package URL.
2946    ///
2947    /// `PUT /admin/v3/sources/{tenant}/{namespace}/{name}` with the
2948    /// same multipart shape as [`Self::source_create_with_url`].
2949    /// Java: `SourcesBase#updateSource`.
2950    pub async fn source_update_with_url(
2951        &self,
2952        tenant: &str,
2953        namespace: &str,
2954        name: &str,
2955        url: &str,
2956        config: SourceConfig,
2957    ) -> Result<(), AdminError> {
2958        validate_segment(tenant)?;
2959        validate_segment(namespace)?;
2960        validate_segment(name)?;
2961        let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
2962        let form = build_url_config_multipart(url, "sourceConfig", &config)?;
2963        let resp = self
2964            .send(
2965                self.http
2966                    .request(Method::PUT, endpoint)
2967                    .multipart(form)
2968                    .timeout(PACKAGE_REGISTER_TIMEOUT),
2969            )
2970            .await?;
2971        empty_ok(resp).await
2972    }
2973
2974    /// Delete a source.
2975    ///
2976    /// `DELETE /admin/v3/sources/{tenant}/{namespace}/{name}`. Removes
2977    /// the source declaration and tears the running instances down.
2978    /// Java: `SourcesBase#deregisterSource`.
2979    pub async fn source_delete(
2980        &self,
2981        tenant: &str,
2982        namespace: &str,
2983        name: &str,
2984    ) -> Result<(), AdminError> {
2985        validate_segment(tenant)?;
2986        validate_segment(namespace)?;
2987        validate_segment(name)?;
2988        let url = self.url_v3(&["sources", tenant, namespace, name])?;
2989        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2990        empty_ok(resp).await
2991    }
2992
2993    /// Start every instance of a source.
2994    ///
2995    /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/start`.
2996    /// Java: `SourcesBase#startSource`.
2997    pub async fn source_start(
2998        &self,
2999        tenant: &str,
3000        namespace: &str,
3001        name: &str,
3002    ) -> Result<(), AdminError> {
3003        validate_segment(tenant)?;
3004        validate_segment(namespace)?;
3005        validate_segment(name)?;
3006        let url = self.url_v3(&["sources", tenant, namespace, name, "start"])?;
3007        let resp = self.send(self.http.request(Method::POST, url)).await?;
3008        empty_ok(resp).await
3009    }
3010
3011    /// Stop every instance of a source.
3012    ///
3013    /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/stop`.
3014    /// Java: `SourcesBase#stopSource`.
3015    pub async fn source_stop(
3016        &self,
3017        tenant: &str,
3018        namespace: &str,
3019        name: &str,
3020    ) -> Result<(), AdminError> {
3021        validate_segment(tenant)?;
3022        validate_segment(namespace)?;
3023        validate_segment(name)?;
3024        let url = self.url_v3(&["sources", tenant, namespace, name, "stop"])?;
3025        let resp = self.send(self.http.request(Method::POST, url)).await?;
3026        empty_ok(resp).await
3027    }
3028
3029    /// Restart every instance of a source.
3030    ///
3031    /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/restart`.
3032    /// Java: `SourcesBase#restartSource`.
3033    pub async fn source_restart(
3034        &self,
3035        tenant: &str,
3036        namespace: &str,
3037        name: &str,
3038    ) -> Result<(), AdminError> {
3039        validate_segment(tenant)?;
3040        validate_segment(namespace)?;
3041        validate_segment(name)?;
3042        let url = self.url_v3(&["sources", tenant, namespace, name, "restart"])?;
3043        let resp = self.send(self.http.request(Method::POST, url)).await?;
3044        empty_ok(resp).await
3045    }
3046
3047    // --- Pulsar IO Sinks (/admin/v3/sinks/...) --------------------------
3048
3049    /// List sinks configured under a namespace.
3050    ///
3051    /// `GET /admin/v3/sinks/{tenant}/{namespace}`. Returns the list of
3052    /// sink names. Mirrors [`Self::sources_list_by_namespace`] —
3053    /// Pulsar's Sources / Sinks REST surfaces are intentionally
3054    /// symmetric.
3055    /// Java:
3056    /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java`
3057    /// (`@Path("/sinks")`) +
3058    /// `pulsar-broker/.../admin/impl/SinksBase.java#listSinks`.
3059    pub async fn sinks_list_by_namespace(
3060        &self,
3061        tenant: &str,
3062        namespace: &str,
3063    ) -> Result<Vec<String>, AdminError> {
3064        validate_segment(tenant)?;
3065        validate_segment(namespace)?;
3066        let url = self.url_v3(&["sinks", tenant, namespace])?;
3067        let resp = self.send(self.http.request(Method::GET, url)).await?;
3068        json_ok(resp).await
3069    }
3070
3071    /// Get one sink's configuration.
3072    ///
3073    /// `GET /admin/v3/sinks/{tenant}/{namespace}/{name}`. Returns the
3074    /// stored `SinkConfig` as raw JSON for the same forward-compat
3075    /// reason as [`Self::source_get`]. Java: `SinksBase#getSinkInfo`.
3076    pub async fn sink_get(
3077        &self,
3078        tenant: &str,
3079        namespace: &str,
3080        name: &str,
3081    ) -> Result<serde_json::Value, AdminError> {
3082        validate_segment(tenant)?;
3083        validate_segment(namespace)?;
3084        validate_segment(name)?;
3085        let url = self.url_v3(&["sinks", tenant, namespace, name])?;
3086        let resp = self.send(self.http.request(Method::GET, url)).await?;
3087        json_ok(resp).await
3088    }
3089
3090    /// Get a sink's running status (per-instance worker telemetry).
3091    ///
3092    /// `GET /admin/v3/sinks/{tenant}/{namespace}/{name}/status`.
3093    /// Same envelope shape as the Sources status. Raw JSON.
3094    /// Java: `SinksBase#getSinkStatus`.
3095    pub async fn sink_status(
3096        &self,
3097        tenant: &str,
3098        namespace: &str,
3099        name: &str,
3100    ) -> Result<serde_json::Value, AdminError> {
3101        validate_segment(tenant)?;
3102        validate_segment(namespace)?;
3103        validate_segment(name)?;
3104        let url = self.url_v3(&["sinks", tenant, namespace, name, "status"])?;
3105        let resp = self.send(self.http.request(Method::GET, url)).await?;
3106        json_ok(resp).await
3107    }
3108
3109    /// Register a sink from a remote package URL.
3110    ///
3111    /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}` with
3112    /// `multipart/form-data` carrying a `url` text part and a
3113    /// `sinkConfig` JSON part. Mirrors
3114    /// [`Self::source_create_with_url`]; the only wire-level
3115    /// difference is the JSON-part field name (`sinkConfig` vs
3116    /// `sourceConfig`). Java: `SinksBase#registerSink`.
3117    pub async fn sink_create_with_url(
3118        &self,
3119        tenant: &str,
3120        namespace: &str,
3121        name: &str,
3122        url: &str,
3123        config: SinkConfig,
3124    ) -> Result<(), AdminError> {
3125        validate_segment(tenant)?;
3126        validate_segment(namespace)?;
3127        validate_segment(name)?;
3128        let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
3129        let form = build_url_config_multipart(url, "sinkConfig", &config)?;
3130        let resp = self
3131            .send(
3132                self.http
3133                    .request(Method::POST, endpoint)
3134                    .multipart(form)
3135                    .timeout(PACKAGE_REGISTER_TIMEOUT),
3136            )
3137            .await?;
3138        empty_ok(resp).await
3139    }
3140
3141    /// Update a sink from a remote package URL.
3142    ///
3143    /// `PUT /admin/v3/sinks/{tenant}/{namespace}/{name}` with the same
3144    /// multipart shape as [`Self::sink_create_with_url`].
3145    /// Java: `SinksBase#updateSink`.
3146    pub async fn sink_update_with_url(
3147        &self,
3148        tenant: &str,
3149        namespace: &str,
3150        name: &str,
3151        url: &str,
3152        config: SinkConfig,
3153    ) -> Result<(), AdminError> {
3154        validate_segment(tenant)?;
3155        validate_segment(namespace)?;
3156        validate_segment(name)?;
3157        let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
3158        let form = build_url_config_multipart(url, "sinkConfig", &config)?;
3159        let resp = self
3160            .send(
3161                self.http
3162                    .request(Method::PUT, endpoint)
3163                    .multipart(form)
3164                    .timeout(PACKAGE_REGISTER_TIMEOUT),
3165            )
3166            .await?;
3167        empty_ok(resp).await
3168    }
3169
3170    /// Delete a sink.
3171    ///
3172    /// `DELETE /admin/v3/sinks/{tenant}/{namespace}/{name}`.
3173    /// Java: `SinksBase#deregisterSink`.
3174    pub async fn sink_delete(
3175        &self,
3176        tenant: &str,
3177        namespace: &str,
3178        name: &str,
3179    ) -> Result<(), AdminError> {
3180        validate_segment(tenant)?;
3181        validate_segment(namespace)?;
3182        validate_segment(name)?;
3183        let url = self.url_v3(&["sinks", tenant, namespace, name])?;
3184        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3185        empty_ok(resp).await
3186    }
3187
3188    /// Start every instance of a sink.
3189    ///
3190    /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/start`.
3191    /// Java: `SinksBase#startSink`.
3192    pub async fn sink_start(
3193        &self,
3194        tenant: &str,
3195        namespace: &str,
3196        name: &str,
3197    ) -> Result<(), AdminError> {
3198        validate_segment(tenant)?;
3199        validate_segment(namespace)?;
3200        validate_segment(name)?;
3201        let url = self.url_v3(&["sinks", tenant, namespace, name, "start"])?;
3202        let resp = self.send(self.http.request(Method::POST, url)).await?;
3203        empty_ok(resp).await
3204    }
3205
3206    /// Stop every instance of a sink.
3207    ///
3208    /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/stop`.
3209    /// Java: `SinksBase#stopSink`.
3210    pub async fn sink_stop(
3211        &self,
3212        tenant: &str,
3213        namespace: &str,
3214        name: &str,
3215    ) -> Result<(), AdminError> {
3216        validate_segment(tenant)?;
3217        validate_segment(namespace)?;
3218        validate_segment(name)?;
3219        let url = self.url_v3(&["sinks", tenant, namespace, name, "stop"])?;
3220        let resp = self.send(self.http.request(Method::POST, url)).await?;
3221        empty_ok(resp).await
3222    }
3223
3224    /// Restart every instance of a sink.
3225    ///
3226    /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/restart`.
3227    /// Java: `SinksBase#restartSink`.
3228    pub async fn sink_restart(
3229        &self,
3230        tenant: &str,
3231        namespace: &str,
3232        name: &str,
3233    ) -> Result<(), AdminError> {
3234        validate_segment(tenant)?;
3235        validate_segment(namespace)?;
3236        validate_segment(name)?;
3237        let url = self.url_v3(&["sinks", tenant, namespace, name, "restart"])?;
3238        let resp = self.send(self.http.request(Method::POST, url)).await?;
3239        empty_ok(resp).await
3240    }
3241
3242    // --- Pulsar Packages (/admin/v3/packages/...) -----------------------
3243
3244    /// List package names declared under (`type`, `tenant`, `namespace`).
3245    ///
3246    /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}`. Returns the
3247    /// list of package names — *not* versions; one entry per declared
3248    /// package. Use [`Self::package_versions_list`] to enumerate the
3249    /// versions of one package.
3250    /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/
3251    /// admin/v3/Packages.java#listPackages`.
3252    pub async fn packages_list(
3253        &self,
3254        pkg_type: PackageType,
3255        tenant: &str,
3256        namespace: &str,
3257    ) -> Result<Vec<String>, AdminError> {
3258        validate_segment(tenant)?;
3259        validate_segment(namespace)?;
3260        let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace])?;
3261        let resp = self.send(self.http.request(Method::GET, url)).await?;
3262        json_ok(resp).await
3263    }
3264
3265    /// List the versions declared for one package.
3266    ///
3267    /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}`.
3268    /// Returns the list of version strings (Pulsar treats versions as
3269    /// opaque strings — `1.0.0`, `latest`, build hashes — and only the
3270    /// metadata endpoints understand them).
3271    /// Java: `PackagesBase#listPackageVersions`.
3272    pub async fn package_versions_list(
3273        &self,
3274        pkg_type: PackageType,
3275        tenant: &str,
3276        namespace: &str,
3277        name: &str,
3278    ) -> Result<Vec<String>, AdminError> {
3279        validate_segment(tenant)?;
3280        validate_segment(namespace)?;
3281        validate_segment(name)?;
3282        let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace, name])?;
3283        let resp = self.send(self.http.request(Method::GET, url)).await?;
3284        json_ok(resp).await
3285    }
3286
3287    /// Get the metadata envelope for one package version.
3288    ///
3289    /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3290    /// {version}/metadata`. Returns the `PackageMetadata` envelope as
3291    /// raw JSON for forward-compat — broker minor versions extend the
3292    /// shape with `tags`, `documentationUrl`, etc.
3293    /// Java: `PackagesBase#getPackageMetadata`.
3294    pub async fn package_metadata_get(
3295        &self,
3296        pkg_type: PackageType,
3297        tenant: &str,
3298        namespace: &str,
3299        name: &str,
3300        version: &str,
3301    ) -> Result<serde_json::Value, AdminError> {
3302        validate_segment(tenant)?;
3303        validate_segment(namespace)?;
3304        validate_segment(name)?;
3305        validate_segment(version)?;
3306        let url = self.url_v3(&[
3307            "packages",
3308            pkg_type.as_str(),
3309            tenant,
3310            namespace,
3311            name,
3312            version,
3313            "metadata",
3314        ])?;
3315        let resp = self.send(self.http.request(Method::GET, url)).await?;
3316        json_ok(resp).await
3317    }
3318
3319    /// Replace the metadata envelope for one package version.
3320    ///
3321    /// `PUT /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3322    /// {version}/metadata` with a [`PackageMetadata`] JSON body.
3323    /// The broker rejects this verb with 404 if the package version
3324    /// does not exist — `package_metadata_set` is *update*, never
3325    /// *create*. Java: `PackagesBase#updatePackageMetadata`.
3326    pub async fn package_metadata_set(
3327        &self,
3328        pkg_type: PackageType,
3329        tenant: &str,
3330        namespace: &str,
3331        name: &str,
3332        version: &str,
3333        metadata: PackageMetadata,
3334    ) -> Result<(), AdminError> {
3335        validate_segment(tenant)?;
3336        validate_segment(namespace)?;
3337        validate_segment(name)?;
3338        validate_segment(version)?;
3339        let url = self.url_v3(&[
3340            "packages",
3341            pkg_type.as_str(),
3342            tenant,
3343            namespace,
3344            name,
3345            version,
3346            "metadata",
3347        ])?;
3348        let resp = self
3349            .send(self.http.request(Method::PUT, url).json(&metadata))
3350            .await?;
3351        empty_ok(resp).await
3352    }
3353
3354    /// Delete one package version.
3355    ///
3356    /// `DELETE /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3357    /// {version}`. The broker drops the version's metadata + storage
3358    /// atomically; other versions of the same package are untouched.
3359    /// Java: `PackagesBase#deletePackage`.
3360    pub async fn package_delete(
3361        &self,
3362        pkg_type: PackageType,
3363        tenant: &str,
3364        namespace: &str,
3365        name: &str,
3366        version: &str,
3367    ) -> Result<(), AdminError> {
3368        validate_segment(tenant)?;
3369        validate_segment(namespace)?;
3370        validate_segment(name)?;
3371        validate_segment(version)?;
3372        let url = self.url_v3(&[
3373            "packages",
3374            pkg_type.as_str(),
3375            tenant,
3376            namespace,
3377            name,
3378            version,
3379        ])?;
3380        let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3381        empty_ok(resp).await
3382    }
3383
3384    // --- Internal --------------------------------------------------------
3385
3386    /// Build a request URL by joining `segments` onto `base_url`. Each segment
3387    /// is percent-encoded for the URL path.
3388    fn url(&self, segments: &[&str]) -> Result<Url, AdminError> {
3389        Self::url_for(&self.base_url, segments)
3390    }
3391
3392    /// Build a request URL by joining `segments` onto the `/admin/v3/`
3393    /// base. Used by Pulsar Functions / IO Sources / IO Sinks /
3394    /// Packages, which Pulsar moved off of `/admin/v2/` historically.
3395    fn url_v3(&self, segments: &[&str]) -> Result<Url, AdminError> {
3396        Self::url_for(&self.base_url_v3, segments)
3397    }
3398
3399    /// Shared URL-builder body for the v2 / v3 helpers.
3400    fn url_for(base: &Url, segments: &[&str]) -> Result<Url, AdminError> {
3401        let mut url = base.clone();
3402        {
3403            // `Url::path_segments_mut` only fails for cannot-be-a-base URLs;
3404            // builder already rejected those.
3405            let mut path = url
3406                .path_segments_mut()
3407                .map_err(|()| AdminError::Builder("base url is cannot-be-a-base".into()))?;
3408            // Both `base_url` (anchored at `/admin/v2/`) and `base_url_v3`
3409            // (`/admin/v3/`) carry a trailing slash, which produces a
3410            // sentinel empty trailing segment in `path_segments_mut`. Drop
3411            // it before appending API segments — otherwise pushes land
3412            // after the empty, producing `/admin/v2//persistent/...`. Real
3413            // brokers tolerate the double slash; strict mocks (wiremock)
3414            // do not, and Java's `PulsarAdmin` emits the single-slash
3415            // form.
3416            path.pop_if_empty();
3417            for segment in segments {
3418                path.push(segment);
3419            }
3420        }
3421        Ok(url)
3422    }
3423
3424    /// Apply auth headers and dispatch.
3425    async fn send(&self, req: RequestBuilder) -> Result<Response, AdminError> {
3426        let req = match &self.auth {
3427            AdminAuth::None => req,
3428            AdminAuth::Token(tok) => {
3429                let value = format!("Bearer {tok}");
3430                let mut headers = HeaderMap::new();
3431                let header_value = HeaderValue::from_str(&value)
3432                    .map_err(|err| AdminError::Builder(format!("invalid bearer token: {err}")))?;
3433                headers.insert(AUTHORIZATION, header_value);
3434                req.headers(headers)
3435            }
3436        };
3437        Ok(req.send().await?)
3438    }
3439}
3440
3441/// Pulsar Functions configuration — the subset of Java's
3442/// `org.apache.pulsar.common.functions.FunctionConfig` that the URL-based
3443/// `register` / `update` calls actually require. The Java type carries
3444/// ~30 fields (process / k8s runtime tuning, secrets, deadletter
3445/// topics, …); we expose the load-bearing ones an operator running a
3446/// pre-compiled JAR needs. Unknown fields on the wire are tolerated by
3447/// the broker, so adding extra knobs is an additive change later.
3448///
3449/// `tenant` / `namespace` / `name` are duplicated in the body because
3450/// the broker re-validates them against the URL path (Java's
3451/// `WorkerUtils.validateFunctionName`). The CLI fills these in from the
3452/// fully qualified name the operator passes on the command line.
3453///
3454/// `runtime` is the string `"JAVA"`, `"PYTHON"`, or `"GO"` — matches
3455/// Java's `org.apache.pulsar.common.functions.FunctionConfig.Runtime`
3456/// enum serialised by name.
3457#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3458#[serde(default, rename_all = "camelCase")]
3459pub struct FunctionConfig {
3460    /// Owning tenant.
3461    pub tenant: String,
3462    /// Owning namespace (bare name, not `tenant/namespace`).
3463    pub namespace: String,
3464    /// Function name (unique within the namespace).
3465    pub name: String,
3466    /// Fully qualified entry-point class name (`com.acme.MyFunction`
3467    /// for Java; `module.fn` for Python; `main` for Go).
3468    #[serde(rename = "className")]
3469    pub class_name: String,
3470    /// Input topics the function subscribes to.
3471    pub inputs: Vec<String>,
3472    /// Output topic the function produces to. Empty when the function
3473    /// has no output sink.
3474    pub output: String,
3475    /// Runtime — `"JAVA"`, `"PYTHON"`, or `"GO"`.
3476    pub runtime: String,
3477    /// Number of parallel instances the worker schedules.
3478    pub parallelism: i32,
3479    /// Optional opaque user-config map passed to the function's
3480    /// `Context#getUserConfigValue`. JSON object on the wire.
3481    #[serde(rename = "userConfig", skip_serializing_if = "Option::is_none")]
3482    pub user_config: Option<serde_json::Value>,
3483}
3484
3485/// Tenant policy info — admin roles and allowed clusters.
3486///
3487/// Mirrors Java's `org.apache.pulsar.common.policies.data.TenantInfoImpl` —
3488/// the JSON keys (`adminRoles`, `allowedClusters`) match upstream verbatim.
3489#[derive(Debug, Clone, Default, Serialize, Deserialize)]
3490pub struct TenantInfo {
3491    /// Roles permitted to administrate the tenant.
3492    #[serde(rename = "adminRoles")]
3493    pub admin_roles: Vec<String>,
3494    /// Cluster names the tenant may use.
3495    #[serde(rename = "allowedClusters")]
3496    pub allowed_clusters: Vec<String>,
3497}
3498
/// Wire shape of the PIP-415 `getMessageIdByIndex` response.
///
/// Mirrors Java's `MessageIdImpl` JSON shape (Jackson default property-name
/// serialisation): `{ledgerId, entryId, partitionIndex}`. See
/// `pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java`.
///
/// Kept as a deserialise-only DTO and converted into
/// [`magnetar_proto::MessageId`] at the boundary so callers do not see this
/// wire detail. Not exposed publicly.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct MessageIdResponse {
    /// Wire key `ledgerId`. Decoded as signed; `try_into_message_id`
    /// rejects negative values when converting to `u64`.
    ledger_id: i64,
    /// Wire key `entryId`. Decoded as signed; `try_into_message_id`
    /// rejects negative values when converting to `u64`.
    entry_id: i64,
    /// Wire key `partitionIndex`. Falls back to `default_partition_index()`
    /// when the key is absent from the response.
    #[serde(default = "default_partition_index")]
    partition_index: i32,
}
3516
/// Serde fallback for `MessageIdResponse::partition_index`: the value used
/// when the response carries no `partitionIndex` key.
fn default_partition_index() -> i32 {
    const ABSENT_PARTITION_INDEX: i32 = -1;
    ABSENT_PARTITION_INDEX
}
3520
3521impl MessageIdResponse {
3522    /// Convert the REST response into the canonical [`MessageId`]. The broker
3523    /// resolves at entry granularity, so `batch_index` / `batch_size` are not
3524    /// part of the JSON — they default to `-1` (the same sentinel
3525    /// `MessageId::from_pb` uses for `MessageIdData` without batch fields).
3526    ///
3527    /// Returns `AdminError::Protocol` if the broker emits a negative
3528    /// `ledgerId` or `entryId` — both fields are `u64` in the canonical type
3529    /// (matching the proto wire format) and Java's `MessageIdImpl` cannot
3530    /// represent negative values either, so a negative wire value is a
3531    /// broker bug we must surface rather than silently wrap.
3532    fn try_into_message_id(self) -> Result<MessageId, AdminError> {
3533        let ledger_id = u64::try_from(self.ledger_id).map_err(|_| {
3534            AdminError::Protocol(format!("negative ledgerId from broker: {}", self.ledger_id))
3535        })?;
3536        let entry_id = u64::try_from(self.entry_id).map_err(|_| {
3537            AdminError::Protocol(format!("negative entryId from broker: {}", self.entry_id))
3538        })?;
3539        Ok(MessageId {
3540            ledger_id,
3541            entry_id,
3542            partition: self.partition_index,
3543            batch_index: -1,
3544            batch_size: -1,
3545            // PIP-460 (ADR-0031): admin REST never resolves a scalable
3546            // segment id; the field only exists under `scalable-topics`.
3547            #[cfg(feature = "scalable-topics")]
3548            segment_id: None,
3549        })
3550    }
3551}
3552
/// Topic stats. Intentionally permissive: the Java
/// `PersistentTopicStatsImpl` shape is large and shifts between releases;
/// we extract the high-signal counters and pass the rest through as raw JSON.
///
/// The container-level `#[serde(default)]` means any key missing from the
/// response decodes to the field's `Default` value (zero counters, an empty
/// publisher list, JSON `null` for `subscriptions`) instead of failing.
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(default)]
pub struct TopicStats {
    /// Total messages received. Wire key `msgInCounter`.
    #[serde(rename = "msgInCounter")]
    pub msg_in_counter: i64,
    /// Total bytes received. Wire key `bytesInCounter`.
    #[serde(rename = "bytesInCounter")]
    pub bytes_in_counter: i64,
    /// Publishers, raw JSON because the schema is large and version-dependent.
    pub publishers: Vec<serde_json::Value>,
    /// Subscriptions map (raw JSON). `Value::Null` when the key is absent.
    pub subscriptions: serde_json::Value,
}
3570
/// Partitioned-topic metadata, as returned by
/// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`.
/// Java: `org.apache.pulsar.common.partition.PartitionedTopicMetadata`.
/// Only the partition count is consumed; broker-side extensions are ignored.
///
/// Private, deserialise-only DTO. With the container-level
/// `#[serde(default)]`, a body lacking `partitions` decodes to `0`.
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(default)]
struct PartitionedTopicMetadata {
    /// Partition count reported by the broker.
    partitions: u32,
}
3580
/// Java `RetentionPolicies` — namespace-level retention policy. `-1` for
/// either dimension means infinite. The broker applies whichever quota
/// becomes binding first (time OR size).
///
/// Both `Default::default()` and a missing key on decode produce `0` for
/// the affected field (derived `Default` plus container-level
/// `#[serde(default)]`).
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct RetentionPolicies {
    /// Maximum retention time in minutes. `-1` = infinite, `0` = none.
    /// Wire key `retentionTimeInMinutes`.
    pub retention_time_in_minutes: i32,
    /// Maximum retention size in megabytes. `-1` = infinite, `0` = none.
    /// Renamed explicitly because the wire key ends in upper-case `MB`,
    /// which the camelCase rule would not produce.
    #[serde(rename = "retentionSizeInMB")]
    pub retention_size_in_mb: i64,
}
3593
/// Java `PersistencePolicies` — namespace-level `BookKeeper` layout +
/// managed-ledger write-shaping knobs. Maps to the broker's
/// `org.apache.pulsar.common.policies.data.PersistencePolicies`.
///
/// `Default::default()` returns the broker's documented default for a
/// new namespace (`2/2/2/0.0`), NOT all zeros — the broker rejects
/// ensemble values < 1 on `set`, so an all-zero policy is unusable
/// for a round-trip. Missing fields on decode default the same way:
/// a partial body where the broker omits one field round-trips with
/// the legal default, never with the illegal `0`.
///
/// `Default` is implemented by hand (not derived) so that it agrees with
/// the per-field serde defaults declared below.
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PersistencePolicies {
    /// `BookKeeper` ensemble size — the number of bookies the managed
    /// ledger striping is spread across. Default: 2.
    #[serde(default = "default_bookkeeper_quorum")]
    pub bookkeeper_ensemble: i32,
    /// `BookKeeper` write quorum — the number of bookies each entry is
    /// written to. Default: 2.
    #[serde(default = "default_bookkeeper_quorum")]
    pub bookkeeper_write_quorum: i32,
    /// `BookKeeper` ack quorum — the number of acks required before an
    /// add is considered durable. Default: 2.
    #[serde(default = "default_bookkeeper_quorum")]
    pub bookkeeper_ack_quorum: i32,
    /// Managed-ledger mark-delete-rate cap (ops/sec). `0.0` disables
    /// the throttle. Default: 0.0 (disabled), which is also what plain
    /// `#[serde(default)]` yields for an `f64`.
    #[serde(default)]
    pub managed_ledger_max_mark_delete_rate: f64,
}
3624
3625impl Default for PersistencePolicies {
3626    fn default() -> Self {
3627        Self {
3628            bookkeeper_ensemble: 2,
3629            bookkeeper_write_quorum: 2,
3630            bookkeeper_ack_quorum: 2,
3631            managed_ledger_max_mark_delete_rate: 0.0,
3632        }
3633    }
3634}
3635
/// Serde fallback shared by the three `BookKeeper` sizing fields of
/// `PersistencePolicies` (ensemble, write quorum, ack quorum): `2`.
#[inline]
fn default_bookkeeper_quorum() -> i32 {
    const DEFAULT_QUORUM: i32 = 2;
    DEFAULT_QUORUM
}
3640
/// Java `DispatchRate` — a sliding-window throttle (msg/sec + byte/sec
/// over a `ratePeriodInSecond` window). Shared shape between the
/// per-namespace consumer dispatch rate, the per-subscription dispatch
/// rate, and the cross-cluster replicator dispatch rate.
///
/// `-1` on either rate dimension disables that axis of the throttle —
/// missing fields default to `-1` (not `0`) so a broker-omitted
/// dimension round-trips as "no throttle", never as "throttle to
/// zero" (which would block consumer dispatch on the namespace).
///
/// `Default` is implemented by hand so it matches the per-field serde
/// defaults (`-1`, `-1`, `1`, `false`).
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DispatchRate {
    /// Throttle in messages/sec. `-1` = unlimited.
    #[serde(default = "neg_one_i32")]
    pub dispatch_throttling_rate_in_msg: i32,
    /// Throttle in bytes/sec. `-1` = unlimited.
    #[serde(default = "neg_one_i64")]
    pub dispatch_throttling_rate_in_byte: i64,
    /// Window size in seconds the throttle averages over. Defaults to `1`
    /// when the key is absent.
    #[serde(default = "default_rate_period_seconds")]
    pub rate_period_in_second: i32,
    /// If `true`, dispatch rate is interpreted as an addend on top of
    /// the namespace publish rate rather than an absolute cap. Defaults to
    /// `false` when the key is absent.
    #[serde(default)]
    pub relative_to_publish_rate: bool,
}
3667
3668impl Default for DispatchRate {
3669    fn default() -> Self {
3670        Self {
3671            dispatch_throttling_rate_in_msg: -1,
3672            dispatch_throttling_rate_in_byte: -1,
3673            rate_period_in_second: 1,
3674            relative_to_publish_rate: false,
3675        }
3676    }
3677}
3678
/// Java `PublishRate` — producer-side throttle (msg/sec + byte/sec).
/// `-1` on either dimension disables that axis of the throttle.
/// Missing fields default to `-1` (not `0`) — same sentinel semantics
/// as [`DispatchRate`]. Unlike `DispatchRate`, there is no
/// rate-period field; the broker uses a fixed 1-second window.
///
/// `Default` is implemented by hand so it matches the per-field serde
/// defaults (`-1` on both axes).
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PublishRate {
    /// Throttle in messages/sec. `-1` = unlimited.
    /// Wire key `publishThrottlingRateInMsg`.
    #[serde(default = "neg_one_i32")]
    pub publish_throttling_rate_in_msg: i32,
    /// Throttle in bytes/sec. `-1` = unlimited.
    /// Wire key `publishThrottlingRateInByte`.
    #[serde(default = "neg_one_i64")]
    pub publish_throttling_rate_in_byte: i64,
}
3694
3695impl Default for PublishRate {
3696    fn default() -> Self {
3697        Self {
3698            publish_throttling_rate_in_msg: -1,
3699            publish_throttling_rate_in_byte: -1,
3700        }
3701    }
3702}
3703
/// Serde fallback producing the `-1` sentinel for `i32` fields.
#[inline]
fn neg_one_i32() -> i32 {
    const SENTINEL: i32 = -1;
    SENTINEL
}
/// Serde fallback producing the `-1` sentinel for `i64` fields.
#[inline]
fn neg_one_i64() -> i64 {
    const SENTINEL: i64 = -1;
    SENTINEL
}
/// Serde fallback for `DispatchRate::rate_period_in_second`: a one-second
/// window.
#[inline]
fn default_rate_period_seconds() -> i32 {
    const ONE_SECOND: i32 = 1;
    ONE_SECOND
}
3716
/// Java `DelayedDeliveryPolicies` — namespace-level switch + index-tick
/// granularity for the broker's delayed-message delivery tracker.
/// Maps to `org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies`.
/// `tick_time` controls how often the broker's delay-index buckets are
/// re-evaluated; smaller values give tighter delivery accuracy at a
/// higher tracker cost.
///
/// The Java field name is `tickTime` (carrying a `@JsonProperty("tickTime")`
/// annotation), **not** `tickTimeMillis`. The unit is documented as
/// milliseconds (see the upstream class doc), but the wire key drops
/// the unit suffix — Jackson on the broker only binds the literal
/// `tickTime`.
///
/// Keys missing on decode fall back to the derived `Default`
/// (`active = false`, `tick_time_millis = 0`).
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct DelayedDeliveryPolicies {
    /// Whether delayed delivery is enabled for the namespace.
    pub active: bool,
    /// Index-tick granularity in milliseconds. Wire key `tickTime`; the
    /// explicit rename overrides what camelCase would otherwise produce
    /// (`tickTimeMillis`).
    #[serde(rename = "tickTime")]
    pub tick_time_millis: i64,
}
3738
/// Java `BacklogQuota` — one entry in the namespace-level backlog quota
/// map. `policy` is a string (`producer_request_hold`,
/// `producer_exception`, `consumer_backlog_eviction`) rather than a
/// closed Rust enum so new broker enum values forward-decode cleanly.
///
/// `-1` on either limit dimension disables that axis — missing fields
/// default to `-1` (not `0`) so a broker-omitted dimension round-trips
/// as "no quota", never as "expire-everything" or "block-everything".
///
/// `Default` is implemented by hand so it matches the per-field serde
/// defaults (`-1`, `-1`, empty policy string).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BacklogQuota {
    /// Maximum allowed backlog in bytes (when type=`destination_storage`).
    /// `-1` = unlimited.
    #[serde(default = "neg_one_i64")]
    pub limit_size: i64,
    /// Maximum allowed backlog age in seconds (when type=`message_age`).
    /// `-1` = unlimited.
    #[serde(default = "neg_one_i32")]
    pub limit_time: i32,
    /// Action when the quota is exceeded. Empty string when the key is
    /// absent from the decoded body.
    #[serde(default)]
    pub policy: String,
}
3762
3763impl Default for BacklogQuota {
3764    fn default() -> Self {
3765        Self {
3766            limit_size: -1,
3767            limit_time: -1,
3768            policy: String::new(),
3769        }
3770    }
3771}
3772
/// Java `BookieInfo` — a single bookie's rack assignment, as stored in
/// the `racks-info` metadata path and shipped on
/// [`AdminClient::bookies_set_rack`]. Field names are camelCase on the
/// wire (matching `org.apache.pulsar.common.policies.data.BookieInfo`,
/// which carries only `rack` and `hostname`).
///
/// The placement group is **not** part of this body — Pulsar's
/// `BookiesBase#updateBookieRackInfo` exposes `group` as a
/// `@QueryParam`, and the JSON body Jackson-binds only to
/// `{rack, hostname}`. Treating it as a body field silently drops the
/// operator's group choice on the wire.
///
/// Keys missing on decode fall back to empty strings (derived `Default`
/// plus container-level `#[serde(default)]`).
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct BookieInfo {
    /// Rack identifier within the group — opaque to the broker, only
    /// the placement policy cares about it.
    pub rack: String,
    /// Resolved hostname for the bookie. The broker uses it for
    /// log lines; it does not have to match DNS.
    pub hostname: String,
}
3794
/// Java `PostSchemaPayload` — the request body for
/// [`AdminClient::schema_post`] and
/// [`AdminClient::schema_compatibility_check`]. The Java DTO has
/// (`type`, `schema`, `properties`); all three keys travel as-is on the
/// wire. `schema` is the canonical-form blob for AVRO / JSON / PROTOBUF and
/// the protobuf descriptor for `PROTOBUF_NATIVE`.
///
/// Keys missing on decode fall back to the derived `Default` (empty
/// strings, empty map).
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct PostSchemaPayload {
    /// Schema type (`AVRO` / `JSON` / `PROTOBUF` / `PROTOBUF_NATIVE` /
    /// `KEY_VALUE` / `STRING` / `BYTES` / ...). Wire key `type`; renamed
    /// because `type` is a reserved word in Rust.
    #[serde(rename = "type")]
    pub schema_type: String,
    /// Schema definition, encoded per the type axis.
    pub schema: String,
    /// User-defined per-schema properties.
    pub properties: std::collections::HashMap<String, String>,
}
3813
/// Java `SourceConfig` — declarative description of a Pulsar IO
/// Source. Mirrors `org.apache.pulsar.common.io.SourceConfig` (Jackson
/// camelCase on the wire). Only the fields the JAX-RS `create` /
/// `update` paths require are typed; per-connector knobs ride along in
/// the open-ended `configs` map so a forward broker can add fields
/// without a magnetar release.
///
/// Keys missing on decode fall back to the derived `Default` (empty
/// strings, `0` parallelism, `configs = None`).
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct SourceConfig {
    /// Tenant owning the source. Must match the URL path tenant.
    pub tenant: String,
    /// Namespace owning the source. Must match the URL path namespace.
    pub namespace: String,
    /// Source name. Must match the URL path name.
    pub name: String,
    /// Fully-qualified connector class (e.g.
    /// `org.apache.pulsar.io.kafka.KafkaSource`). Wire key `className`.
    pub class_name: String,
    /// Destination topic the source writes to. Wire key `topicName`.
    pub topic_name: String,
    /// Number of source instances to schedule.
    pub parallelism: i32,
    /// Connector-specific configuration map. Skipped from JSON when
    /// `None` so a `null` does not override the broker default.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub configs: Option<serde_json::Value>,
}
3841
/// Java `SinkConfig` — declarative description of a Pulsar IO Sink.
/// Mirrors `org.apache.pulsar.common.io.SinkConfig` (Jackson camelCase
/// on the wire). The `inputs` slot is the list of source topics the
/// sink reads from — the broker accepts either fully-qualified topic
/// names or `tenant/namespace/topic` shorthand. Per-connector knobs
/// ride in `configs` for the same forward-compat reason as
/// [`SourceConfig`].
///
/// Keys missing on decode fall back to the derived `Default` (empty
/// strings, empty `inputs`, `0` parallelism, `configs = None`).
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct SinkConfig {
    /// Tenant owning the sink. Must match the URL path tenant.
    pub tenant: String,
    /// Namespace owning the sink. Must match the URL path namespace.
    pub namespace: String,
    /// Sink name. Must match the URL path name.
    pub name: String,
    /// Fully-qualified connector class (e.g.
    /// `org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink`).
    /// Wire key `className`.
    pub class_name: String,
    /// Source topics the sink subscribes to.
    pub inputs: Vec<String>,
    /// Number of sink instances to schedule.
    pub parallelism: i32,
    /// Connector-specific configuration map. Skipped from JSON when
    /// `None` so a `null` does not override the broker default.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub configs: Option<serde_json::Value>,
}
3870
/// The `{type}` path segment of `/admin/v3/packages/{type}/...`.
///
/// Corresponds to upstream's `PackageType` enum
/// (`pulsar-packages-management/pulsar-packages-management-core/.../
/// PackageType.java`). The broker accepts exactly three lowercase tokens —
/// `function`, `source`, `sink` — and answers 400 to anything else, so the
/// set is modelled as a closed enum: the URL builder cannot produce a
/// value the broker would refuse.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PackageType {
    /// Pulsar Functions JAR; rendered as `function`.
    Function,
    /// Pulsar IO Source NAR; rendered as `source`.
    Source,
    /// Pulsar IO Sink NAR; rendered as `sink`.
    Sink,
}

impl PackageType {
    /// The lowercase token used for this variant in the broker URL.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            PackageType::Sink => "sink",
            PackageType::Source => "source",
            PackageType::Function => "function",
        }
    }
}
3900
3901impl std::str::FromStr for PackageType {
3902    type Err = AdminError;
3903
3904    /// Parse from the lowercase tokens the broker emits (`function` /
3905    /// `source` / `sink`). Hyphenated aliases are accepted to make the
3906    /// CLI feel idiomatic — `package-type=source` vs the broker's
3907    /// `source` are equivalent.
3908    fn from_str(s: &str) -> Result<Self, AdminError> {
3909        match s.to_ascii_lowercase().as_str() {
3910            "function" | "functions" => Ok(Self::Function),
3911            "source" | "sources" => Ok(Self::Source),
3912            "sink" | "sinks" => Ok(Self::Sink),
3913            other => Err(AdminError::InvalidName(format!(
3914                "unknown package type {other:?} (expected: function | source | sink)"
3915            ))),
3916        }
3917    }
3918}
3919
/// Java `PackageMetadata` — the metadata envelope Pulsar Packages
/// attaches to each `(type, tenant, namespace, name, version)` tuple.
/// Mirrors `org.apache.pulsar.packages.management.core.common.PackageMetadata`
/// (Jackson camelCase on the wire). `modification_time` is a
/// broker-side timestamp in milliseconds-since-epoch — the broker
/// emits it on `GET` and ignores caller-supplied values on `PUT`
/// (overwriting them with the receive timestamp).
///
/// Keys missing on decode fall back to the derived `Default` (empty
/// strings, `0` timestamp, empty map).
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct PackageMetadata {
    /// Free-form package description.
    pub description: String,
    /// Maintainer contact (typically an email or team handle).
    pub contact: String,
    /// Last-modification timestamp in ms-since-epoch. Read-only for
    /// callers — the broker overwrites the value on `PUT`.
    /// Wire key `modificationTime`.
    pub modification_time: i64,
    /// Arbitrary key/value labels (release notes, CI ids, etc.).
    pub properties: std::collections::HashMap<String, String>,
}
3940
/// Java `BacklogQuotaType` — names the dimension that a `BacklogQuota`
/// entry constrains.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BacklogQuotaType {
    /// Bytes-on-disk dimension; pairs with `BacklogQuota::limit_size`.
    DestinationStorage,
    /// Message-age dimension; pairs with `BacklogQuota::limit_time`.
    MessageAge,
}

impl BacklogQuotaType {
    /// The lowercase `snake_case` token the broker REST surface expects as
    /// the value of the `backlogQuotaType` query parameter.
    #[must_use]
    pub fn as_query_value(self) -> &'static str {
        match self {
            BacklogQuotaType::MessageAge => "message_age",
            BacklogQuotaType::DestinationStorage => "destination_storage",
        }
    }
}
3962
/// Java `LongRunningProcessStatus` — the polling shape for triggered
/// background jobs (compaction, offload). The broker returns one of four
/// `status` values: `NOT_RUN` (never triggered), `RUNNING`, `SUCCESS`,
/// `ERROR`. `last_error` is populated only on `ERROR`.
///
/// `status` is kept as a plain string, not a Rust enum. Keys missing on
/// decode fall back to empty strings (derived `Default` plus
/// container-level `#[serde(default)]`).
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct LongRunningProcessStatus {
    /// Job state — `NOT_RUN`, `RUNNING`, `SUCCESS`, or `ERROR`.
    pub status: String,
    /// Human-readable error message, present on `ERROR`.
    /// Wire key `lastError`.
    pub last_error: String,
}
3975
/// Request body for `POST .../subscription/{sub}/resetcursor` (Java
/// `ResetCursorData`). The CLI exposes `message_id` and `is_excluded`;
/// Pulsar's `batchIndexes` / `properties` fields are not currently set
/// — they exist for transactional dedup metadata and would require
/// txn-aware callers anyway.
///
/// `ledger_id` / `entry_id` are `i64` on the wire because Pulsar
/// Jackson-binds them to Java `long`. `MessageId` on the Rust side uses
/// `u64` (matching the wire-protocol envelope), with `u64::MAX` as the
/// `EARLIEST` / `LATEST` sentinel — the conversion below maps those
/// sentinels to `-1` (the Java sentinel) so a reset-to-earliest /
/// reset-to-latest doesn't overflow Java's `long` parser.
///
/// Serialise-only and private to this crate.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ResetCursorData {
    /// Wire key `ledgerId`.
    ledger_id: i64,
    /// Wire key `entryId`.
    entry_id: i64,
    /// Wire key `partitionIndex`.
    partition_index: i32,
    /// Wire key `batchIndex`.
    batch_index: i32,
    /// Wire key `isExcluded`, spelled out explicitly here.
    #[serde(rename = "isExcluded")]
    is_excluded: bool,
}
3998
/// Convert a `MessageId` ledger/entry id (`u64`) into the signed value
/// Pulsar's Java-`long` wire field carries.
///
/// The Rust-side `u64::MAX` sentinel (used by `MessageId::EARLIEST` /
/// `LATEST`) becomes Java's `-1` sentinel. Every other value is
/// reinterpreted with a plain `as i64` cast; legitimate broker-issued
/// ledger and entry indices are expected to fit below `i64::MAX`, so the
/// cast leaves them unchanged.
#[inline]
fn message_id_field_for_wire(value: u64) -> i64 {
    match value {
        u64::MAX => -1,
        // Broker indices fit in 63 bits, so this cast is lossless for any
        // id the broker actually handed out.
        in_range => in_range as i64,
    }
}
4015
/// Builder for [`AdminClient`].
///
/// All fields start at their `Default` value; `build` rejects the builder
/// if no service URL has been set.
#[derive(Debug, Default)]
pub struct AdminClientBuilder {
    /// Broker base URL as supplied by the caller; `None` until
    /// `service_url` is called.
    base_url: Option<Url>,
    /// Authentication mode handed to the built client.
    auth: AdminAuth,
    /// Request timeout override; `None` means `DEFAULT_TIMEOUT` is used.
    timeout: Option<Duration>,
}
4023
4024impl AdminClientBuilder {
4025    /// Set the service URL — the base for `/admin/v2/...`. Required.
4026    #[must_use]
4027    pub fn service_url(mut self, url: Url) -> Self {
4028        self.base_url = Some(url);
4029        self
4030    }
4031
4032    /// Configure bearer-token auth (`Authorization: Bearer <token>`).
4033    #[must_use]
4034    pub fn token(mut self, token: String) -> Self {
4035        self.auth = AdminAuth::Token(token);
4036        self
4037    }
4038
4039    /// Override the request timeout. Defaults to [`DEFAULT_TIMEOUT`].
4040    #[must_use]
4041    pub fn timeout(mut self, dur: Duration) -> Self {
4042        self.timeout = Some(dur);
4043        self
4044    }
4045
4046    /// Build the client.
4047    pub fn build(self) -> Result<AdminClient, AdminError> {
4048        let base_url = self
4049            .base_url
4050            .ok_or_else(|| AdminError::Builder("service_url is required".into()))?;
4051        if base_url.cannot_be_a_base() {
4052            return Err(AdminError::Builder(format!(
4053                "service_url cannot be a base url: {base_url}"
4054            )));
4055        }
4056        // Anchor every V2 API call below `/admin/v2/` and every V3 call
4057        // below `/admin/v3/`. We append the suffix here so callers pass
4058        // plain `http://broker:8080` rather than baking either prefix in.
4059        //
4060        // `Url::join` follows WHATWG semantics: if `base_url` has no
4061        // trailing slash, its last path segment is replaceable. So
4062        // `http://broker/pulsar` + `admin/v2/` would yield
4063        // `http://broker/admin/v2/` — the `pulsar` prefix silently
4064        // dropped (common for path-prefixed K8s ingresses). Normalise
4065        // the base to end with `/` first so the join always appends.
4066        let base_url = {
4067            let mut b = base_url.clone();
4068            if !b.path().ends_with('/') {
4069                b.set_path(&format!("{}/", b.path()));
4070            }
4071            b
4072        };
4073        let base_url_v3 = base_url.join("admin/v3/")?;
4074        let base_url = base_url.join("admin/v2/")?;
4075
4076        // reqwest 0.13 panics in `Client::builder().build()` when the active
4077        // `rustls` flavor is `rustls-no-provider` and no global
4078        // `CryptoProvider` is installed. That happens whenever more than one
4079        // `crypto-*` feature is unified (e.g. default `crypto-aws-lc-rs`
4080        // plus an explicit `crypto-ring`), so install the default here —
4081        // the shim is idempotent and a no-op once a provider is set, which
4082        // covers parallel callers and processes that also boot the tokio
4083        // engine.
4084        tls_crypto::install_default_provider();
4085
4086        let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT);
4087        let http = reqwest::Client::builder()
4088            .timeout(timeout)
4089            .build()
4090            .map_err(AdminError::Http)?;
4091
4092        Ok(AdminClient {
4093            base_url,
4094            base_url_v3,
4095            http,
4096            auth: self.auth,
4097        })
4098    }
4099}
4100
/// Errors returned by the admin client.
///
/// `Http`, `Json` and `Url` carry `#[from]` conversions, so the matching
/// source errors propagate with `?`. The remaining variants are built
/// explicitly by this crate.
#[derive(Debug, thiserror::Error)]
pub enum AdminError {
    /// Transport-layer error from `reqwest`.
    #[error("http error: {0}")]
    Http(#[from] reqwest::Error),
    /// API returned a non-success HTTP status.
    #[error("api error {code}: {body}")]
    Status {
        /// HTTP status code.
        code: u16,
        /// Response body (or a placeholder if reading the body failed).
        body: String,
    },
    /// JSON decode error.
    #[error("json decode: {0}")]
    Json(#[from] serde_json::Error),
    /// URL parse / construction error.
    #[error("invalid url: {0}")]
    Url(#[from] url::ParseError),
    /// Builder configuration error (missing service URL, invalid argument...).
    #[error("invalid builder: {0}")]
    Builder(String),
    /// Caller passed a namespace or topic name that the client could not parse.
    #[error("invalid name: {0}")]
    InvalidName(String),
    /// Broker returned a response that violates the documented wire contract
    /// (e.g. negative `ledgerId` from `getMessageIdByIndex`, which Java
    /// `MessageIdImpl` cannot represent either).
    #[error("broker protocol violation: {0}")]
    Protocol(String),
}
4133
4134/// Decode a non-error JSON response body.
4135async fn json_ok<T>(resp: Response) -> Result<T, AdminError>
4136where
4137    T: for<'de> Deserialize<'de>,
4138{
4139    let resp = ensure_status(resp).await?;
4140    let bytes = resp.bytes().await?;
4141    Ok(serde_json::from_slice(&bytes)?)
4142}
4143
4144/// Decode a JSON response body, defaulting to `T::default()` when the
4145/// broker returns 204 / empty body / literal `null`. Pulsar's
4146/// "policy-unset" pattern collapses to one of those three encodings:
4147///
4148/// - `getRetention` / `getPersistence` / `getDispatchRate` / `getPublishRate` on a fresh namespace
4149///   surface `null` through Jersey's `resume(null)`, which travels as 204 No Content;
4150/// - `getTopicPolicies` on a topic with no explicit policy returns either 204 or the literal JSON
4151///   `null`.
4152///
4153/// `json_ok::<RetentionPolicies>` errors with `EOF while parsing a
4154/// value` for either case; this helper maps them to
4155/// `RetentionPolicies::default()` (broker semantic: missing fields ==
4156/// broker default). Callers asserting "post-remove returns the
4157/// default" stay correct without changing the public signature.
4158async fn json_ok_or_default<T>(resp: Response) -> Result<T, AdminError>
4159where
4160    T: for<'de> Deserialize<'de> + Default,
4161{
4162    let resp = ensure_status(resp).await?;
4163    if resp.status() == StatusCode::NO_CONTENT {
4164        return Ok(T::default());
4165    }
4166    let bytes = resp.bytes().await?;
4167    if bytes.is_empty() {
4168        return Ok(T::default());
4169    }
4170    // `null` body — broker says "policy unset" — also folds to default.
4171    if bytes.as_ref().trim_ascii() == b"null" {
4172        return Ok(T::default());
4173    }
4174    Ok(serde_json::from_slice::<T>(&bytes)?)
4175}
4176
4177/// Decode a JSON response body that the broker may emit as an empty
4178/// payload to mean "no value here". Pulsar's pattern for "no override
4179/// set" on the policy GET endpoints is to return `204 No Content` (or
4180/// a 200 with an empty body) rather than the literal JSON `null`;
4181/// `serde_json::from_slice::<Option<T>>(b"")` errors with `EOF while
4182/// parsing a value`, so every `namespace_get_*` / `topic_get_*` that
4183/// returns `Option<T>` needs this tolerant decoder.
4184///
4185/// Decoding rules:
4186///   - 204 No Content                  → `Ok(None)`
4187///   - 2xx with empty body bytes       → `Ok(None)`
4188///   - 2xx with the literal `null`     → `Ok(None)`
4189///   - 2xx with a JSON value           → `Ok(Some(value))` via serde
4190async fn json_ok_optional<T>(resp: Response) -> Result<Option<T>, AdminError>
4191where
4192    T: for<'de> Deserialize<'de>,
4193{
4194    let resp = ensure_status(resp).await?;
4195    if resp.status() == StatusCode::NO_CONTENT {
4196        return Ok(None);
4197    }
4198    let bytes = resp.bytes().await?;
4199    if bytes.is_empty() {
4200        return Ok(None);
4201    }
4202    Ok(serde_json::from_slice::<Option<T>>(&bytes)?)
4203}
4204
4205/// Discard a successful no-content response body.
4206async fn empty_ok(resp: Response) -> Result<(), AdminError> {
4207    let _ = ensure_status(resp).await?;
4208    Ok(())
4209}
4210
4211/// Convert a non-success response into [`AdminError::Status`]. Returns the
4212/// original response on 2xx so the caller can decode the body.
4213async fn ensure_status(resp: Response) -> Result<Response, AdminError> {
4214    let status = resp.status();
4215    if status.is_success() || status == StatusCode::NO_CONTENT {
4216        return Ok(resp);
4217    }
4218    let code = status.as_u16();
4219    let body = resp
4220        .text()
4221        .await
4222        .unwrap_or_else(|err| format!("<failed to read body: {err}>"));
4223    Err(AdminError::Status { code, body })
4224}
4225
4226/// Split a `tenant/namespace` string into its two segments.
4227/// Reject path segments the `url` crate would silently rewrite. `.` and `..`
4228/// disappear under RFC 3986 dot-segment normalisation; percent-encoded slash
4229/// (`%2F` / `%2f`) lets a hostile name escape its segment; NUL / ASCII
4230/// control bytes have no place in an admin path. Refusing all of these at
4231/// the input boundary keeps the URL the client builds in lock-step with the
4232/// path the broker eventually parses.
4233fn validate_segment(segment: &str) -> Result<(), AdminError> {
4234    if segment.is_empty() {
4235        return Err(AdminError::InvalidName("empty path segment".into()));
4236    }
4237    if segment == "." || segment == ".." {
4238        return Err(AdminError::InvalidName(format!(
4239            "dot segment is not a valid name: {segment:?}",
4240        )));
4241    }
4242    if segment.contains("%2F") || segment.contains("%2f") {
4243        return Err(AdminError::InvalidName(format!(
4244            "percent-encoded slash in segment: {segment:?}",
4245        )));
4246    }
4247    if segment.bytes().any(|b| b < 0x20 || b == 0x7f) {
4248        return Err(AdminError::InvalidName(format!(
4249            "control byte in segment: {segment:?}",
4250        )));
4251    }
4252    Ok(())
4253}
4254
4255/// Split a `tenant/namespace` string into its two segments.
4256///
4257/// Exposed for the CLI (and any other admin-client wrapper) so the
4258/// `tenant/namespace` shape used by the namespace-scoped list verbs
4259/// (`functions list`, `sources list`, `sinks list`, `packages list`)
4260/// validates against the same `validate_segment` rules every admin
4261/// method enforces internally — no parallel parsers, no divergent
4262/// error categories.
4263pub fn split_namespace(ns: &str) -> Result<(&str, &str), AdminError> {
4264    let (tenant, namespace) = ns.split_once('/').ok_or_else(|| {
4265        AdminError::InvalidName(format!("expected tenant/namespace, got {ns:?} (no '/')"))
4266    })?;
4267    if tenant.is_empty() || namespace.is_empty() || namespace.contains('/') {
4268        return Err(AdminError::InvalidName(format!(
4269            "expected tenant/namespace, got {ns:?}"
4270        )));
4271    }
4272    validate_segment(tenant)?;
4273    validate_segment(namespace)?;
4274    Ok((tenant, namespace))
4275}
4276
4277/// Split a `persistent://tenant/namespace/topic` (or `tenant/namespace/topic`)
4278/// into its three path segments. The scheme is optional; if present it must
4279/// be `persistent://`.
4280fn split_topic(topic: &str) -> Result<(&str, &str, &str), AdminError> {
4281    let rest = topic.strip_prefix("persistent://").unwrap_or(topic);
4282    let mut parts = rest.splitn(3, '/');
4283    let tenant = parts.next().unwrap_or("");
4284    let namespace = parts.next().unwrap_or("");
4285    let name = parts.next().unwrap_or("");
4286    if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
4287        return Err(AdminError::InvalidName(format!(
4288            "expected [persistent://]tenant/namespace/topic, got {topic:?}"
4289        )));
4290    }
4291    validate_segment(tenant)?;
4292    validate_segment(namespace)?;
4293    validate_segment(name)?;
4294    Ok((tenant, namespace, name))
4295}
4296
4297/// Build the two-part `multipart/form-data` envelope the broker expects
4298/// on URL-based function register / update calls. Order is fixed (`url`
4299/// then `functionConfig`) so wire-level tests can pin the body shape.
4300/// The `functionConfig` part carries an explicit `application/json`
4301/// content type; without it the broker's Jersey
4302/// `FormDataMultiPartFeature` falls back to `text/plain` and refuses
4303/// the JSON.
4304fn function_pkg_form(
4305    pkg_url: &str,
4306    config: &FunctionConfig,
4307) -> Result<reqwest::multipart::Form, AdminError> {
4308    build_url_config_multipart(pkg_url, "functionConfig", config)
4309}
4310
4311/// Split a `tenant/namespace/name` Functions / IO identifier into its
4312/// three segments. Pulsar Functions never carry a `persistent://`
4313/// scheme prefix (functions are not topics), so the parser is stricter
4314/// than the internal `split_topic` (rustdoc cannot resolve the bare
4315/// identifier because `split_topic` is module-private).
4316///
4317/// Exposed for the CLI, which parses the fully qualified name out of a
4318/// single positional argument before calling the admin methods (which
4319/// take `tenant`, `namespace`, `name` separately so the broker's
4320/// per-segment validation maps 1:1 to the URL path).
4321pub fn split_function_id(id: &str) -> Result<(&str, &str, &str), AdminError> {
4322    let mut parts = id.splitn(3, '/');
4323    let tenant = parts.next().unwrap_or("");
4324    let namespace = parts.next().unwrap_or("");
4325    let name = parts.next().unwrap_or("");
4326    if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
4327        return Err(AdminError::InvalidName(format!(
4328            "expected tenant/namespace/name, got {id:?}"
4329        )));
4330    }
4331    validate_segment(tenant)?;
4332    validate_segment(namespace)?;
4333    validate_segment(name)?;
4334    Ok((tenant, namespace, name))
4335}
4336
4337/// Build the two-part `multipart/form-data` body Pulsar IO's
4338/// `register*` / `update*` endpoints expect when the package is
4339/// referenced by URL (`http(s)://`, `file://`, `function://`): a `url`
4340/// text part and a `<config_field>` JSON part. The broker enforces
4341/// both parts at the dispatcher boundary
4342/// (`SourcesBase#registerSource`, `SinksBase#registerSink`) — missing
4343/// either yields 400. Generic over the config type so Functions,
4344/// Sources, and Sinks share one helper (`functionConfig` /
4345/// `sourceConfig` / `sinkConfig` via the `config_field` argument).
4346fn build_url_config_multipart<T: Serialize>(
4347    pkg_url: &str,
4348    config_field: &str,
4349    config: &T,
4350) -> Result<reqwest::multipart::Form, AdminError> {
4351    let body = serde_json::to_string(config)?;
4352    // `mime_str` only fails on a malformed string; the literal we pass
4353    // is well-formed so the `expect` is on a never-taken branch.
4354    let config_part = reqwest::multipart::Part::text(body)
4355        .mime_str("application/json")
4356        .expect("application/json is a well-formed media type");
4357    let form = reqwest::multipart::Form::new()
4358        .text("url", pkg_url.to_owned())
4359        .part(config_field.to_owned(), config_part);
4360    Ok(form)
4361}
4362
/// Unit tests for the builder, the `Debug` redaction guarantees, the name
/// parsers (`split_namespace`, `split_topic`, `split_function_id`), the
/// `MessageIdResponse` DTO and the URL helpers. None of them touch the
/// network.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn builder_requires_service_url() {
        let err = AdminClient::builder().build().unwrap_err();
        assert!(matches!(err, AdminError::Builder(_)));
    }

    #[test]
    fn builder_appends_admin_v2_prefix() {
        let client = AdminClient::builder()
            .service_url("http://localhost:8080".parse().unwrap())
            .build()
            .unwrap();
        assert_eq!(
            client.base_url().as_str(),
            "http://localhost:8080/admin/v2/"
        );
    }

    #[test]
    fn builder_carries_token() {
        let client = AdminClient::builder()
            .service_url("http://localhost:8080".parse().unwrap())
            .token("abc".into())
            .build()
            .unwrap();
        assert!(matches!(client.auth(), AdminAuth::Token(t) if t == "abc"));
    }

    #[test]
    fn admin_auth_token_debug_redacts_secret() {
        let auth = AdminAuth::Token("super-secret-jwt".to_owned());
        let rendered = format!("{auth:?}");
        assert!(
            !rendered.contains("super-secret-jwt"),
            "raw token leaked through Debug: {rendered}",
        );
        assert!(
            rendered.contains("<redacted>"),
            "expected redaction sentinel in {rendered}"
        );
        assert!(
            rendered.contains("Token"),
            "expected variant name in {rendered}"
        );

        let none_rendered = format!("{:?}", AdminAuth::None);
        assert_eq!(none_rendered, "None");
    }

    #[test]
    fn admin_client_debug_does_not_leak_token() {
        let client = AdminClient::builder()
            .service_url("http://localhost:8080".parse().unwrap())
            .token("leaky-token".into())
            .build()
            .unwrap();
        let rendered = format!("{client:?}");
        assert!(
            !rendered.contains("leaky-token"),
            "raw token leaked through AdminClient Debug: {rendered}",
        );
    }

    #[test]
    fn split_namespace_ok() {
        assert_eq!(
            split_namespace("public/default").unwrap(),
            ("public", "default")
        );
    }

    #[test]
    fn split_namespace_rejects_missing_slash() {
        assert!(matches!(
            split_namespace("public"),
            Err(AdminError::InvalidName(_))
        ));
    }

    #[test]
    fn split_namespace_rejects_extra_segment() {
        assert!(matches!(
            split_namespace("public/default/extra"),
            Err(AdminError::InvalidName(_))
        ));
    }

    #[test]
    fn split_namespace_rejects_empty_and_dot_segments() {
        // An empty side fails the shape check; `.` / `..` pass the shape
        // check and must be stopped by the per-segment validation instead.
        for bad in ["/default", "public/", "../default", "public/."] {
            assert!(
                matches!(split_namespace(bad), Err(AdminError::InvalidName(_))),
                "expected rejection of {bad:?}"
            );
        }
    }

    #[test]
    fn split_topic_with_scheme() {
        let (t, n, name) = split_topic("persistent://acme/svc/orders").unwrap();
        assert_eq!((t, n, name), ("acme", "svc", "orders"));
    }

    #[test]
    fn split_topic_without_scheme() {
        let (t, n, name) = split_topic("acme/svc/orders").unwrap();
        assert_eq!((t, n, name), ("acme", "svc", "orders"));
    }

    #[test]
    fn split_topic_rejects_short_name() {
        assert!(matches!(
            split_topic("acme/svc"),
            Err(AdminError::InvalidName(_))
        ));
    }

    #[test]
    fn split_function_id_ok() {
        let (t, n, name) = split_function_id("public/default/my-fn").unwrap();
        assert_eq!((t, n, name), ("public", "default", "my-fn"));
    }

    #[test]
    fn split_function_id_rejects_short_name() {
        assert!(matches!(
            split_function_id("public/default"),
            Err(AdminError::InvalidName(_))
        ));
    }

    #[test]
    fn split_function_id_rejects_persistent_scheme() {
        // Functions never carry a `persistent://` prefix, and the parser does
        // not strip one. Splitting on '/' therefore yields `persistent:` as
        // the tenant and an empty namespace (the gap between the two slashes
        // of `//`), so the shape check rejects the id with the
        // "expected tenant/namespace/name" error before any per-segment
        // validation runs.
        assert!(matches!(
            split_function_id("persistent://acme/svc/fn"),
            Err(AdminError::InvalidName(_))
        ));
    }

    #[test]
    fn message_id_response_deserialises_java_camelcase() {
        // The exact body shape upstream PIP-415 §"Success Response" advertises.
        let json = r#"{"ledgerId":12345,"entryId":67890,"partitionIndex":0}"#;
        let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
        let msg = dto.try_into_message_id().unwrap();
        assert_eq!(msg.ledger_id, 12345);
        assert_eq!(msg.entry_id, 67890);
        assert_eq!(msg.partition, 0);
        // The broker resolves at entry granularity — batch fields are absent
        // from the JSON and must default to -1 to match the canonical sentinel.
        assert_eq!(msg.batch_index, -1);
        assert_eq!(msg.batch_size, -1);
    }

    #[test]
    fn message_id_response_defaults_partition_for_non_partitioned_topic() {
        // PIP-415 §"Success Response": `partitionIndex: -1` for non-partitioned
        // topics. Some broker versions omit the field entirely on
        // non-partitioned topics; serde default keeps us correct in either case.
        let json = r#"{"ledgerId":1,"entryId":2}"#;
        let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
        assert_eq!(dto.try_into_message_id().unwrap().partition, -1);
    }

    #[test]
    fn url_helper_emits_single_slash_after_admin_v2() {
        // Regression guard: the previous url() helper appended segments after
        // the trailing-slash sentinel of /admin/v2/, producing
        // /admin/v2//persistent/... — real brokers tolerated it but strict
        // mocks (and Java's PulsarAdmin) emit the single-slash form. Pin the
        // current behaviour so we notice any future regression.
        let client = AdminClient::builder()
            .service_url("http://broker.example:8080".parse().unwrap())
            .build()
            .unwrap();
        let url = client.url(&["clusters"]).unwrap();
        assert_eq!(url.as_str(), "http://broker.example:8080/admin/v2/clusters");
        let url2 = client
            .url(&["persistent", "public", "default", "topic", "stats"])
            .unwrap();
        assert_eq!(
            url2.as_str(),
            "http://broker.example:8080/admin/v2/persistent/public/default/topic/stats"
        );
    }

    #[test]
    fn url_helper_preserves_path_prefix_without_trailing_slash() {
        // Regression guard: per WHATWG URL semantics, `Url::join("admin/v2/")`
        // on a base whose path has no trailing slash REPLACES the last segment
        // — `http://broker/pulsar` + `admin/v2/` becomes
        // `http://broker/admin/v2/`, silently dropping `/pulsar`. The builder
        // normalises the base to end in `/` before joining so any operator
        // running behind a path-prefixed K8s ingress (`--admin-url
        // http://gw/pulsar`) gets the right URLs on both V2 and V3 endpoints.
        let client = AdminClient::builder()
            .service_url("http://broker.example:8080/pulsar".parse().unwrap())
            .build()
            .unwrap();
        let url = client.url(&["clusters"]).unwrap();
        assert_eq!(
            url.as_str(),
            "http://broker.example:8080/pulsar/admin/v2/clusters"
        );
        let url_v3 = client.url_v3(&["functions", "a", "b"]).unwrap();
        assert_eq!(
            url_v3.as_str(),
            "http://broker.example:8080/pulsar/admin/v3/functions/a/b"
        );
    }

    #[test]
    fn split_topic_rejects_dot_segments() {
        // LISA-001: `..` / `.` in any segment would silently normalise out via
        // url::Url::path_segments_mut, producing a client/server URL parser
        // differential. Refuse them at the input boundary.
        assert!(matches!(
            split_topic("persistent://../foo/bar"),
            Err(AdminError::InvalidName(_))
        ));
        assert!(matches!(
            split_topic("./foo/bar"),
            Err(AdminError::InvalidName(_))
        ));
        assert!(matches!(
            split_topic("tenant/./topic"),
            Err(AdminError::InvalidName(_))
        ));
    }

    #[test]
    fn split_topic_rejects_control_bytes_and_percent_encoded_slash() {
        assert!(matches!(
            split_topic("tenant/ns/topic%2Fevil"),
            Err(AdminError::InvalidName(_))
        ));
        assert!(matches!(
            split_topic("tenant/ns/top\0ic"),
            Err(AdminError::InvalidName(_))
        ));
    }

    #[test]
    fn split_topic_rejects_lowercase_encoded_slash_and_del() {
        // The lowercase `%2f` spelling and the DEL byte (0x7F) are separate
        // branches of the segment validation from `%2F` and NUL above.
        assert!(matches!(
            split_topic("tenant/ns/topic%2fevil"),
            Err(AdminError::InvalidName(_))
        ));
        assert!(matches!(
            split_topic("tenant/ns/top\u{7f}ic"),
            Err(AdminError::InvalidName(_))
        ));
    }
}
4599            split_topic("tenant/ns/top\0ic"),
4600            Err(AdminError::InvalidName(_))
4601        ));
4602    }
4603}