magnetar_admin/lib.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! Apache Pulsar admin REST client (`/admin/v2/...`).
4//!
5//! Thin async wrapper around [`reqwest`] for the broker's JAX-RS admin API.
6//! TLS is via `rustls-tls`. There are no channels and no background tasks: every
7//! call is a one-shot `await` that resolves to a [`Result`].
8//!
9//! Endpoint paths mirror the broker. Each method's rustdoc cites the Java
10//! endpoint class (file + relevant `@Path` annotation) in `apache/pulsar` so a
11//! reader can confirm the URL and HTTP verb against upstream.
12//!
13//! ## Quick start
14//!
15//! ```no_run
16//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
17//! use magnetar_admin::{AdminClient, TenantInfo};
18//!
19//! let admin = AdminClient::builder()
20//! .service_url("http://localhost:8080".parse()?)
21//! .build()?;
22//!
23//! let tenants = admin.tenants_list().await?;
24//! println!("{tenants:?}");
25//!
26//! admin
27//! .tenant_create(
28//! "acme",
29//! TenantInfo {
30//! admin_roles: vec!["admin".into()],
31//! allowed_clusters: vec!["standalone".into()],
32//! },
33//! )
34//! .await?;
35//! # Ok(()) }
36//! ```
37
38#![warn(unreachable_pub)]
39#![forbid(unsafe_code)]
40
41mod tls_crypto;
42
43use std::collections::HashMap;
44use std::sync::Arc;
45use std::time::Duration;
46
47use magnetar_auth_oauth2::ClientCredentialsFlow;
48use magnetar_proto::MessageId;
49use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
50use reqwest::{Method, RequestBuilder, Response, StatusCode};
51use serde::{Deserialize, Serialize};
52use url::Url;
53
54/// Default request timeout. Mirrors `PulsarAdminBuilder` Java default of 60s
55/// (see `pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/
56/// PulsarAdminBuilderImpl.java`).
57pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
58
59/// Longer per-call timeout for `*_create_with_url` / `*_update_with_url`
60/// endpoints. The broker fetches the package from the supplied URL
61/// before responding, so the round-trip can comfortably exceed 60 s
62/// against a slow internal registry (S3 / GCS / function://) or a
63/// large `.nar` / `.jar` artifact. Overrides the client-level timeout
64/// for those calls only — every other admin verb keeps the 60 s
65/// budget [`DEFAULT_TIMEOUT`] provides.
66///
67/// 5 min matches the Java admin client's read-timeout for register
68/// paths.
69pub const PACKAGE_REGISTER_TIMEOUT: Duration = Duration::from_secs(300);
70
71/// Authentication strategy used by the admin client.
72///
73/// `Token(...)` adds `Authorization: Bearer <token>` to every request.
74/// Mirrors Java's `AuthenticationToken` provider. `OAuth2(...)` performs a
75/// `client_credentials` exchange against the IDP and attaches the resulting
76/// access token as a bearer credential — mirrors Java's
77/// `AuthenticationOAuth2`.
78#[derive(Clone, Default)]
79pub enum AdminAuth {
80 /// No authentication.
81 #[default]
82 None,
83 /// Bearer token. The string is the raw token; the `Bearer ` prefix is added
84 /// at request time.
85 Token(String),
86 /// `OAuth2` `client_credentials` flow. The cached access token is refreshed
87 /// (when missing or near expiry) and attached as `Authorization: Bearer
88 /// <access-token>` at request time. The flow is shared (`Arc`) so its
89 /// token cache is reused across every admin call.
90 OAuth2(Arc<ClientCredentialsFlow>),
91}
92
93impl std::fmt::Debug for AdminAuth {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 // Redact the token body so calling `Debug` on the admin client never
96 // spills the bearer credential to tracing or stdout. Mirrors the
97 // `Credentials`/`ClientCredentialsFlow` Debug redaction in
98 // `magnetar-auth-oauth2`.
99 match self {
100 Self::None => f.write_str("None"),
101 Self::Token(_) => f.debug_tuple("Token").field(&"<redacted>").finish(),
102 // `ClientCredentialsFlow`'s own `Debug` already redacts the secret
103 // and cached token, so forwarding to it is safe.
104 Self::OAuth2(flow) => f.debug_tuple("OAuth2").field(flow).finish(),
105 }
106 }
107}
108
109/// Apache Pulsar admin REST client.
110///
111/// Holds two pre-computed base URLs: `base_url` anchored at
112/// `/admin/v2/` (clusters / tenants / namespaces / topics /
113/// subscriptions / brokers / bookies / schemas) and `base_url_v3`
114/// anchored at `/admin/v3/` (Pulsar Functions / IO Sources / IO Sinks
115/// / Packages). The split mirrors Pulsar's own routing — Java's
116/// `PulsarAdmin` keeps them separate too. Both URLs are derived from
117/// the same broker root at builder time, so a caller never has to
118/// know which version family an endpoint belongs to.
119#[derive(Debug, Clone)]
120pub struct AdminClient {
121 base_url: Url,
122 base_url_v3: Url,
123 http: reqwest::Client,
124 auth: AdminAuth,
125}
126
127impl AdminClient {
128 /// Start building an admin client.
129 #[must_use]
130 pub fn builder() -> AdminClientBuilder {
131 AdminClientBuilder::default()
132 }
133
134 /// Return the base URL the client targets (with the trailing `/admin/v2/`
135 /// component already appended). Exposed for tests and diagnostics.
136 #[must_use]
137 pub fn base_url(&self) -> &Url {
138 &self.base_url
139 }
140
141 /// Return the configured auth strategy. Exposed for tests and diagnostics.
142 #[must_use]
143 pub fn auth(&self) -> &AdminAuth {
144 &self.auth
145 }
146
147 // --- Cluster ---------------------------------------------------------
148
149 /// List clusters.
150 ///
151 /// `GET /admin/v2/clusters`.
152 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Clusters.java`
153 /// (`@Path("/clusters")`) + `admin/impl/ClustersBase.java#getClusters`.
154 pub async fn cluster_list(&self) -> Result<Vec<String>, AdminError> {
155 let url = self.url(&["clusters"])?;
156 let resp = self.send(self.http.request(Method::GET, url)).await?;
157 json_ok(resp).await
158 }
159
160 /// List failure-domains configured on a cluster.
161 ///
162 /// `GET /admin/v2/clusters/{cluster}/failureDomains`. The broker returns
163 /// a `Map<String, FailureDomain>` keyed by domain name; each value
164 /// carries a `brokers: Set<String>` member. The map is exposed as a
165 /// raw `serde_json::Value` for forward-compat — broker minor versions
166 /// add fields.
167 /// Java: `ClustersBase#getFailureDomains`.
168 pub async fn cluster_failure_domains_list(
169 &self,
170 cluster: &str,
171 ) -> Result<serde_json::Value, AdminError> {
172 validate_segment(cluster)?;
173 let url = self.url(&["clusters", cluster, "failureDomains"])?;
174 let resp = self.send(self.http.request(Method::GET, url)).await?;
175 json_ok(resp).await
176 }
177
178 /// Get one failure-domain by name.
179 ///
180 /// `GET /admin/v2/clusters/{cluster}/failureDomains/{domain}`.
181 /// Java: `ClustersBase#getDomain`.
182 pub async fn cluster_failure_domain_get(
183 &self,
184 cluster: &str,
185 domain: &str,
186 ) -> Result<serde_json::Value, AdminError> {
187 validate_segment(cluster)?;
188 validate_segment(domain)?;
189 let url = self.url(&["clusters", cluster, "failureDomains", domain])?;
190 let resp = self.send(self.http.request(Method::GET, url)).await?;
191 json_ok(resp).await
192 }
193
194 /// List namespace-isolation policies configured on a cluster.
195 ///
196 /// `GET /admin/v2/clusters/{cluster}/namespaceIsolationPolicies`. The
197 /// broker returns a `Map<String, NamespaceIsolationData>` carrying
198 /// the namespace regex, primary/secondary broker lists, and the
199 /// auto-failover policy. Exposed as raw JSON for forward-compat.
200 ///
201 /// A cluster with no isolation policies configured surfaces as
202 /// `404 NamespaceIsolationPolicies for cluster ... does not exist`
203 /// (Pulsar 4.0.x) rather than an empty map; we mirror the Java
204 /// client's `Map<String, _>` surface by mapping that specific 404 to
205 /// an empty `{}` object. Other 404s (auth, wrong cluster) still
206 /// surface as `AdminError::Status`.
207 /// Java: `ClustersBase#getNamespaceIsolationPolicies`.
208 pub async fn namespace_isolation_policies_list(
209 &self,
210 cluster: &str,
211 ) -> Result<serde_json::Value, AdminError> {
212 validate_segment(cluster)?;
213 let url = self.url(&["clusters", cluster, "namespaceIsolationPolicies"])?;
214 let resp = self.send(self.http.request(Method::GET, url)).await?;
215 match json_ok::<serde_json::Value>(resp).await {
216 Ok(v) => Ok(v),
217 Err(AdminError::Status {
218 code: 404, body, ..
219 }) if body.contains("NamespaceIsolationPolicies") => {
220 Ok(serde_json::Value::Object(serde_json::Map::new()))
221 }
222 Err(e) => Err(e),
223 }
224 }
225
226 // --- Brokers ---------------------------------------------------------
227
228 /// List active brokers in a cluster.
229 ///
230 /// `GET /admin/v2/brokers/{cluster}`. Returns a list of `host:port`
231 /// strings — one entry per broker that's currently registered with
232 /// the cluster's metadata store. Java: `BrokersBase#getActiveBrokers`.
233 pub async fn brokers_list(&self, cluster: &str) -> Result<Vec<String>, AdminError> {
234 validate_segment(cluster)?;
235 let url = self.url(&["brokers", cluster])?;
236 let resp = self.send(self.http.request(Method::GET, url)).await?;
237 json_ok(resp).await
238 }
239
240 /// Get the current leader broker for the cluster.
241 ///
242 /// `GET /admin/v2/brokers/leaderBroker`. Returns `{ serviceUrl,
243 /// brokerId }`. Exposed as raw JSON for forward-compat — newer
244 /// brokers add `clusterName` and similar fields.
245 /// Java: `BrokersBase#getLeaderBroker`.
246 pub async fn brokers_leader(&self) -> Result<serde_json::Value, AdminError> {
247 let url = self.url(&["brokers", "leaderBroker"])?;
248 let resp = self.send(self.http.request(Method::GET, url)).await?;
249 json_ok(resp).await
250 }
251
252 /// List the names of all dynamic-config keys the broker exposes.
253 ///
254 /// `GET /admin/v2/brokers/configuration`. Returns the bare list of
255 /// `ServiceConfiguration` fields tagged `@FieldContext(dynamic = true)`
256 /// — the set of keys that `brokers_set_dynamic_config` accepts. Use
257 /// [`Self::brokers_dynamic_config_overrides`] for the current values.
258 ///
259 /// Pulsar 4 normally returns a JSON array (`List<String>` from
260 /// `BrokerService#getDynamicConfiguration`), but some packaging /
261 /// proxy paths surface the underlying `Map<String, ConfigField>`
262 /// shape instead. We accept both — array → values, object → keys —
263 /// to stay version-tolerant. Java: `BrokersBase#getDynamicConfigurationName`.
264 pub async fn brokers_dynamic_config_keys(&self) -> Result<Vec<String>, AdminError> {
265 let url = self.url(&["brokers", "configuration"])?;
266 let resp = self.send(self.http.request(Method::GET, url)).await?;
267 let v: serde_json::Value = json_ok(resp).await?;
268 match v {
269 serde_json::Value::Array(items) => Ok(items
270 .into_iter()
271 .filter_map(|x| x.as_str().map(str::to_owned))
272 .collect()),
273 serde_json::Value::Object(map) => Ok(map.into_iter().map(|(k, _)| k).collect()),
274 other => Err(AdminError::Protocol(format!(
275 "brokers/configuration returned unexpected shape: {other}"
276 ))),
277 }
278 }
279
280 /// Get the currently-overridden dynamic configuration values.
281 ///
282 /// `GET /admin/v2/brokers/configuration/values`. Returns a
283 /// `Map<String, String>` of every dynamic key the operator has set
284 /// (the broker omits keys still on their static / default value).
285 /// Exposed as raw JSON because broker minor versions add keys.
286 /// Java: `BrokersBase#getAllDynamicConfigurations`.
287 pub async fn brokers_dynamic_config_overrides(&self) -> Result<serde_json::Value, AdminError> {
288 let url = self.url(&["brokers", "configuration", "values"])?;
289 let resp = self.send(self.http.request(Method::GET, url)).await?;
290 json_ok(resp).await
291 }
292
293 /// Get the broker's runtime (merged static + dynamic) configuration.
294 ///
295 /// `GET /admin/v2/brokers/configuration/runtime`. Returns the full
296 /// `Map<String, String>` of `ServiceConfiguration` values as they
297 /// currently apply on the broker process — static defaults
298 /// overlaid with any `brokers_set_dynamic_config` overrides. Raw
299 /// JSON for forward-compat. Java: `BrokersBase#getRuntimeConfiguration`.
300 pub async fn brokers_runtime_config(&self) -> Result<serde_json::Value, AdminError> {
301 let url = self.url(&["brokers", "configuration", "runtime"])?;
302 let resp = self.send(self.http.request(Method::GET, url)).await?;
303 json_ok(resp).await
304 }
305
306 /// Get the broker's internal-stack endpoints.
307 ///
308 /// `GET /admin/v2/brokers/internal-configuration`. Returns the
309 /// `InternalConfigurationData` envelope — metadata-store URLs
310 /// (`zookeeperServers`, `configurationMetadataStoreUrl`),
311 /// `BookKeeper` metadata service URI, ledger root paths. Raw JSON
312 /// for forward-compat; the shape rolls between releases as the
313 /// metadata layer evolves.
314 /// Java: `BrokersBase#getInternalConfigurationData`.
315 pub async fn brokers_internal_config(&self) -> Result<serde_json::Value, AdminError> {
316 let url = self.url(&["brokers", "internal-configuration"])?;
317 let resp = self.send(self.http.request(Method::GET, url)).await?;
318 json_ok(resp).await
319 }
320
321 /// Probe broker health — produces and consumes one heartbeat message
322 /// on an internal topic.
323 ///
324 /// `GET /admin/v2/brokers/health`. The broker returns the plain-text
325 /// string `"ok"` on success; non-200 surfaces as `AdminError::Status`.
326 /// Java: `BrokersBase#healthCheck`.
327 pub async fn brokers_health_check(&self) -> Result<String, AdminError> {
328 let url = self.url(&["brokers", "health"])?;
329 let resp = self.send(self.http.request(Method::GET, url)).await?;
330 let resp = ensure_status(resp).await?;
331 Ok(resp.resp.text().await?)
332 }
333
334 /// List the namespaces a specific broker currently owns.
335 ///
336 /// `GET /admin/v2/brokers/{cluster}/{broker}/ownedNamespaces`. The
337 /// `broker` argument must be the broker's `host:port` (matching the
338 /// strings [`Self::brokers_list`] returns). Returns a
339 /// `Map<String, NamespaceOwnershipStatus>` keyed by namespace name —
340 /// raw JSON for forward-compat.
341 /// Java: `BrokersBase#getOwnedNamespaces`.
342 pub async fn brokers_owned_namespaces(
343 &self,
344 cluster: &str,
345 broker: &str,
346 ) -> Result<serde_json::Value, AdminError> {
347 validate_segment(cluster)?;
348 validate_segment(broker)?;
349 let url = self.url(&["brokers", cluster, broker, "ownedNamespaces"])?;
350 let resp = self.send(self.http.request(Method::GET, url)).await?;
351 json_ok(resp).await
352 }
353
354 /// Override a dynamic broker configuration value.
355 ///
356 /// `POST /admin/v2/brokers/configuration/{name}/{value}`. Both the
357 /// key and the value travel in the URL path — there is no request
358 /// body — matching the broker's `updateDynamicConfiguration(@PathParam
359 /// String configName, @PathParam String configValue)` signature.
360 /// The key must be one of those returned by
361 /// [`Self::brokers_dynamic_config_keys`]; unknown keys yield 412.
362 /// Java: `BrokersBase#updateDynamicConfiguration`.
363 pub async fn brokers_set_dynamic_config(
364 &self,
365 name: &str,
366 value: &str,
367 ) -> Result<(), AdminError> {
368 validate_segment(name)?;
369 validate_segment(value)?;
370 let url = self.url(&["brokers", "configuration", name, value])?;
371 let resp = self.send(self.http.request(Method::POST, url)).await?;
372 empty_ok(resp).await
373 }
374
375 /// Drop a dynamic configuration override, reverting to the static value.
376 ///
377 /// `DELETE /admin/v2/brokers/configuration/{name}`. After the call
378 /// the key disappears from [`Self::brokers_dynamic_config_overrides`]
379 /// and [`Self::brokers_runtime_config`] reflects the underlying
380 /// static / default value again.
381 /// Java: `BrokersBase#deleteDynamicConfiguration`.
382 pub async fn brokers_delete_dynamic_config(&self, name: &str) -> Result<(), AdminError> {
383 validate_segment(name)?;
384 let url = self.url(&["brokers", "configuration", name])?;
385 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
386 empty_ok(resp).await
387 }
388
389 // --- Bookies ---------------------------------------------------------
390
391 /// List every bookie the broker knows about — both writable and
392 /// read-only — as registered in `BookKeeper` metadata.
393 ///
394 /// `GET /admin/v2/bookies/all`. Returns the broker's
395 /// `BookiesClusterInfo` envelope — a `bookies: [{ address: "host:port" }]`
396 /// array. Raw JSON for forward-compat.
397 /// Java: `BookiesBase#getAllAvailableBookies`.
398 pub async fn bookies_list_all(&self) -> Result<serde_json::Value, AdminError> {
399 let url = self.url(&["bookies", "all"])?;
400 let resp = self.send(self.http.request(Method::GET, url)).await?;
401 json_ok(resp).await
402 }
403
404 /// Get every bookie's group + rack assignment, as configured for the
405 /// rack-aware placement policy.
406 ///
407 /// `GET /admin/v2/bookies/racks-info`. Returns the nested
408 /// `Map<group, Map<bookieAddress, BookieInfo>>` shape Pulsar
409 /// persists in metadata. Raw JSON because the wire shape exposes
410 /// nested maps that change between releases (the `default` group
411 /// is implicit on older brokers).
412 /// Java: `BookiesBase#getBookieRackInfo`.
413 pub async fn bookies_racks_info(&self) -> Result<serde_json::Value, AdminError> {
414 let url = self.url(&["bookies", "racks-info"])?;
415 let resp = self.send(self.http.request(Method::GET, url)).await?;
416 json_ok(resp).await
417 }
418
419 /// Set (or update) a bookie's rack assignment.
420 ///
421 /// `POST /admin/v2/bookies/racks-info/{bookie}?group={group}` with
422 /// a JSON [`BookieInfo`] body carrying only `{rack, hostname}`.
423 /// `bookie` is the `host:port` registered in `BookKeeper` metadata.
424 /// The placement policy picks up the new rack on its next
425 /// reconciliation tick.
426 ///
427 /// `group` is **a query parameter**, not a body field — Pulsar's
428 /// `BookiesBase#updateBookieRackInfo(@PathParam("bookie") String,
429 /// @QueryParam("group") String, BookieInfo)` Jackson-binds the body
430 /// to `{rack, hostname}` only; an unknown `group` body field is
431 /// silently ignored and the query param defaults to `null`,
432 /// dropping the operator's group choice on the wire.
433 /// Java: `BookiesBase#updateBookieRackInfo`.
434 pub async fn bookies_set_rack(
435 &self,
436 bookie: &str,
437 group: &str,
438 info: BookieInfo,
439 ) -> Result<(), AdminError> {
440 validate_segment(bookie)?;
441 let mut url = self.url(&["bookies", "racks-info", bookie])?;
442 url.query_pairs_mut().append_pair("group", group);
443 let resp = self
444 .send(self.http.request(Method::POST, url).json(&info))
445 .await?;
446 empty_ok(resp).await
447 }
448
449 /// Remove a bookie's rack assignment.
450 ///
451 /// `DELETE /admin/v2/bookies/racks-info/{bookie}`. The bookie falls
452 /// back to the placement policy's default group / rack until
453 /// [`Self::bookies_set_rack`] is called again.
454 /// Java: `BookiesBase#deleteBookieRackInfo`.
455 pub async fn bookies_delete_rack(&self, bookie: &str) -> Result<(), AdminError> {
456 validate_segment(bookie)?;
457 let url = self.url(&["bookies", "racks-info", bookie])?;
458 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
459 empty_ok(resp).await
460 }
461
462 // --- Schemas ---------------------------------------------------------
463
464 /// Get the latest schema attached to a topic.
465 ///
466 /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema`. Returns
467 /// `{ version, type, schema, properties, timestamp }`; raw JSON
468 /// because the `type` axis (`AVRO` / `JSON` / `PROTOBUF` /
469 /// `PROTOBUF_NATIVE` / `KEY_VALUE` / `STRING` / `BYTES` / …) is
470 /// open-ended and broker minor versions add keys (deletion
471 /// tombstones surface as `type: "DELETE"` on the GET, for
472 /// instance). Java: `SchemasResourceBase#getSchema`.
473 pub async fn schema_get_latest(&self, topic: &str) -> Result<serde_json::Value, AdminError> {
474 let (tenant, namespace, name) = split_topic(topic)?;
475 let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
476 let resp = self.send(self.http.request(Method::GET, url)).await?;
477 json_ok(resp).await
478 }
479
480 /// Get a specific schema version attached to a topic.
481 ///
482 /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema/{version}`.
483 /// `version` is the monotonically-increasing integer the broker
484 /// assigns at registration. Same wire shape as
485 /// [`Self::schema_get_latest`].
486 /// Java: `SchemasResourceBase#getSchema` (with version path param).
487 pub async fn schema_get_version(
488 &self,
489 topic: &str,
490 version: i64,
491 ) -> Result<serde_json::Value, AdminError> {
492 let (tenant, namespace, name) = split_topic(topic)?;
493 let v = version.to_string();
494 let url = self.url(&["schemas", tenant, namespace, name, "schema", &v])?;
495 let resp = self.send(self.http.request(Method::GET, url)).await?;
496 json_ok(resp).await
497 }
498
499 /// List every schema version registered for a topic.
500 ///
501 /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schemas`. Pulsar 4
502 /// wraps the per-version entries in a
503 /// `GetAllVersionsSchemaResponse { getSchemaResponses: [...] }`
504 /// envelope (verified against `apache/pulsar@v4.0.4`
505 /// `SchemasResourceBase#convertToAllVersionsSchemaResponse`). We
506 /// unwrap that envelope at the boundary so callers see the flat
507 /// `Vec<Value>` they expect; a bare-array shape (older or
508 /// alternative serialisations) is still accepted. Raw JSON
509 /// per-entry for forward-compat. Java:
510 /// `SchemasResourceBase#getAllSchemas`.
511 pub async fn schema_list_versions(
512 &self,
513 topic: &str,
514 ) -> Result<Vec<serde_json::Value>, AdminError> {
515 let (tenant, namespace, name) = split_topic(topic)?;
516 let url = self.url(&["schemas", tenant, namespace, name, "schemas"])?;
517 let resp = self.send(self.http.request(Method::GET, url)).await?;
518 let v: serde_json::Value = json_ok(resp).await?;
519 match v {
520 serde_json::Value::Array(items) => Ok(items),
521 serde_json::Value::Object(mut envelope) => {
522 if let Some(serde_json::Value::Array(items)) = envelope.remove("getSchemaResponses")
523 {
524 return Ok(items);
525 }
526 Err(AdminError::Protocol(format!(
527 "schemas/.../schemas envelope missing `getSchemaResponses` array: {}",
528 serde_json::Value::Object(envelope)
529 )))
530 }
531 other => Err(AdminError::Protocol(format!(
532 "schemas/.../schemas returned unexpected shape: {other}"
533 ))),
534 }
535 }
536
537 /// Register a new schema version on a topic.
538 ///
539 /// `POST /admin/v2/schemas/{tenant}/{ns}/{topic}/schema` with a JSON
540 /// [`PostSchemaPayload`] body. The broker returns `{ version: N }`;
541 /// raw JSON because the upstream response envelope wraps the
542 /// version under `data` on some 4.x point releases. Compatibility
543 /// is enforced server-side per the namespace's
544 /// `schemaCompatibilityStrategy` — incompatible posts fail with
545 /// 409. Java: `SchemasResourceBase#postSchema`.
546 pub async fn schema_post(
547 &self,
548 topic: &str,
549 payload: PostSchemaPayload,
550 ) -> Result<serde_json::Value, AdminError> {
551 let (tenant, namespace, name) = split_topic(topic)?;
552 let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
553 let resp = self
554 .send(self.http.request(Method::POST, url).json(&payload))
555 .await?;
556 json_ok(resp).await
557 }
558
559 /// Delete a topic's schema.
560 ///
561 /// `DELETE /admin/v2/schemas/{tenant}/{ns}/{topic}/schema?force={force}`.
562 /// `force = true` skips the broker's "is the schema in use"
563 /// guard — equivalent to `pulsar-admin schemas delete --force`.
564 /// Java: `SchemasResourceBase#deleteSchema`.
565 pub async fn schema_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
566 let (tenant, namespace, name) = split_topic(topic)?;
567 let mut url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
568 url.query_pairs_mut()
569 .append_pair("force", if force { "true" } else { "false" });
570 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
571 empty_ok(resp).await
572 }
573
574 /// Check whether a candidate schema would be compatible with the
575 /// topic's current schema.
576 ///
577 /// `POST /admin/v2/schemas/{tenant}/{ns}/{topic}/compatibility` with
578 /// a JSON [`PostSchemaPayload`] body — the same shape
579 /// [`Self::schema_post`] sends, but the broker only evaluates
580 /// compatibility and never persists. Returns `{ isCompatible:
581 /// bool, schemaCompatibilityStrategy: "..." }`; raw JSON for
582 /// forward-compat.
583 /// Java: `SchemasResourceBase#testCompatibility`.
584 pub async fn schema_compatibility_check(
585 &self,
586 topic: &str,
587 payload: PostSchemaPayload,
588 ) -> Result<serde_json::Value, AdminError> {
589 let (tenant, namespace, name) = split_topic(topic)?;
590 let url = self.url(&["schemas", tenant, namespace, name, "compatibility"])?;
591 let resp = self
592 .send(self.http.request(Method::POST, url).json(&payload))
593 .await?;
594 json_ok(resp).await
595 }
596
597 // --- Pulsar Functions (read) ----------------------------------------
598
599 /// List every function registered under a namespace.
600 ///
601 /// `GET /admin/v3/functions/{tenant}/{namespace}`. Returns a JSON
602 /// array of bare function names (no tenant / namespace prefix) —
603 /// matches Java's `FunctionsBase#listFunctions` body shape.
604 pub async fn functions_list_by_namespace(
605 &self,
606 tenant: &str,
607 namespace: &str,
608 ) -> Result<Vec<String>, AdminError> {
609 validate_segment(tenant)?;
610 validate_segment(namespace)?;
611 let url = self.url_v3(&["functions", tenant, namespace])?;
612 let resp = self.send(self.http.request(Method::GET, url)).await?;
613 json_ok(resp).await
614 }
615
616 /// Get a function's registered `FunctionConfig`.
617 ///
618 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}`. The
619 /// `FunctionConfig` Java type has ~30 fields and grows on every
620 /// minor release — return raw JSON for forward-compat. Java:
621 /// `FunctionsBase#getFunctionInfo`.
622 pub async fn function_get(
623 &self,
624 tenant: &str,
625 namespace: &str,
626 name: &str,
627 ) -> Result<serde_json::Value, AdminError> {
628 validate_segment(tenant)?;
629 validate_segment(namespace)?;
630 validate_segment(name)?;
631 let url = self.url_v3(&["functions", tenant, namespace, name])?;
632 let resp = self.send(self.http.request(Method::GET, url)).await?;
633 json_ok(resp).await
634 }
635
636 /// Get a function's aggregate status (all instances).
637 ///
638 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/status`.
639 /// Returns Java's `FunctionStatus` envelope:
640 /// `{numInstances, numRunning, instances: [...]}`. Raw JSON because
641 /// the per-instance shape carries broker-version-dependent fields.
642 /// Java: `FunctionsBase#getFunctionStatus`.
643 pub async fn function_status(
644 &self,
645 tenant: &str,
646 namespace: &str,
647 name: &str,
648 ) -> Result<serde_json::Value, AdminError> {
649 validate_segment(tenant)?;
650 validate_segment(namespace)?;
651 validate_segment(name)?;
652 let url = self.url_v3(&["functions", tenant, namespace, name, "status"])?;
653 let resp = self.send(self.http.request(Method::GET, url)).await?;
654 json_ok(resp).await
655 }
656
657 /// Get a function's aggregate runtime statistics (all instances).
658 ///
659 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/stats`.
660 /// Returns Java's `FunctionStats` envelope — message rates,
661 /// processed counts, average latency, per-instance breakdown. Raw
662 /// JSON for forward-compat. Java: `FunctionsBase#getFunctionStats`.
663 pub async fn function_stats(
664 &self,
665 tenant: &str,
666 namespace: &str,
667 name: &str,
668 ) -> Result<serde_json::Value, AdminError> {
669 validate_segment(tenant)?;
670 validate_segment(namespace)?;
671 validate_segment(name)?;
672 let url = self.url_v3(&["functions", tenant, namespace, name, "stats"])?;
673 let resp = self.send(self.http.request(Method::GET, url)).await?;
674 json_ok(resp).await
675 }
676
677 /// Get one instance's status.
678 ///
679 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/status`.
680 /// `instance_id` is the integer index the broker assigns at
681 /// schedule time (`0..parallelism`). Java:
682 /// `FunctionsBase#getFunctionInstanceStatus`.
683 pub async fn function_instance_status(
684 &self,
685 tenant: &str,
686 namespace: &str,
687 name: &str,
688 instance_id: i32,
689 ) -> Result<serde_json::Value, AdminError> {
690 validate_segment(tenant)?;
691 validate_segment(namespace)?;
692 validate_segment(name)?;
693 let id = instance_id.to_string();
694 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "status"])?;
695 let resp = self.send(self.http.request(Method::GET, url)).await?;
696 json_ok(resp).await
697 }
698
699 /// Get one instance's runtime statistics.
700 ///
701 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stats`.
702 /// Java: `FunctionsBase#getFunctionInstanceStats`.
703 pub async fn function_instance_stats(
704 &self,
705 tenant: &str,
706 namespace: &str,
707 name: &str,
708 instance_id: i32,
709 ) -> Result<serde_json::Value, AdminError> {
710 validate_segment(tenant)?;
711 validate_segment(namespace)?;
712 validate_segment(name)?;
713 let id = instance_id.to_string();
714 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stats"])?;
715 let resp = self.send(self.http.request(Method::GET, url)).await?;
716 json_ok(resp).await
717 }
718
719 // --- Pulsar Functions (URL-based register / update) ----------------
720
721 /// Register a function from a remote package URL.
722 ///
723 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}` with a
724 /// `multipart/form-data` body carrying two parts: `url=<pkg-url>`
725 /// (the broker-resolvable HTTP / `function://` / `file://` URL of
726 /// the compiled package) and `functionConfig=<json>` (the
727 /// serialised [`FunctionConfig`]). The local-file upload path
728 /// (Java's `FunctionsBase#registerFunction` with a
729 /// `FormDataMultiPart` `data` part) is intentionally out of scope
730 /// for this method — operators with a pre-built JAR served over
731 /// HTTP / S3 / GCS use the URL path.
732 ///
733 /// The two-part envelope matches Java's
734 /// `FunctionsBase#registerFunction(@PathParam tenant, ...,
735 /// @FormDataParam("url") String functionPkgUrl,
736 /// @FormDataParam("functionConfig") FunctionConfig functionConfig)`
737 /// — when `data` is null and `url` is non-null the broker takes the
738 /// URL fast-path and skips the upload step.
739 pub async fn function_create_with_url(
740 &self,
741 tenant: &str,
742 namespace: &str,
743 name: &str,
744 url: &str,
745 config: FunctionConfig,
746 ) -> Result<(), AdminError> {
747 validate_segment(tenant)?;
748 validate_segment(namespace)?;
749 validate_segment(name)?;
750 let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
751 let form = function_pkg_form(url, &config)?;
752 let resp = self
753 .send(
754 self.http
755 .request(Method::POST, endpoint)
756 .multipart(form)
757 .timeout(PACKAGE_REGISTER_TIMEOUT),
758 )
759 .await?;
760 empty_ok(resp).await
761 }
762
763 /// Update an existing function from a remote package URL.
764 ///
765 /// `PUT /admin/v3/functions/{tenant}/{namespace}/{name}` with the
766 /// same two-part `multipart/form-data` shape as
767 /// [`Self::function_create_with_url`]. Java:
768 /// `FunctionsBase#updateFunction` with non-null `pkgUrl`.
769 pub async fn function_update_with_url(
770 &self,
771 tenant: &str,
772 namespace: &str,
773 name: &str,
774 url: &str,
775 config: FunctionConfig,
776 ) -> Result<(), AdminError> {
777 validate_segment(tenant)?;
778 validate_segment(namespace)?;
779 validate_segment(name)?;
780 let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
781 let form = function_pkg_form(url, &config)?;
782 let resp = self
783 .send(
784 self.http
785 .request(Method::PUT, endpoint)
786 .multipart(form)
787 .timeout(PACKAGE_REGISTER_TIMEOUT),
788 )
789 .await?;
790 empty_ok(resp).await
791 }
792
793 // --- Pulsar Functions (lifecycle) -----------------------------------
794
795 /// Deregister (delete) a function.
796 ///
797 /// `DELETE /admin/v3/functions/{tenant}/{namespace}/{name}`. The
798 /// broker stops every running instance and drops the
799 /// `FunctionConfig` from metadata. Java:
800 /// `FunctionsBase#deregisterFunction`.
801 pub async fn function_delete(
802 &self,
803 tenant: &str,
804 namespace: &str,
805 name: &str,
806 ) -> Result<(), AdminError> {
807 validate_segment(tenant)?;
808 validate_segment(namespace)?;
809 validate_segment(name)?;
810 let url = self.url_v3(&["functions", tenant, namespace, name])?;
811 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
812 empty_ok(resp).await
813 }
814
815 /// Start every instance of a function (idempotent).
816 ///
817 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/start`. No
818 /// body. Java: `FunctionsBase#startFunction`.
819 pub async fn function_start(
820 &self,
821 tenant: &str,
822 namespace: &str,
823 name: &str,
824 ) -> Result<(), AdminError> {
825 validate_segment(tenant)?;
826 validate_segment(namespace)?;
827 validate_segment(name)?;
828 let url = self.url_v3(&["functions", tenant, namespace, name, "start"])?;
829 let resp = self.send(self.http.request(Method::POST, url)).await?;
830 empty_ok(resp).await
831 }
832
833 /// Stop every instance of a function.
834 ///
835 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/stop`. The
836 /// broker leaves the `FunctionConfig` in metadata; a subsequent
837 /// `function_start` brings it back. Java:
838 /// `FunctionsBase#stopFunction`.
839 pub async fn function_stop(
840 &self,
841 tenant: &str,
842 namespace: &str,
843 name: &str,
844 ) -> Result<(), AdminError> {
845 validate_segment(tenant)?;
846 validate_segment(namespace)?;
847 validate_segment(name)?;
848 let url = self.url_v3(&["functions", tenant, namespace, name, "stop"])?;
849 let resp = self.send(self.http.request(Method::POST, url)).await?;
850 empty_ok(resp).await
851 }
852
853 /// Restart every instance of a function.
854 ///
855 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/restart`.
856 /// Java: `FunctionsBase#restartFunction`.
857 pub async fn function_restart(
858 &self,
859 tenant: &str,
860 namespace: &str,
861 name: &str,
862 ) -> Result<(), AdminError> {
863 validate_segment(tenant)?;
864 validate_segment(namespace)?;
865 validate_segment(name)?;
866 let url = self.url_v3(&["functions", tenant, namespace, name, "restart"])?;
867 let resp = self.send(self.http.request(Method::POST, url)).await?;
868 empty_ok(resp).await
869 }
870
871 /// Start one specific instance.
872 ///
873 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/start`.
874 /// Java: `FunctionsBase#startFunctionInstance`.
875 pub async fn function_start_instance(
876 &self,
877 tenant: &str,
878 namespace: &str,
879 name: &str,
880 instance_id: i32,
881 ) -> Result<(), AdminError> {
882 validate_segment(tenant)?;
883 validate_segment(namespace)?;
884 validate_segment(name)?;
885 let id = instance_id.to_string();
886 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "start"])?;
887 let resp = self.send(self.http.request(Method::POST, url)).await?;
888 empty_ok(resp).await
889 }
890
891 /// Stop one specific instance.
892 ///
893 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stop`.
894 /// Java: `FunctionsBase#stopFunctionInstance`.
895 pub async fn function_stop_instance(
896 &self,
897 tenant: &str,
898 namespace: &str,
899 name: &str,
900 instance_id: i32,
901 ) -> Result<(), AdminError> {
902 validate_segment(tenant)?;
903 validate_segment(namespace)?;
904 validate_segment(name)?;
905 let id = instance_id.to_string();
906 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stop"])?;
907 let resp = self.send(self.http.request(Method::POST, url)).await?;
908 empty_ok(resp).await
909 }
910
911 // --- Tenants ---------------------------------------------------------
912
913 /// List tenants.
914 ///
915 /// `GET /admin/v2/tenants`.
916 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Tenants.java`
917 /// (`@Path("/tenants")`) + `admin/impl/TenantsBase.java#getTenants`.
918 pub async fn tenants_list(&self) -> Result<Vec<String>, AdminError> {
919 let url = self.url(&["tenants"])?;
920 let resp = self.send(self.http.request(Method::GET, url)).await?;
921 json_ok(resp).await
922 }
923
924 /// Create a tenant.
925 ///
926 /// `PUT /admin/v2/tenants/{tenant}` with a JSON [`TenantInfo`] body.
927 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#
928 /// createTenant`.
929 pub async fn tenant_create(&self, name: &str, info: TenantInfo) -> Result<(), AdminError> {
930 let url = self.url(&["tenants", name])?;
931 let resp = self
932 .send(self.http.request(Method::PUT, url).json(&info))
933 .await?;
934 empty_ok(resp).await
935 }
936
937 /// Delete a tenant.
938 ///
939 /// `DELETE /admin/v2/tenants/{tenant}`.
940 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#
941 /// deleteTenant`.
942 pub async fn tenant_delete(&self, name: &str) -> Result<(), AdminError> {
943 let url = self.url(&["tenants", name])?;
944 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
945 empty_ok(resp).await
946 }
947
948 // --- Namespaces ------------------------------------------------------
949
950 /// List namespaces under a tenant.
951 ///
952 /// `GET /admin/v2/namespaces/{tenant}`.
953 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
954 /// (`@Path("/namespaces")` + `@Path("/{tenant}")`).
955 pub async fn namespaces_list(&self, tenant: &str) -> Result<Vec<String>, AdminError> {
956 let url = self.url(&["namespaces", tenant])?;
957 let resp = self.send(self.http.request(Method::GET, url)).await?;
958 json_ok(resp).await
959 }
960
961 /// Create a namespace.
962 ///
963 /// `PUT /admin/v2/namespaces/{tenant}/{namespace}`. The namespace argument
964 /// is `tenant/namespace`, matching how Pulsar expresses fully qualified
965 /// namespace names on the wire and CLI.
966 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
967 /// (`@PUT @Path("/{tenant}/{namespace}")`).
968 pub async fn namespace_create(&self, ns: &str) -> Result<(), AdminError> {
969 let (tenant, namespace) = split_namespace(ns)?;
970 let url = self.url(&["namespaces", tenant, namespace])?;
971 let resp = self.send(self.http.request(Method::PUT, url)).await?;
972 empty_ok(resp).await
973 }
974
975 /// Delete a namespace.
976 ///
977 /// `DELETE /admin/v2/namespaces/{tenant}/{namespace}`.
978 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
979 /// (`@DELETE @Path("/{tenant}/{namespace}")`).
980 pub async fn namespace_delete(&self, ns: &str) -> Result<(), AdminError> {
981 let (tenant, namespace) = split_namespace(ns)?;
982 let url = self.url(&["namespaces", tenant, namespace])?;
983 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
984 empty_ok(resp).await
985 }
986
987 /// Get a namespace's retention policy.
988 ///
989 /// `GET /admin/v2/namespaces/{tenant}/{ns}/retention`.
990 /// Returns `RetentionPolicies { retentionTimeInMinutes, retentionSizeInMB }`.
991 /// A fresh namespace, or a namespace whose retention was just
992 /// removed, surfaces as 204 / empty body / `null` — we fold those
993 /// to `RetentionPolicies::default()` (broker semantic).
994 /// Java: `NamespacesBase#getRetention`.
995 pub async fn namespace_get_retention(&self, ns: &str) -> Result<RetentionPolicies, AdminError> {
996 let (tenant, namespace) = split_namespace(ns)?;
997 let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
998 let resp = self.send(self.http.request(Method::GET, url)).await?;
999 json_ok_or_default(resp).await
1000 }
1001
1002 /// Set a namespace's retention policy.
1003 ///
1004 /// `POST /admin/v2/namespaces/{tenant}/{ns}/retention` with a JSON
1005 /// `RetentionPolicies` body. `-1` means infinite (size or time).
1006 /// Java: `NamespacesBase#setRetention`.
1007 pub async fn namespace_set_retention(
1008 &self,
1009 ns: &str,
1010 policy: RetentionPolicies,
1011 ) -> Result<(), AdminError> {
1012 let (tenant, namespace) = split_namespace(ns)?;
1013 let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
1014 let resp = self
1015 .send(self.http.request(Method::POST, url).json(&policy))
1016 .await?;
1017 empty_ok(resp).await
1018 }
1019
1020 /// Remove a namespace's retention policy (fall back to broker default).
1021 ///
1022 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/retention`.
1023 /// Java: `NamespacesBase#removeRetention`.
1024 pub async fn namespace_remove_retention(&self, ns: &str) -> Result<(), AdminError> {
1025 let (tenant, namespace) = split_namespace(ns)?;
1026 let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
1027 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1028 empty_ok(resp).await
1029 }
1030
1031 /// Get all backlog-quota policies on a namespace.
1032 ///
1033 /// `GET /admin/v2/namespaces/{tenant}/{ns}/backlogQuotaMap`. Returns
1034 /// `Map<BacklogQuotaType, BacklogQuota>` — kept as raw JSON because
1035 /// broker versions add quota types (`message_age` since 2.10).
1036 /// Java: `NamespacesBase#getBacklogQuotaMap`.
1037 pub async fn namespace_get_backlog_quotas(
1038 &self,
1039 ns: &str,
1040 ) -> Result<serde_json::Value, AdminError> {
1041 let (tenant, namespace) = split_namespace(ns)?;
1042 let url = self.url(&["namespaces", tenant, namespace, "backlogQuotaMap"])?;
1043 let resp = self.send(self.http.request(Method::GET, url)).await?;
1044 json_ok(resp).await
1045 }
1046
1047 /// Set a backlog-quota policy on a namespace.
1048 ///
1049 /// `POST /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}`
1050 /// with a JSON `BacklogQuota` body. `backlog_quota_type` selects which
1051 /// dimension to limit (`destination_storage` for byte size, `message_age`
1052 /// for wall-clock TTL).
1053 /// Java: `NamespacesBase#setBacklogQuota`.
1054 pub async fn namespace_set_backlog_quota(
1055 &self,
1056 ns: &str,
1057 backlog_quota_type: BacklogQuotaType,
1058 quota: BacklogQuota,
1059 ) -> Result<(), AdminError> {
1060 let (tenant, namespace) = split_namespace(ns)?;
1061 let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
1062 url.query_pairs_mut()
1063 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
1064 let resp = self
1065 .send(self.http.request(Method::POST, url).json("a))
1066 .await?;
1067 empty_ok(resp).await
1068 }
1069
1070 /// Remove a backlog-quota policy from a namespace.
1071 ///
1072 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}`.
1073 /// Java: `NamespacesBase#removeBacklogQuota`.
1074 pub async fn namespace_remove_backlog_quota(
1075 &self,
1076 ns: &str,
1077 backlog_quota_type: BacklogQuotaType,
1078 ) -> Result<(), AdminError> {
1079 let (tenant, namespace) = split_namespace(ns)?;
1080 let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
1081 url.query_pairs_mut()
1082 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
1083 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1084 empty_ok(resp).await
1085 }
1086
1087 /// Get a namespace's message-TTL (seconds).
1088 ///
1089 /// `GET /admin/v2/namespaces/{tenant}/{ns}/messageTTL`. Returns a
1090 /// bare integer (or `null` if no TTL is set — which decodes as
1091 /// `Option::None`).
1092 /// Java: `NamespacesBase#getNamespaceMessageTTL`.
1093 pub async fn namespace_get_message_ttl(&self, ns: &str) -> Result<Option<i32>, AdminError> {
1094 let (tenant, namespace) = split_namespace(ns)?;
1095 let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1096 let resp = self.send(self.http.request(Method::GET, url)).await?;
1097 json_ok_optional(resp).await
1098 }
1099
1100 /// Set a namespace's message-TTL (seconds).
1101 ///
1102 /// `POST /admin/v2/namespaces/{tenant}/{ns}/messageTTL` with a bare
1103 /// integer body. `0` disables (broker treats as no TTL).
1104 /// Java: `NamespacesBase#setNamespaceMessageTTL`.
1105 pub async fn namespace_set_message_ttl(
1106 &self,
1107 ns: &str,
1108 ttl_seconds: i32,
1109 ) -> Result<(), AdminError> {
1110 let (tenant, namespace) = split_namespace(ns)?;
1111 let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1112 let resp = self
1113 .send(self.http.request(Method::POST, url).json(&ttl_seconds))
1114 .await?;
1115 empty_ok(resp).await
1116 }
1117
1118 /// Remove a namespace's message-TTL (fall back to broker default).
1119 ///
1120 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/messageTTL`.
1121 /// Java: `NamespacesBase#removeNamespaceMessageTTL`.
1122 pub async fn namespace_remove_message_ttl(&self, ns: &str) -> Result<(), AdminError> {
1123 let (tenant, namespace) = split_namespace(ns)?;
1124 let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1125 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1126 empty_ok(resp).await
1127 }
1128
1129 // --- Namespace policies — persistence + rates ----------------------
1130
1131 /// Get a namespace's persistence policy.
1132 ///
1133 /// `GET /admin/v2/namespaces/{tenant}/{ns}/persistence`. Returns the
1134 /// `BookKeeper` ensemble / write-quorum / ack-quorum triple plus the
1135 /// managed-ledger mark-delete rate cap. `null` body decodes to
1136 /// `PersistencePolicies::default()` via `#[serde(default)]`.
1137 /// Java: `NamespacesBase#getPersistence`.
1138 pub async fn namespace_get_persistence(
1139 &self,
1140 ns: &str,
1141 ) -> Result<PersistencePolicies, AdminError> {
1142 let (tenant, namespace) = split_namespace(ns)?;
1143 let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1144 let resp = self.send(self.http.request(Method::GET, url)).await?;
1145 json_ok_or_default(resp).await
1146 }
1147
1148 /// Set a namespace's persistence policy.
1149 ///
1150 /// `POST /admin/v2/namespaces/{tenant}/{ns}/persistence` with a JSON
1151 /// `PersistencePolicies` body.
1152 /// Java: `NamespacesBase#setPersistence`.
1153 pub async fn namespace_set_persistence(
1154 &self,
1155 ns: &str,
1156 policy: PersistencePolicies,
1157 ) -> Result<(), AdminError> {
1158 let (tenant, namespace) = split_namespace(ns)?;
1159 let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1160 let resp = self
1161 .send(self.http.request(Method::POST, url).json(&policy))
1162 .await?;
1163 empty_ok(resp).await
1164 }
1165
1166 /// Remove a namespace's persistence policy (fall back to broker default).
1167 ///
1168 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/persistence`.
1169 /// Java: `NamespacesBase#deletePersistence`.
1170 pub async fn namespace_remove_persistence(&self, ns: &str) -> Result<(), AdminError> {
1171 let (tenant, namespace) = split_namespace(ns)?;
1172 let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1173 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1174 empty_ok(resp).await
1175 }
1176
1177 /// Get a namespace's consumer dispatch-rate policy.
1178 ///
1179 /// `GET /admin/v2/namespaces/{tenant}/{ns}/dispatchRate`. Returns
1180 /// the per-namespace consumer-dispatch throttle (msg/sec, byte/sec,
1181 /// window in seconds). `-1` on either dimension means unlimited.
1182 /// Java: `NamespacesBase#getDispatchRate`.
1183 pub async fn namespace_get_dispatch_rate(&self, ns: &str) -> Result<DispatchRate, AdminError> {
1184 let (tenant, namespace) = split_namespace(ns)?;
1185 let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1186 let resp = self.send(self.http.request(Method::GET, url)).await?;
1187 json_ok_or_default(resp).await
1188 }
1189
1190 /// Set a namespace's consumer dispatch-rate policy.
1191 ///
1192 /// `POST /admin/v2/namespaces/{tenant}/{ns}/dispatchRate` with a
1193 /// JSON `DispatchRate` body.
1194 /// Java: `NamespacesBase#setDispatchRate`.
1195 pub async fn namespace_set_dispatch_rate(
1196 &self,
1197 ns: &str,
1198 rate: DispatchRate,
1199 ) -> Result<(), AdminError> {
1200 let (tenant, namespace) = split_namespace(ns)?;
1201 let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1202 let resp = self
1203 .send(self.http.request(Method::POST, url).json(&rate))
1204 .await?;
1205 empty_ok(resp).await
1206 }
1207
1208 /// Remove a namespace's consumer dispatch-rate policy.
1209 ///
1210 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/dispatchRate`.
1211 /// Java: `NamespacesBase#deleteDispatchRate`.
1212 pub async fn namespace_remove_dispatch_rate(&self, ns: &str) -> Result<(), AdminError> {
1213 let (tenant, namespace) = split_namespace(ns)?;
1214 let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1215 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1216 empty_ok(resp).await
1217 }
1218
1219 /// Get a namespace's per-subscription dispatch-rate policy.
1220 ///
1221 /// `GET /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`.
1222 /// Reuses the [`DispatchRate`] body shape — the policy applies per
1223 /// subscription rather than aggregated across all consumers.
1224 /// Java: `NamespacesBase#getSubscriptionDispatchRate`.
1225 pub async fn namespace_get_subscription_dispatch_rate(
1226 &self,
1227 ns: &str,
1228 ) -> Result<DispatchRate, AdminError> {
1229 let (tenant, namespace) = split_namespace(ns)?;
1230 let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1231 let resp = self.send(self.http.request(Method::GET, url)).await?;
1232 json_ok(resp).await
1233 }
1234
1235 /// Set a namespace's per-subscription dispatch-rate policy.
1236 ///
1237 /// `POST /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`
1238 /// with a JSON `DispatchRate` body.
1239 /// Java: `NamespacesBase#setSubscriptionDispatchRate`.
1240 pub async fn namespace_set_subscription_dispatch_rate(
1241 &self,
1242 ns: &str,
1243 rate: DispatchRate,
1244 ) -> Result<(), AdminError> {
1245 let (tenant, namespace) = split_namespace(ns)?;
1246 let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1247 let resp = self
1248 .send(self.http.request(Method::POST, url).json(&rate))
1249 .await?;
1250 empty_ok(resp).await
1251 }
1252
1253 /// Remove a namespace's per-subscription dispatch-rate policy.
1254 ///
1255 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`.
1256 /// Java: `NamespacesBase#deleteSubscriptionDispatchRate`.
1257 pub async fn namespace_remove_subscription_dispatch_rate(
1258 &self,
1259 ns: &str,
1260 ) -> Result<(), AdminError> {
1261 let (tenant, namespace) = split_namespace(ns)?;
1262 let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1263 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1264 empty_ok(resp).await
1265 }
1266
1267 /// Get a namespace's cross-cluster replicator dispatch-rate policy.
1268 ///
1269 /// `GET /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`.
1270 /// Reuses the [`DispatchRate`] body shape — the policy throttles
1271 /// outbound geo-replication traffic from this cluster.
1272 /// Java: `NamespacesBase#getReplicatorDispatchRate`.
1273 pub async fn namespace_get_replicator_dispatch_rate(
1274 &self,
1275 ns: &str,
1276 ) -> Result<DispatchRate, AdminError> {
1277 let (tenant, namespace) = split_namespace(ns)?;
1278 let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1279 let resp = self.send(self.http.request(Method::GET, url)).await?;
1280 json_ok(resp).await
1281 }
1282
1283 /// Set a namespace's cross-cluster replicator dispatch-rate policy.
1284 ///
1285 /// `POST /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`
1286 /// with a JSON `DispatchRate` body.
1287 /// Java: `NamespacesBase#setReplicatorDispatchRate`.
1288 pub async fn namespace_set_replicator_dispatch_rate(
1289 &self,
1290 ns: &str,
1291 rate: DispatchRate,
1292 ) -> Result<(), AdminError> {
1293 let (tenant, namespace) = split_namespace(ns)?;
1294 let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1295 let resp = self
1296 .send(self.http.request(Method::POST, url).json(&rate))
1297 .await?;
1298 empty_ok(resp).await
1299 }
1300
1301 /// Remove a namespace's cross-cluster replicator dispatch-rate policy.
1302 ///
1303 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`.
1304 /// Java: `NamespacesBase#removeReplicatorDispatchRate`.
1305 pub async fn namespace_remove_replicator_dispatch_rate(
1306 &self,
1307 ns: &str,
1308 ) -> Result<(), AdminError> {
1309 let (tenant, namespace) = split_namespace(ns)?;
1310 let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1311 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1312 empty_ok(resp).await
1313 }
1314
1315 /// Get a namespace's publish-rate policy.
1316 ///
1317 /// `GET /admin/v2/namespaces/{tenant}/{ns}/publishRate`. Returns
1318 /// the producer-side throttle (msg/sec + byte/sec). `-1` on either
1319 /// dimension means unlimited.
1320 /// Java: `NamespacesBase#getPublishRate`.
1321 pub async fn namespace_get_publish_rate(&self, ns: &str) -> Result<PublishRate, AdminError> {
1322 let (tenant, namespace) = split_namespace(ns)?;
1323 let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1324 let resp = self.send(self.http.request(Method::GET, url)).await?;
1325 json_ok_or_default(resp).await
1326 }
1327
1328 /// Set a namespace's publish-rate policy.
1329 ///
1330 /// `POST /admin/v2/namespaces/{tenant}/{ns}/publishRate` with a JSON
1331 /// `PublishRate` body.
1332 /// Java: `NamespacesBase#setPublishRate`.
1333 pub async fn namespace_set_publish_rate(
1334 &self,
1335 ns: &str,
1336 rate: PublishRate,
1337 ) -> Result<(), AdminError> {
1338 let (tenant, namespace) = split_namespace(ns)?;
1339 let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1340 let resp = self
1341 .send(self.http.request(Method::POST, url).json(&rate))
1342 .await?;
1343 empty_ok(resp).await
1344 }
1345
1346 /// Remove a namespace's publish-rate policy.
1347 ///
1348 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/publishRate`.
1349 /// Java: `NamespacesBase#removePublishRate`.
1350 pub async fn namespace_remove_publish_rate(&self, ns: &str) -> Result<(), AdminError> {
1351 let (tenant, namespace) = split_namespace(ns)?;
1352 let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1353 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1354 empty_ok(resp).await
1355 }
1356
1357 // --- Namespace policies — limits + dedup + delayed delivery -----
1358
1359 /// Get a namespace's broker-side message deduplication flag.
1360 ///
1361 /// `GET /admin/v2/namespaces/{tenant}/{ns}/deduplication`. Returns a
1362 /// bare JSON boolean, or `null` (decoded as `None`) when the policy
1363 /// is unset and the broker default applies.
1364 /// Java: `NamespacesBase#getDeduplication`.
1365 pub async fn namespace_get_deduplication(&self, ns: &str) -> Result<Option<bool>, AdminError> {
1366 let (tenant, namespace) = split_namespace(ns)?;
1367 let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1368 let resp = self.send(self.http.request(Method::GET, url)).await?;
1369 json_ok_optional(resp).await
1370 }
1371
1372 /// Set a namespace's broker-side message deduplication flag.
1373 ///
1374 /// `POST /admin/v2/namespaces/{tenant}/{ns}/deduplication` with a
1375 /// bare JSON boolean body.
1376 /// Java: `NamespacesBase#modifyDeduplication`.
1377 pub async fn namespace_set_deduplication(
1378 &self,
1379 ns: &str,
1380 enabled: bool,
1381 ) -> Result<(), AdminError> {
1382 let (tenant, namespace) = split_namespace(ns)?;
1383 let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1384 let resp = self
1385 .send(self.http.request(Method::POST, url).json(&enabled))
1386 .await?;
1387 empty_ok(resp).await
1388 }
1389
1390 /// Remove a namespace's deduplication flag (fall back to broker default).
1391 ///
1392 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplication`.
1393 /// Java: `NamespacesBase#removeDeduplication`.
1394 pub async fn namespace_remove_deduplication(&self, ns: &str) -> Result<(), AdminError> {
1395 let (tenant, namespace) = split_namespace(ns)?;
1396 let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1397 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1398 empty_ok(resp).await
1399 }
1400
1401 /// Get a namespace's deduplication-snapshot interval (entries).
1402 ///
1403 /// `GET /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`.
1404 /// Returns a bare integer (the entry count between dedup cursor
1405 /// snapshots), or `null` (decoded as `None`) when the broker default
1406 /// applies.
1407 /// Java: `NamespacesBase#getDeduplicationSnapshotInterval`.
1408 pub async fn namespace_get_deduplication_snapshot_interval(
1409 &self,
1410 ns: &str,
1411 ) -> Result<Option<i32>, AdminError> {
1412 let (tenant, namespace) = split_namespace(ns)?;
1413 let url = self.url(&[
1414 "namespaces",
1415 tenant,
1416 namespace,
1417 "deduplicationSnapshotInterval",
1418 ])?;
1419 let resp = self.send(self.http.request(Method::GET, url)).await?;
1420 json_ok_optional(resp).await
1421 }
1422
1423 /// Set a namespace's deduplication-snapshot interval (entries).
1424 ///
1425 /// `POST /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`
1426 /// with a bare JSON integer body.
1427 /// Java: `NamespacesBase#setDeduplicationSnapshotInterval`.
1428 pub async fn namespace_set_deduplication_snapshot_interval(
1429 &self,
1430 ns: &str,
1431 interval_entries: i32,
1432 ) -> Result<(), AdminError> {
1433 let (tenant, namespace) = split_namespace(ns)?;
1434 let url = self.url(&[
1435 "namespaces",
1436 tenant,
1437 namespace,
1438 "deduplicationSnapshotInterval",
1439 ])?;
1440 let resp = self
1441 .send(self.http.request(Method::POST, url).json(&interval_entries))
1442 .await?;
1443 empty_ok(resp).await
1444 }
1445
1446 /// Remove a namespace's deduplication-snapshot interval override.
1447 ///
1448 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`.
1449 /// Java: `NamespacesBase#deleteDeduplicationSnapshotInterval`.
1450 pub async fn namespace_remove_deduplication_snapshot_interval(
1451 &self,
1452 ns: &str,
1453 ) -> Result<(), AdminError> {
1454 let (tenant, namespace) = split_namespace(ns)?;
1455 let url = self.url(&[
1456 "namespaces",
1457 tenant,
1458 namespace,
1459 "deduplicationSnapshotInterval",
1460 ])?;
1461 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1462 empty_ok(resp).await
1463 }
1464
1465 /// Get a namespace's compaction threshold (bytes).
1466 ///
1467 /// `GET /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold`. Returns
1468 /// a bare integer (bytes of accumulated topic backlog above which the
1469 /// broker triggers automatic compaction), or `null` (decoded as `None`)
1470 /// when the broker default applies.
1471 /// Java: `NamespacesBase#getCompactionThreshold`.
1472 pub async fn namespace_get_compaction_threshold(
1473 &self,
1474 ns: &str,
1475 ) -> Result<Option<i64>, AdminError> {
1476 let (tenant, namespace) = split_namespace(ns)?;
1477 let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1478 let resp = self.send(self.http.request(Method::GET, url)).await?;
1479 json_ok_optional(resp).await
1480 }
1481
1482 /// Set a namespace's compaction threshold (bytes).
1483 ///
1484 /// `PUT /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold` with
1485 /// a bare JSON long body. `0` disables automatic compaction.
1486 ///
1487 /// Note: this endpoint is `@PUT` in Pulsar 4 (`Namespaces.java`
1488 /// declares `@PUT @Path("/{tenant}/{namespace}/compactionThreshold")`),
1489 /// not POST — using POST yields a `405 Method Not Allowed`.
1490 /// Java: `NamespacesBase#setCompactionThreshold`.
1491 pub async fn namespace_set_compaction_threshold(
1492 &self,
1493 ns: &str,
1494 threshold_bytes: i64,
1495 ) -> Result<(), AdminError> {
1496 let (tenant, namespace) = split_namespace(ns)?;
1497 let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1498 let resp = self
1499 .send(self.http.request(Method::PUT, url).json(&threshold_bytes))
1500 .await?;
1501 empty_ok(resp).await
1502 }
1503
1504 /// Remove a namespace's compaction threshold override.
1505 ///
1506 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold`.
1507 /// Java: `NamespacesBase#deleteCompactionThreshold`.
1508 pub async fn namespace_remove_compaction_threshold(&self, ns: &str) -> Result<(), AdminError> {
1509 let (tenant, namespace) = split_namespace(ns)?;
1510 let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1511 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1512 empty_ok(resp).await
1513 }
1514
1515 /// Get a namespace's delayed-delivery policy.
1516 ///
1517 /// `GET /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery`. Returns
1518 /// the active flag + tick time (the broker's index-tick granularity
1519 /// for delivering delayed messages). `null` decodes as `None`.
1520 /// Java: `NamespacesBase#getDelayedDeliveryPolicies`.
1521 pub async fn namespace_get_delayed_delivery(
1522 &self,
1523 ns: &str,
1524 ) -> Result<Option<DelayedDeliveryPolicies>, AdminError> {
1525 let (tenant, namespace) = split_namespace(ns)?;
1526 let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1527 let resp = self.send(self.http.request(Method::GET, url)).await?;
1528 json_ok_optional(resp).await
1529 }
1530
1531 /// Set a namespace's delayed-delivery policy.
1532 ///
1533 /// `POST /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery` with a
1534 /// JSON `DelayedDeliveryPolicies` body.
1535 /// Java: `NamespacesBase#setDelayedDeliveryPolicies`.
1536 pub async fn namespace_set_delayed_delivery(
1537 &self,
1538 ns: &str,
1539 policy: DelayedDeliveryPolicies,
1540 ) -> Result<(), AdminError> {
1541 let (tenant, namespace) = split_namespace(ns)?;
1542 let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1543 let resp = self
1544 .send(self.http.request(Method::POST, url).json(&policy))
1545 .await?;
1546 empty_ok(resp).await
1547 }
1548
1549 /// Remove a namespace's delayed-delivery policy override.
1550 ///
1551 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery`.
1552 /// Java: `NamespacesBase#removeDelayedDeliveryPolicies`.
1553 pub async fn namespace_remove_delayed_delivery(&self, ns: &str) -> Result<(), AdminError> {
1554 let (tenant, namespace) = split_namespace(ns)?;
1555 let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1556 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1557 empty_ok(resp).await
1558 }
1559
1560 /// Get a namespace's max-producers-per-topic limit.
1561 ///
1562 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic`. Returns
1563 /// a bare integer (the per-topic ceiling on concurrent producer
1564 /// connections), or `null` (decoded as `None`) when the broker default
1565 /// applies.
1566 /// Java: `NamespacesBase#getMaxProducersPerTopic`.
1567 pub async fn namespace_get_max_producers_per_topic(
1568 &self,
1569 ns: &str,
1570 ) -> Result<Option<i32>, AdminError> {
1571 let (tenant, namespace) = split_namespace(ns)?;
1572 let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1573 let resp = self.send(self.http.request(Method::GET, url)).await?;
1574 json_ok_optional(resp).await
1575 }
1576
1577 /// Set a namespace's max-producers-per-topic limit.
1578 ///
1579 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic` with
1580 /// a bare JSON integer body. `0` disables the limit.
1581 /// Java: `NamespacesBase#setMaxProducersPerTopic`.
1582 pub async fn namespace_set_max_producers_per_topic(
1583 &self,
1584 ns: &str,
1585 max_producers: i32,
1586 ) -> Result<(), AdminError> {
1587 let (tenant, namespace) = split_namespace(ns)?;
1588 let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1589 let resp = self
1590 .send(self.http.request(Method::POST, url).json(&max_producers))
1591 .await?;
1592 empty_ok(resp).await
1593 }
1594
1595 /// Remove a namespace's max-producers-per-topic limit override.
1596 ///
1597 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic`.
1598 /// Java: `NamespacesBase#removeMaxProducersPerTopic`.
1599 pub async fn namespace_remove_max_producers_per_topic(
1600 &self,
1601 ns: &str,
1602 ) -> Result<(), AdminError> {
1603 let (tenant, namespace) = split_namespace(ns)?;
1604 let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1605 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1606 empty_ok(resp).await
1607 }
1608
1609 /// Get a namespace's max-consumers-per-topic limit.
1610 ///
1611 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic`. Returns
1612 /// a bare integer (the per-topic ceiling on concurrent consumer
1613 /// connections across all subscriptions), or `null` (decoded as `None`)
1614 /// when the broker default applies.
1615 /// Java: `NamespacesBase#getMaxConsumersPerTopic`.
1616 pub async fn namespace_get_max_consumers_per_topic(
1617 &self,
1618 ns: &str,
1619 ) -> Result<Option<i32>, AdminError> {
1620 let (tenant, namespace) = split_namespace(ns)?;
1621 let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1622 let resp = self.send(self.http.request(Method::GET, url)).await?;
1623 json_ok_optional(resp).await
1624 }
1625
1626 /// Set a namespace's max-consumers-per-topic limit.
1627 ///
1628 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic` with
1629 /// a bare JSON integer body. `0` disables the limit.
1630 /// Java: `NamespacesBase#setMaxConsumersPerTopic`.
1631 pub async fn namespace_set_max_consumers_per_topic(
1632 &self,
1633 ns: &str,
1634 max_consumers: i32,
1635 ) -> Result<(), AdminError> {
1636 let (tenant, namespace) = split_namespace(ns)?;
1637 let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1638 let resp = self
1639 .send(self.http.request(Method::POST, url).json(&max_consumers))
1640 .await?;
1641 empty_ok(resp).await
1642 }
1643
1644 /// Remove a namespace's max-consumers-per-topic limit override.
1645 ///
1646 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic`.
1647 /// Java: `NamespacesBase#removeMaxConsumersPerTopic`.
1648 pub async fn namespace_remove_max_consumers_per_topic(
1649 &self,
1650 ns: &str,
1651 ) -> Result<(), AdminError> {
1652 let (tenant, namespace) = split_namespace(ns)?;
1653 let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1654 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1655 empty_ok(resp).await
1656 }
1657
1658 /// Get a namespace's max-unacked-messages-per-consumer limit.
1659 ///
1660 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`.
1661 /// Returns a bare integer (the broker's per-consumer permit-pool cap
1662 /// before it stops dispatching), or `null` (decoded as `None`) when
1663 /// the broker default applies.
1664 /// Java: `NamespacesBase#getMaxUnackedMessagesPerConsumer`.
1665 pub async fn namespace_get_max_unacked_messages_per_consumer(
1666 &self,
1667 ns: &str,
1668 ) -> Result<Option<i32>, AdminError> {
1669 let (tenant, namespace) = split_namespace(ns)?;
1670 let url = self.url(&[
1671 "namespaces",
1672 tenant,
1673 namespace,
1674 "maxUnackedMessagesPerConsumer",
1675 ])?;
1676 let resp = self.send(self.http.request(Method::GET, url)).await?;
1677 json_ok_optional(resp).await
1678 }
1679
1680 /// Set a namespace's max-unacked-messages-per-consumer limit.
1681 ///
1682 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`
1683 /// with a bare JSON integer body. `0` disables the limit.
1684 /// Java: `NamespacesBase#setMaxUnackedMessagesPerConsumer`.
1685 pub async fn namespace_set_max_unacked_messages_per_consumer(
1686 &self,
1687 ns: &str,
1688 max_unacked: i32,
1689 ) -> Result<(), AdminError> {
1690 let (tenant, namespace) = split_namespace(ns)?;
1691 let url = self.url(&[
1692 "namespaces",
1693 tenant,
1694 namespace,
1695 "maxUnackedMessagesPerConsumer",
1696 ])?;
1697 let resp = self
1698 .send(self.http.request(Method::POST, url).json(&max_unacked))
1699 .await?;
1700 empty_ok(resp).await
1701 }
1702
1703 /// Remove a namespace's max-unacked-messages-per-consumer override.
1704 ///
1705 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`.
1706 /// Java: `NamespacesBase#removeMaxUnackedMessagesPerConsumer`.
1707 pub async fn namespace_remove_max_unacked_messages_per_consumer(
1708 &self,
1709 ns: &str,
1710 ) -> Result<(), AdminError> {
1711 let (tenant, namespace) = split_namespace(ns)?;
1712 let url = self.url(&[
1713 "namespaces",
1714 tenant,
1715 namespace,
1716 "maxUnackedMessagesPerConsumer",
1717 ])?;
1718 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1719 empty_ok(resp).await
1720 }
1721
1722 /// Get a namespace's max-unacked-messages-per-subscription limit.
1723 ///
1724 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`.
1725 /// Returns a bare integer (the broker's per-subscription unacked
1726 /// ceiling — once exceeded the broker stops dispatching to every
1727 /// consumer on that subscription), or `null` (decoded as `None`)
1728 /// when the broker default applies.
1729 /// Java: `NamespacesBase#getMaxUnackedMessagesPerSubscription`.
1730 pub async fn namespace_get_max_unacked_messages_per_subscription(
1731 &self,
1732 ns: &str,
1733 ) -> Result<Option<i32>, AdminError> {
1734 let (tenant, namespace) = split_namespace(ns)?;
1735 let url = self.url(&[
1736 "namespaces",
1737 tenant,
1738 namespace,
1739 "maxUnackedMessagesPerSubscription",
1740 ])?;
1741 let resp = self.send(self.http.request(Method::GET, url)).await?;
1742 json_ok_optional(resp).await
1743 }
1744
1745 /// Set a namespace's max-unacked-messages-per-subscription limit.
1746 ///
1747 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`
1748 /// with a bare JSON integer body. `0` disables the limit.
1749 /// Java: `NamespacesBase#setMaxUnackedMessagesPerSubscription`.
1750 pub async fn namespace_set_max_unacked_messages_per_subscription(
1751 &self,
1752 ns: &str,
1753 max_unacked: i32,
1754 ) -> Result<(), AdminError> {
1755 let (tenant, namespace) = split_namespace(ns)?;
1756 let url = self.url(&[
1757 "namespaces",
1758 tenant,
1759 namespace,
1760 "maxUnackedMessagesPerSubscription",
1761 ])?;
1762 let resp = self
1763 .send(self.http.request(Method::POST, url).json(&max_unacked))
1764 .await?;
1765 empty_ok(resp).await
1766 }
1767
1768 /// Remove a namespace's max-unacked-messages-per-subscription override.
1769 ///
1770 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`.
1771 /// Java: `NamespacesBase#removeMaxUnackedMessagesPerSubscription`.
1772 pub async fn namespace_remove_max_unacked_messages_per_subscription(
1773 &self,
1774 ns: &str,
1775 ) -> Result<(), AdminError> {
1776 let (tenant, namespace) = split_namespace(ns)?;
1777 let url = self.url(&[
1778 "namespaces",
1779 tenant,
1780 namespace,
1781 "maxUnackedMessagesPerSubscription",
1782 ])?;
1783 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1784 empty_ok(resp).await
1785 }
1786
1787 // --- Topics ----------------------------------------------------------
1788
1789 /// List persistent topics in a namespace.
1790 ///
1791 /// `GET /admin/v2/persistent/{tenant}/{namespace}`.
1792 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
1793 /// (`@Path("/persistent")` + `@GET @Path("/{tenant}/{namespace}")`).
1794 pub async fn topics_list(&self, namespace: &str) -> Result<Vec<String>, AdminError> {
1795 let (tenant, namespace) = split_namespace(namespace)?;
1796 let url = self.url(&["persistent", tenant, namespace])?;
1797 let resp = self.send(self.http.request(Method::GET, url)).await?;
1798 json_ok(resp).await
1799 }
1800
1801 /// Create a non-partitioned persistent topic.
1802 ///
1803 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}`.
1804 /// Java: `PersistentTopics.java#createNonPartitionedTopic`
1805 /// (`@PUT @Path("/{tenant}/{namespace}/{topic}")`).
1806 pub async fn topic_create_non_partitioned(&self, topic: &str) -> Result<(), AdminError> {
1807 self.topic_create_non_partitioned_with_properties(topic, &HashMap::new())
1808 .await
1809 }
1810
1811 async fn topic_create_non_partitioned_with_properties(
1812 &self,
1813 topic: &str,
1814 properties: &HashMap<String, String>,
1815 ) -> Result<(), AdminError> {
1816 let (tenant, namespace, name) = split_topic(topic)?;
1817 let url = self.url(&["persistent", tenant, namespace, name])?;
1818 let resp = self
1819 .send(self.http.request(Method::PUT, url).json(properties))
1820 .await?;
1821 empty_ok(resp).await
1822 }
1823
1824 /// Create a partitioned topic with `partitions` partitions.
1825 ///
1826 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`
1827 /// with the partition count as a JSON integer body.
1828 /// Java: `PersistentTopics.java#createPartitionedTopic`
1829 /// (`@PUT @Path("/{tenant}/{namespace}/{topic}/partitions")`).
1830 pub async fn topic_create_partitioned(
1831 &self,
1832 topic: &str,
1833 partitions: u32,
1834 ) -> Result<(), AdminError> {
1835 let (tenant, namespace, name) = split_topic(topic)?;
1836 let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
1837 let resp = self
1838 .send(self.http.request(Method::PUT, url).json(&partitions))
1839 .await?;
1840 empty_ok(resp).await
1841 }
1842
1843 /// Delete a topic, auto-detecting partitioned vs non-partitioned.
1844 ///
1845 /// Pulsar exposes two distinct delete endpoints — the partitioned
1846 /// parent uses `DELETE .../{topic}/partitions?force=…` and the
1847 /// non-partitioned topic uses `DELETE .../{topic}?force=…`. Hitting
1848 /// the partitioned endpoint on a non-partitioned topic returns 404
1849 /// ("Topic is not partitioned"), which used to surface as "the
1850 /// topic doesn't exist" to operators using `magnetar admin topics
1851 /// delete`.
1852 ///
1853 /// Probe via `topic_partitions_count` first (`GET .../partitions`
1854 /// returns `partitions: 0` for non-partitioned topics, `> 0` for a
1855 /// partitioned parent) and route to the matching endpoint. Same
1856 /// behaviour as pulsarctl's `topics delete`.
1857 ///
1858 /// Java: `PersistentTopics.java#deletePartitionedTopic` /
1859 /// `PersistentTopics.java#deleteTopic`.
1860 pub async fn topic_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
1861 let (tenant, namespace, name) = split_topic(topic)?;
1862 let partitions = self.topic_partitions_count(topic).await?;
1863 let force_str = if force { "true" } else { "false" };
1864 let mut url = if partitions > 0 {
1865 self.url(&["persistent", tenant, namespace, name, "partitions"])?
1866 } else {
1867 self.url(&["persistent", tenant, namespace, name])?
1868 };
1869 url.query_pairs_mut().append_pair("force", force_str);
1870 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1871 empty_ok(resp).await
1872 }
1873
1874 /// Get topic stats.
1875 ///
1876 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/stats`.
1877 /// Java: `PersistentTopics.java#getStats`
1878 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/stats")`,
1879 /// response shape `PersistentTopicStats`).
1880 ///
1881 /// For a **partitioned** topic, the broker returns 404 on this endpoint
1882 /// because there is no ledger backing the parent name. Call
1883 /// [`Self::topic_partitioned_stats`] instead, or look up the count via
1884 /// [`Self::topic_partitions_count`] first.
1885 pub async fn topic_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
1886 let (tenant, namespace, name) = split_topic(topic)?;
1887 let url = self.url(&["persistent", tenant, namespace, name, "stats"])?;
1888 let resp = self.send(self.http.request(Method::GET, url)).await?;
1889 json_ok(resp).await
1890 }
1891
1892 /// Get aggregated stats for a partitioned topic.
1893 ///
1894 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitioned-stats?
1895 /// perPartition=false`. Java: `PersistentTopics.java#getPartitionedStats`
1896 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/partitioned-stats")`,
1897 /// response shape `PartitionedTopicStats` which extends
1898 /// `PersistentTopicStats` with `partitions: Map<String, TopicStats>`
1899 /// and `metadata: PartitionedTopicMetadata`).
1900 ///
1901 /// magnetar exposes only the aggregated top-level metrics through the
1902 /// same [`TopicStats`] shape — the broker populates the rate, throughput,
1903 /// size, and counter fields at the response root summed across partitions.
1904 /// The `partitions` and `metadata` fields are
1905 /// dropped on deserialisation; for per-partition detail call
1906 /// [`Self::topic_stats`] on each `<topic>-partition-N` instead. We pass
1907 /// `perPartition=false` to keep the wire response small.
1908 pub async fn topic_partitioned_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
1909 let (tenant, namespace, name) = split_topic(topic)?;
1910 let mut url = self.url(&["persistent", tenant, namespace, name, "partitioned-stats"])?;
1911 url.query_pairs_mut().append_pair("perPartition", "false");
1912 let resp = self.send(self.http.request(Method::GET, url)).await?;
1913 json_ok(resp).await
1914 }
1915
1916 /// Resolve the partition count of a topic.
1917 ///
1918 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`.
1919 /// Java: `PersistentTopics.java#getPartitionedMetadata`
1920 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/partitions")`,
1921 /// response shape `PartitionedTopicMetadata{ partitions: int }`).
1922 ///
1923 /// Returns `0` for non-partitioned topics; lets a caller disambiguate
1924 /// between [`Self::topic_stats`] and [`Self::topic_partitioned_stats`]
1925 /// when the topology is not known in advance.
1926 pub async fn topic_partitions_count(&self, topic: &str) -> Result<u32, AdminError> {
1927 let (tenant, namespace, name) = split_topic(topic)?;
1928 let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
1929 let resp = self.send(self.http.request(Method::GET, url)).await?;
1930 let meta: PartitionedTopicMetadata = json_ok(resp).await?;
1931 Ok(meta.partitions)
1932 }
1933
1934 /// Resolve a broker-entry-metadata `index` to a [`MessageId`] (PIP-415).
1935 ///
1936 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/getMessageIdByIndex?index={index}`.
1937 /// Per [PIP-415](https://github.com/apache/pulsar/blob/master/pip/pip-415.md)
1938 /// this is **REST-only** — the spec's "Binary protocol" section is
1939 /// intentionally empty and the canonical implementation PR
1940 /// [`apache/pulsar#24222`](https://github.com/apache/pulsar/pull/24222)
1941 /// (merged 2025-06-23) touches only admin / broker / CLI Java code.
1942 ///
1943 /// Java:
1944 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
1945 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex")`,
1946 /// `@QueryParam("index") long`); admin-client side is
1947 /// `pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/
1948 /// TopicsImpl.java#getMessageIdByIndexAsync` which deserialises the
1949 /// response into `MessageIdImpl` (i.e. `{ledgerId, entryId, partitionIndex}`).
1950 ///
1951 /// `topic` follows the same rule as every other topic-scoped method:
1952 /// either `persistent://tenant/ns/topic` or `tenant/ns/topic`. For a
1953 /// partitioned topic, pass the specific partition (`my-topic-partition-0`).
1954 ///
1955 /// The response carries only `(ledgerId, entryId, partitionIndex)`. The
1956 /// returned [`MessageId`] sets `batch_index = -1` and `batch_size = -1`
1957 /// because the broker resolves at entry granularity — see PIP-415 §"Why
1958 /// Precise Index Matching Isn't Implemented on the Broker Side".
1959 pub async fn topic_get_message_id_by_index(
1960 &self,
1961 topic: &str,
1962 index: i64,
1963 ) -> Result<MessageId, AdminError> {
1964 let (tenant, namespace, name) = split_topic(topic)?;
1965 let mut url = self.url(&["persistent", tenant, namespace, name, "getMessageIdByIndex"])?;
1966 url.query_pairs_mut()
1967 .append_pair("index", &index.to_string());
1968 let resp = self.send(self.http.request(Method::GET, url)).await?;
1969 let dto: MessageIdResponse = json_ok(resp).await?;
1970 dto.try_into_message_id()
1971 }
1972
1973 /// Trigger ledger compaction for a topic.
1974 ///
1975 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction`.
1976 /// Returns 204 on success; the broker queues the work asynchronously —
1977 /// poll [`Self::topic_compaction_status`] to observe progress.
1978 /// Java: `PersistentTopics#triggerCompaction`.
1979 pub async fn topic_compact(&self, topic: &str) -> Result<(), AdminError> {
1980 let (tenant, namespace, name) = split_topic(topic)?;
1981 let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
1982 let resp = self.send(self.http.request(Method::PUT, url)).await?;
1983 empty_ok(resp).await
1984 }
1985
1986 /// Get the current compaction status for a topic.
1987 ///
1988 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction`.
1989 /// Returns Java's `LongRunningProcessStatus`: `status` ∈ {`NOT_RUN`,
1990 /// `RUNNING`, `SUCCESS`, `ERROR`} plus an optional `lastError` string.
1991 /// Java: `PersistentTopics#compactionStatus`.
1992 pub async fn topic_compaction_status(
1993 &self,
1994 topic: &str,
1995 ) -> Result<LongRunningProcessStatus, AdminError> {
1996 let (tenant, namespace, name) = split_topic(topic)?;
1997 let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
1998 let resp = self.send(self.http.request(Method::GET, url)).await?;
1999 json_ok(resp).await
2000 }
2001
2002 /// Unload a topic from its current broker (forces rebalancing).
2003 ///
2004 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/unload`.
2005 /// Operators use this to drain a hot broker or to re-elect ownership
2006 /// after a configuration change. Java: `PersistentTopics#unloadTopic`.
2007 pub async fn topic_unload(&self, topic: &str) -> Result<(), AdminError> {
2008 let (tenant, namespace, name) = split_topic(topic)?;
2009 let url = self.url(&["persistent", tenant, namespace, name, "unload"])?;
2010 let resp = self.send(self.http.request(Method::PUT, url)).await?;
2011 empty_ok(resp).await
2012 }
2013
2014 /// Terminate (seal) a topic — no further produces succeed.
2015 ///
2016 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/terminate`.
2017 /// Returns the [`MessageId`] of the last message that landed before
2018 /// the seal, or `None` when the broker reports the
2019 /// `MessageIdImpl(-1, -1)` sentinel — meaning the topic was sealed
2020 /// before any confirmed entry was written (a freshly-created topic,
2021 /// or a topic whose owner was just re-elected). The Java client
2022 /// surfaces that case as `MessageId.earliest`; we use `Option` so
2023 /// callers don't have to special-case a magic value.
2024 /// Java: `PersistentTopics#terminate`.
2025 pub async fn topic_terminate(&self, topic: &str) -> Result<Option<MessageId>, AdminError> {
2026 let (tenant, namespace, name) = split_topic(topic)?;
2027 let url = self.url(&["persistent", tenant, namespace, name, "terminate"])?;
2028 let resp = self.send(self.http.request(Method::POST, url)).await?;
2029 let dto: MessageIdResponse = json_ok(resp).await?;
2030 if dto.ledger_id < 0 && dto.entry_id < 0 {
2031 return Ok(None);
2032 }
2033 dto.try_into_message_id().map(Some)
2034 }
2035
2036 /// Grow a partitioned topic's partition count.
2037 ///
2038 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`
2039 /// with a bare JSON integer body. Only forward (grow) is supported by
2040 /// the broker — shrinking returns 409. Java:
2041 /// `PersistentTopics#updatePartitionedTopic`.
2042 pub async fn topic_update_partitions(
2043 &self,
2044 topic: &str,
2045 new_partitions: u32,
2046 ) -> Result<(), AdminError> {
2047 let (tenant, namespace, name) = split_topic(topic)?;
2048 let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
2049 let resp = self
2050 .send(self.http.request(Method::POST, url).json(&new_partitions))
2051 .await?;
2052 empty_ok(resp).await
2053 }
2054
2055 // --- Topic policies — per-topic overrides ---------------------------
2056
2057 /// Get a topic's retention policy.
2058 ///
2059 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/retention`.
2060 /// Returns the per-topic [`RetentionPolicies`] override; the broker
2061 /// emits a `RetentionPolicies` JSON when the policy is set and a bare
2062 /// `null` (decoded as `RetentionPolicies::default()` via `#[serde(default)]`)
2063 /// when no override is in place — callers fall back to the namespace
2064 /// policy in that case. Java: `PersistentTopicsBase#getRetention`.
2065 pub async fn topic_get_retention(&self, topic: &str) -> Result<RetentionPolicies, AdminError> {
2066 let (tenant, namespace, name) = split_topic(topic)?;
2067 let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2068 let resp = self.send(self.http.request(Method::GET, url)).await?;
2069 json_ok_or_default(resp).await
2070 }
2071
2072 /// Set a topic's retention policy (overrides the namespace default).
2073 ///
2074 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/retention` with a
2075 /// JSON `RetentionPolicies` body. `-1` means infinite (size or time).
2076 /// Java: `PersistentTopicsBase#setRetention`.
2077 pub async fn topic_set_retention(
2078 &self,
2079 topic: &str,
2080 policy: RetentionPolicies,
2081 ) -> Result<(), AdminError> {
2082 let (tenant, namespace, name) = split_topic(topic)?;
2083 let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2084 let resp = self
2085 .send(self.http.request(Method::POST, url).json(&policy))
2086 .await?;
2087 empty_ok(resp).await
2088 }
2089
2090 /// Remove a topic's retention policy (fall back to namespace default).
2091 ///
2092 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/retention`.
2093 /// Java: `PersistentTopicsBase#removeRetention`.
2094 pub async fn topic_remove_retention(&self, topic: &str) -> Result<(), AdminError> {
2095 let (tenant, namespace, name) = split_topic(topic)?;
2096 let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2097 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2098 empty_ok(resp).await
2099 }
2100
2101 /// Get all backlog-quota policies on a topic.
2102 ///
2103 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuotaMap`.
2104 /// Returns `Map<BacklogQuotaType, BacklogQuota>` — kept as raw JSON
2105 /// for the same reason as [`Self::namespace_get_backlog_quotas`]:
2106 /// broker minor versions add quota types.
2107 /// Java: `PersistentTopicsBase#getBacklogQuotaMap`.
2108 pub async fn topic_get_backlog_quotas(
2109 &self,
2110 topic: &str,
2111 ) -> Result<serde_json::Value, AdminError> {
2112 let (tenant, namespace, name) = split_topic(topic)?;
2113 let url = self.url(&["persistent", tenant, namespace, name, "backlogQuotaMap"])?;
2114 let resp = self.send(self.http.request(Method::GET, url)).await?;
2115 json_ok(resp).await
2116 }
2117
2118 /// Set a backlog-quota policy on a topic (overrides the namespace
2119 /// default for the matching `backlogQuotaType`).
2120 ///
2121 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota
2122 /// ?backlogQuotaType={type}` with a JSON `BacklogQuota` body.
2123 /// Java: `PersistentTopicsBase#setBacklogQuota`.
2124 pub async fn topic_set_backlog_quota(
2125 &self,
2126 topic: &str,
2127 backlog_quota_type: BacklogQuotaType,
2128 quota: BacklogQuota,
2129 ) -> Result<(), AdminError> {
2130 let (tenant, namespace, name) = split_topic(topic)?;
2131 let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
2132 url.query_pairs_mut()
2133 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
2134 let resp = self
2135 .send(self.http.request(Method::POST, url).json("a))
2136 .await?;
2137 empty_ok(resp).await
2138 }
2139
2140 /// Remove a backlog-quota policy from a topic.
2141 ///
2142 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota
2143 /// ?backlogQuotaType={type}`.
2144 /// Java: `PersistentTopicsBase#removeBacklogQuota`.
2145 pub async fn topic_remove_backlog_quota(
2146 &self,
2147 topic: &str,
2148 backlog_quota_type: BacklogQuotaType,
2149 ) -> Result<(), AdminError> {
2150 let (tenant, namespace, name) = split_topic(topic)?;
2151 let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
2152 url.query_pairs_mut()
2153 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
2154 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2155 empty_ok(resp).await
2156 }
2157
2158 /// Get a topic's message-TTL (seconds, or `null` if unset).
2159 ///
2160 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL`. Returns
2161 /// a bare integer when the override is set, `null` (decoded as
2162 /// `Option::None`) when no topic-level override is in place.
2163 /// Java: `PersistentTopicsBase#getMessageTTL`.
2164 pub async fn topic_get_message_ttl(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2165 let (tenant, namespace, name) = split_topic(topic)?;
2166 let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2167 let resp = self.send(self.http.request(Method::GET, url)).await?;
2168 json_ok_optional(resp).await
2169 }
2170
2171 /// Set a topic's message-TTL (seconds).
2172 ///
2173 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL?messageTTL={n}`.
2174 /// `0` disables (broker treats as no TTL).
2175 ///
2176 /// Note: unlike the **namespace**-level `setNamespaceMessageTTL` (which
2177 /// takes the TTL as a JSON int body), the topic-level setter binds
2178 /// `@QueryParam("messageTTL") Integer messageTTL` on both Pulsar 4.0 and
2179 /// 4.2 (`pulsar-broker/.../v2/PersistentTopics.java#setMessageTTL`).
2180 /// Sending the value as a JSON body returns 204 No Content but the
2181 /// broker reads the query param as `null` and silently treats the call
2182 /// as "no override" — the topic policy never persists, and a subsequent
2183 /// `topic_get_message_ttl` surfaces `Ok(None)`. Older Pulsar releases
2184 /// tolerated both encodings; 4.2 enforces the query-param shape.
2185 /// Java: `PersistentTopicsBase#setMessageTTL`.
2186 pub async fn topic_set_message_ttl(
2187 &self,
2188 topic: &str,
2189 ttl_seconds: i32,
2190 ) -> Result<(), AdminError> {
2191 let (tenant, namespace, name) = split_topic(topic)?;
2192 let mut url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2193 url.query_pairs_mut()
2194 .append_pair("messageTTL", &ttl_seconds.to_string());
2195 let resp = self.send(self.http.request(Method::POST, url)).await?;
2196 empty_ok(resp).await
2197 }
2198
2199 /// Remove a topic's message-TTL (fall back to namespace default).
2200 ///
2201 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL`.
2202 /// Java: `PersistentTopicsBase#removeMessageTTL`.
2203 pub async fn topic_remove_message_ttl(&self, topic: &str) -> Result<(), AdminError> {
2204 let (tenant, namespace, name) = split_topic(topic)?;
2205 let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2206 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2207 empty_ok(resp).await
2208 }
2209
2210 /// Get a topic's persistence policy.
2211 ///
2212 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence`. The
2213 /// broker emits a `PersistencePolicies` JSON when the topic override
2214 /// is set and `null` (decoded as `Option::None`) when no override is
2215 /// in place — callers fall back to the namespace policy in that case.
2216 /// Java: `PersistentTopicsBase#getPersistence`.
2217 pub async fn topic_get_persistence(
2218 &self,
2219 topic: &str,
2220 ) -> Result<Option<PersistencePolicies>, AdminError> {
2221 let (tenant, namespace, name) = split_topic(topic)?;
2222 let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2223 let resp = self.send(self.http.request(Method::GET, url)).await?;
2224 json_ok_optional(resp).await
2225 }
2226
2227 /// Set a topic's persistence policy (overrides the namespace default).
2228 ///
2229 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence` with a
2230 /// JSON `PersistencePolicies` body.
2231 /// Java: `PersistentTopicsBase#setPersistence`.
2232 pub async fn topic_set_persistence(
2233 &self,
2234 topic: &str,
2235 policy: PersistencePolicies,
2236 ) -> Result<(), AdminError> {
2237 let (tenant, namespace, name) = split_topic(topic)?;
2238 let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2239 let resp = self
2240 .send(self.http.request(Method::POST, url).json(&policy))
2241 .await?;
2242 empty_ok(resp).await
2243 }
2244
2245 /// Remove a topic's persistence policy (fall back to namespace default).
2246 ///
2247 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence`.
2248 /// Java: `PersistentTopicsBase#removePersistence`.
2249 pub async fn topic_remove_persistence(&self, topic: &str) -> Result<(), AdminError> {
2250 let (tenant, namespace, name) = split_topic(topic)?;
2251 let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2252 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2253 empty_ok(resp).await
2254 }
2255
2256 /// Get a topic's consumer dispatch-rate policy (or `null` if no override).
2257 ///
2258 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate`. The
2259 /// broker emits the per-topic [`DispatchRate`] override or `null` when
2260 /// no override is set; callers fall back to the namespace policy in the
2261 /// `None` case. Java: `PersistentTopicsBase#getDispatchRate`.
2262 pub async fn topic_get_dispatch_rate(
2263 &self,
2264 topic: &str,
2265 ) -> Result<Option<DispatchRate>, AdminError> {
2266 let (tenant, namespace, name) = split_topic(topic)?;
2267 let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2268 let resp = self.send(self.http.request(Method::GET, url)).await?;
2269 json_ok_optional(resp).await
2270 }
2271
2272 /// Set a topic's consumer dispatch-rate policy (overrides namespace default).
2273 ///
2274 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate` with a
2275 /// JSON `DispatchRate` body. Java: `PersistentTopicsBase#setDispatchRate`.
2276 pub async fn topic_set_dispatch_rate(
2277 &self,
2278 topic: &str,
2279 rate: DispatchRate,
2280 ) -> Result<(), AdminError> {
2281 let (tenant, namespace, name) = split_topic(topic)?;
2282 let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2283 let resp = self
2284 .send(self.http.request(Method::POST, url).json(&rate))
2285 .await?;
2286 empty_ok(resp).await
2287 }
2288
2289 /// Remove a topic's consumer dispatch-rate policy.
2290 ///
2291 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate`.
2292 /// Java: `PersistentTopicsBase#removeDispatchRate`.
2293 pub async fn topic_remove_dispatch_rate(&self, topic: &str) -> Result<(), AdminError> {
2294 let (tenant, namespace, name) = split_topic(topic)?;
2295 let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2296 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2297 empty_ok(resp).await
2298 }
2299
2300 /// Get a topic's per-subscription dispatch-rate policy (or `null`).
2301 ///
2302 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`.
2303 /// Reuses the [`DispatchRate`] body shape — the policy applies per
2304 /// subscription rather than aggregated across all consumers.
2305 /// Java: `PersistentTopicsBase#getSubscriptionDispatchRate`.
2306 pub async fn topic_get_subscription_dispatch_rate(
2307 &self,
2308 topic: &str,
2309 ) -> Result<Option<DispatchRate>, AdminError> {
2310 let (tenant, namespace, name) = split_topic(topic)?;
2311 let url = self.url(&[
2312 "persistent",
2313 tenant,
2314 namespace,
2315 name,
2316 "subscriptionDispatchRate",
2317 ])?;
2318 let resp = self.send(self.http.request(Method::GET, url)).await?;
2319 json_ok_optional(resp).await
2320 }
2321
2322 /// Set a topic's per-subscription dispatch-rate policy (overrides namespace default).
2323 ///
2324 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`
2325 /// with a JSON `DispatchRate` body.
2326 /// Java: `PersistentTopicsBase#setSubscriptionDispatchRate`.
2327 pub async fn topic_set_subscription_dispatch_rate(
2328 &self,
2329 topic: &str,
2330 rate: DispatchRate,
2331 ) -> Result<(), AdminError> {
2332 let (tenant, namespace, name) = split_topic(topic)?;
2333 let url = self.url(&[
2334 "persistent",
2335 tenant,
2336 namespace,
2337 name,
2338 "subscriptionDispatchRate",
2339 ])?;
2340 let resp = self
2341 .send(self.http.request(Method::POST, url).json(&rate))
2342 .await?;
2343 empty_ok(resp).await
2344 }
2345
2346 /// Remove a topic's per-subscription dispatch-rate policy.
2347 ///
2348 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`.
2349 /// Java: `PersistentTopicsBase#removeSubscriptionDispatchRate`.
2350 pub async fn topic_remove_subscription_dispatch_rate(
2351 &self,
2352 topic: &str,
2353 ) -> Result<(), AdminError> {
2354 let (tenant, namespace, name) = split_topic(topic)?;
2355 let url = self.url(&[
2356 "persistent",
2357 tenant,
2358 namespace,
2359 name,
2360 "subscriptionDispatchRate",
2361 ])?;
2362 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2363 empty_ok(resp).await
2364 }
2365
2366 /// Get a topic's cross-cluster replicator dispatch-rate policy (or `null`).
2367 ///
2368 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`.
2369 /// Reuses the [`DispatchRate`] body shape — the policy throttles
2370 /// outbound geo-replication traffic from this cluster.
2371 /// Java: `PersistentTopicsBase#getReplicatorDispatchRate`.
2372 pub async fn topic_get_replicator_dispatch_rate(
2373 &self,
2374 topic: &str,
2375 ) -> Result<Option<DispatchRate>, AdminError> {
2376 let (tenant, namespace, name) = split_topic(topic)?;
2377 let url = self.url(&[
2378 "persistent",
2379 tenant,
2380 namespace,
2381 name,
2382 "replicatorDispatchRate",
2383 ])?;
2384 let resp = self.send(self.http.request(Method::GET, url)).await?;
2385 json_ok_optional(resp).await
2386 }
2387
2388 /// Set a topic's cross-cluster replicator dispatch-rate policy.
2389 ///
2390 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`
2391 /// with a JSON `DispatchRate` body.
2392 /// Java: `PersistentTopicsBase#setReplicatorDispatchRate`.
2393 pub async fn topic_set_replicator_dispatch_rate(
2394 &self,
2395 topic: &str,
2396 rate: DispatchRate,
2397 ) -> Result<(), AdminError> {
2398 let (tenant, namespace, name) = split_topic(topic)?;
2399 let url = self.url(&[
2400 "persistent",
2401 tenant,
2402 namespace,
2403 name,
2404 "replicatorDispatchRate",
2405 ])?;
2406 let resp = self
2407 .send(self.http.request(Method::POST, url).json(&rate))
2408 .await?;
2409 empty_ok(resp).await
2410 }
2411
2412 /// Remove a topic's cross-cluster replicator dispatch-rate policy.
2413 ///
2414 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`.
2415 /// Java: `PersistentTopicsBase#removeReplicatorDispatchRate`.
2416 pub async fn topic_remove_replicator_dispatch_rate(
2417 &self,
2418 topic: &str,
2419 ) -> Result<(), AdminError> {
2420 let (tenant, namespace, name) = split_topic(topic)?;
2421 let url = self.url(&[
2422 "persistent",
2423 tenant,
2424 namespace,
2425 name,
2426 "replicatorDispatchRate",
2427 ])?;
2428 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2429 empty_ok(resp).await
2430 }
2431
2432 /// Get a topic's publish-rate policy (or `null` if no override).
2433 ///
2434 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate`. Returns
2435 /// the per-topic [`PublishRate`] producer-side throttle (msg/sec +
2436 /// byte/sec). `-1` on either dimension means unlimited.
2437 /// Java: `PersistentTopicsBase#getPublishRate`.
2438 pub async fn topic_get_publish_rate(
2439 &self,
2440 topic: &str,
2441 ) -> Result<Option<PublishRate>, AdminError> {
2442 let (tenant, namespace, name) = split_topic(topic)?;
2443 let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2444 let resp = self.send(self.http.request(Method::GET, url)).await?;
2445 json_ok_optional(resp).await
2446 }
2447
2448 /// Set a topic's publish-rate policy (overrides namespace default).
2449 ///
2450 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate` with a
2451 /// JSON `PublishRate` body. Java: `PersistentTopicsBase#setPublishRate`.
2452 pub async fn topic_set_publish_rate(
2453 &self,
2454 topic: &str,
2455 rate: PublishRate,
2456 ) -> Result<(), AdminError> {
2457 let (tenant, namespace, name) = split_topic(topic)?;
2458 let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2459 let resp = self
2460 .send(self.http.request(Method::POST, url).json(&rate))
2461 .await?;
2462 empty_ok(resp).await
2463 }
2464
2465 /// Remove a topic's publish-rate policy.
2466 ///
2467 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate`.
2468 /// Java: `PersistentTopicsBase#removePublishRate`.
2469 pub async fn topic_remove_publish_rate(&self, topic: &str) -> Result<(), AdminError> {
2470 let (tenant, namespace, name) = split_topic(topic)?;
2471 let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2472 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2473 empty_ok(resp).await
2474 }
2475
2476 /// Get a topic's max-producers cap (or `null` if no override).
2477 ///
2478 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers`. Returns
2479 /// a bare integer when the override is set, `null` (decoded as
2480 /// `Option::None`) when no topic-level cap is in place.
2481 /// Java: `PersistentTopicsBase#getMaxProducers`.
2482 pub async fn topic_get_max_producers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2483 let (tenant, namespace, name) = split_topic(topic)?;
2484 let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2485 let resp = self.send(self.http.request(Method::GET, url)).await?;
2486 json_ok_optional(resp).await
2487 }
2488
2489 /// Set a topic's max-producers cap.
2490 ///
2491 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers` with
2492 /// a bare integer body. `0` disables (broker treats as unlimited).
2493 /// Java: `PersistentTopicsBase#setMaxProducers`.
2494 pub async fn topic_set_max_producers(
2495 &self,
2496 topic: &str,
2497 max_producers: i32,
2498 ) -> Result<(), AdminError> {
2499 let (tenant, namespace, name) = split_topic(topic)?;
2500 let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2501 let resp = self
2502 .send(self.http.request(Method::POST, url).json(&max_producers))
2503 .await?;
2504 empty_ok(resp).await
2505 }
2506
2507 /// Remove a topic's max-producers cap (fall back to namespace / broker default).
2508 ///
2509 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers`.
2510 /// Java: `PersistentTopicsBase#removeMaxProducers`.
2511 pub async fn topic_remove_max_producers(&self, topic: &str) -> Result<(), AdminError> {
2512 let (tenant, namespace, name) = split_topic(topic)?;
2513 let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2514 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2515 empty_ok(resp).await
2516 }
2517
2518 /// Get a topic's max-consumers cap (or `null` if no override).
2519 ///
2520 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers`. Returns
2521 /// a bare integer when the override is set, `null` (decoded as
2522 /// `Option::None`) when no topic-level cap is in place.
2523 /// Java: `PersistentTopicsBase#getMaxConsumers`.
2524 pub async fn topic_get_max_consumers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2525 let (tenant, namespace, name) = split_topic(topic)?;
2526 let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2527 let resp = self.send(self.http.request(Method::GET, url)).await?;
2528 json_ok_optional(resp).await
2529 }
2530
2531 /// Set a topic's max-consumers cap.
2532 ///
2533 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers` with
2534 /// a bare integer body. `0` disables (broker treats as unlimited).
2535 /// Java: `PersistentTopicsBase#setMaxConsumers`.
2536 pub async fn topic_set_max_consumers(
2537 &self,
2538 topic: &str,
2539 max_consumers: i32,
2540 ) -> Result<(), AdminError> {
2541 let (tenant, namespace, name) = split_topic(topic)?;
2542 let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2543 let resp = self
2544 .send(self.http.request(Method::POST, url).json(&max_consumers))
2545 .await?;
2546 empty_ok(resp).await
2547 }
2548
2549 /// Remove a topic's max-consumers cap (fall back to namespace / broker default).
2550 ///
2551 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers`.
2552 /// Java: `PersistentTopicsBase#removeMaxConsumers`.
2553 pub async fn topic_remove_max_consumers(&self, topic: &str) -> Result<(), AdminError> {
2554 let (tenant, namespace, name) = split_topic(topic)?;
2555 let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2556 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2557 empty_ok(resp).await
2558 }
2559
2560 // --- Subscriptions ---------------------------------------------------
2561
2562 /// List subscription names on a topic.
2563 ///
2564 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscriptions`.
2565 /// Java: `PersistentTopics#getSubscriptions`.
2566 pub async fn subscriptions_list(&self, topic: &str) -> Result<Vec<String>, AdminError> {
2567 let (tenant, namespace, name) = split_topic(topic)?;
2568 let url = self.url(&["persistent", tenant, namespace, name, "subscriptions"])?;
2569 let resp = self.send(self.http.request(Method::GET, url)).await?;
2570 json_ok(resp).await
2571 }
2572
2573 /// Reset a subscription's cursor to a specific message-id position.
2574 ///
2575 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor`
2576 /// with body `{ledgerId, entryId, partitionIndex, batchIndex, isExcluded}`.
2577 /// `is_excluded = true` skips the message at `message_id` itself; `false`
2578 /// leaves it eligible for redelivery. Java: `PersistentTopics#resetCursorOnPosition`.
2579 pub async fn subscription_reset_cursor_to_position(
2580 &self,
2581 topic: &str,
2582 subscription: &str,
2583 message_id: MessageId,
2584 is_excluded: bool,
2585 ) -> Result<(), AdminError> {
2586 let (tenant, namespace, name) = split_topic(topic)?;
2587 validate_segment(subscription)?;
2588 let url = self.url(&[
2589 "persistent",
2590 tenant,
2591 namespace,
2592 name,
2593 "subscription",
2594 subscription,
2595 "resetcursor",
2596 ])?;
2597 let body = ResetCursorData {
2598 ledger_id: message_id_field_for_wire(message_id.ledger_id),
2599 entry_id: message_id_field_for_wire(message_id.entry_id),
2600 partition_index: message_id.partition,
2601 batch_index: message_id.batch_index,
2602 is_excluded,
2603 };
2604 let resp = self
2605 .send(self.http.request(Method::POST, url).json(&body))
2606 .await?;
2607 empty_ok(resp).await
2608 }
2609
2610 /// Reset a subscription's cursor to a wall-clock timestamp (millis since epoch).
2611 ///
2612 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor/
2613 /// {timestamp}`. Java: `PersistentTopics#resetCursor(topic, sub, timestamp)`.
2614 pub async fn subscription_reset_cursor_to_timestamp(
2615 &self,
2616 topic: &str,
2617 subscription: &str,
2618 timestamp_millis: u64,
2619 ) -> Result<(), AdminError> {
2620 let (tenant, namespace, name) = split_topic(topic)?;
2621 validate_segment(subscription)?;
2622 let timestamp = timestamp_millis.to_string();
2623 let url = self.url(&[
2624 "persistent",
2625 tenant,
2626 namespace,
2627 name,
2628 "subscription",
2629 subscription,
2630 "resetcursor",
2631 ×tamp,
2632 ])?;
2633 let resp = self.send(self.http.request(Method::POST, url)).await?;
2634 empty_ok(resp).await
2635 }
2636
2637 /// Advance a subscription's cursor past N undelivered messages.
2638 ///
2639 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip/
2640 /// {numMessages}`. Java: `PersistentTopics#skipMessages`.
2641 pub async fn subscription_skip_messages(
2642 &self,
2643 topic: &str,
2644 subscription: &str,
2645 num_messages: u64,
2646 ) -> Result<(), AdminError> {
2647 let (tenant, namespace, name) = split_topic(topic)?;
2648 validate_segment(subscription)?;
2649 let n = num_messages.to_string();
2650 let url = self.url(&[
2651 "persistent",
2652 tenant,
2653 namespace,
2654 name,
2655 "subscription",
2656 subscription,
2657 "skip",
2658 &n,
2659 ])?;
2660 let resp = self.send(self.http.request(Method::POST, url)).await?;
2661 empty_ok(resp).await
2662 }
2663
2664 /// Drain the entire backlog of a subscription (clear-backlog).
2665 ///
2666 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip_all`.
2667 /// Java: `PersistentTopics#skipAllMessages`.
2668 pub async fn subscription_skip_all_messages(
2669 &self,
2670 topic: &str,
2671 subscription: &str,
2672 ) -> Result<(), AdminError> {
2673 let (tenant, namespace, name) = split_topic(topic)?;
2674 validate_segment(subscription)?;
2675 let url = self.url(&[
2676 "persistent",
2677 tenant,
2678 namespace,
2679 name,
2680 "subscription",
2681 subscription,
2682 "skip_all",
2683 ])?;
2684 let resp = self.send(self.http.request(Method::POST, url)).await?;
2685 empty_ok(resp).await
2686 }
2687
2688 /// Expire all messages older than `expire_time_seconds` for a subscription.
2689 ///
2690 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/expireMessages/
2691 /// {seconds}`. Java: `PersistentTopics#expireMessagesForSubscription`.
2692 pub async fn subscription_expire_messages(
2693 &self,
2694 topic: &str,
2695 subscription: &str,
2696 expire_time_seconds: u64,
2697 ) -> Result<(), AdminError> {
2698 let (tenant, namespace, name) = split_topic(topic)?;
2699 validate_segment(subscription)?;
2700 let s = expire_time_seconds.to_string();
2701 let url = self.url(&[
2702 "persistent",
2703 tenant,
2704 namespace,
2705 name,
2706 "subscription",
2707 subscription,
2708 "expireMessages",
2709 &s,
2710 ])?;
2711 let resp = self.send(self.http.request(Method::POST, url)).await?;
2712 empty_ok(resp).await
2713 }
2714
2715 /// Delete (unsubscribe) a subscription.
2716 ///
2717 /// `DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}?force={force}`.
2718 /// `force = true` disconnects active consumers before deletion. Java:
2719 /// `PersistentTopics#deleteSubscription`.
2720 pub async fn subscription_delete(
2721 &self,
2722 topic: &str,
2723 subscription: &str,
2724 force: bool,
2725 ) -> Result<(), AdminError> {
2726 let (tenant, namespace, name) = split_topic(topic)?;
2727 validate_segment(subscription)?;
2728 let mut url = self.url(&[
2729 "persistent",
2730 tenant,
2731 namespace,
2732 name,
2733 "subscription",
2734 subscription,
2735 ])?;
2736 if force {
2737 url.query_pairs_mut().append_pair("force", "true");
2738 }
2739 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2740 empty_ok(resp).await
2741 }
2742
2743 // --- Shadow topics (PIP-180 / ADR-0033) ------------------------------
2744
2745 /// Create a shadow topic ([PIP-180](https://github.com/apache/pulsar/blob/master/pip/pip-180.md)).
2746 ///
2747 /// Creates the shadow as a regular non-partitioned topic with the broker-reserved
2748 /// `PULSAR.SHADOW_SOURCE` topic property pointing at the source topic:
2749 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{shadow}` with body
2750 /// `{ "PULSAR.SHADOW_SOURCE": "persistent://tenant/ns/source" }`.
2751 /// Then links the created shadow in the source topic policy list via
2752 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{source}/shadowTopics`.
2753 ///
2754 /// Java:
2755 /// `pulsar-client-admin/.../TopicsImpl.java#createShadowTopicAsync` builds the same property
2756 /// map before calling `createNonPartitionedTopicAsync` for non-partitioned sources.
2757 ///
2758 /// Errors mirror the existing `AdminError` taxonomy: 404 → `Status { code:
2759 /// 404, .. }` (the source topic does not exist), 409 → `Status { code:
2760 /// 409, .. }` (the shadow topic already exists on this source),
2761 /// 401/403 → `Status { code: 401|403, .. }` (auth).
2762 pub async fn create_shadow_topic(&self, source: &str, shadow: &str) -> Result<(), AdminError> {
2763 let (source_tenant, source_namespace, source_name) = split_topic(source)?;
2764 // Validate the shadow name eagerly so a misformatted argument errors
2765 // out with `InvalidName` rather than as a broker 4xx after we've
2766 // already crossed the wire.
2767 let (shadow_tenant, shadow_namespace, shadow_name) = split_topic(shadow)?;
2768 let source = format!("persistent://{source_tenant}/{source_namespace}/{source_name}");
2769 let shadow = format!("persistent://{shadow_tenant}/{shadow_namespace}/{shadow_name}");
2770 let mut properties = HashMap::new();
2771 properties.insert("PULSAR.SHADOW_SOURCE".to_owned(), source.clone());
2772 self.topic_create_non_partitioned_with_properties(&shadow, &properties)
2773 .await?;
2774 self.set_shadow_topics(&source, &[shadow]).await
2775 }
2776
2777 async fn set_shadow_topics(&self, source: &str, shadows: &[String]) -> Result<(), AdminError> {
2778 let (tenant, namespace, name) = split_topic(source)?;
2779 let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
2780 let resp = self
2781 .send(self.http.request(Method::PUT, url).json(shadows))
2782 .await?;
2783 empty_ok(resp).await
2784 }
2785
2786 /// Delete a shadow topic (PIP-180).
2787 ///
2788 /// `DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}` where
2789 /// `{topic}` is the **shadow** topic name. PIP-180's deletion contract
2790 /// goes through the regular topic-delete path on the shadow itself —
2791 /// the broker recognises the topic as a shadow and detaches it from
2792 /// the source ledger atomically with the metadata delete.
2793 ///
2794 /// `force` controls whether active subscribers are kicked off before
2795 /// the delete (`?force=true`) or whether the broker rejects the
2796 /// request when subscribers exist (`?force=false`, the default).
2797 ///
2798 /// Java: `org.apache.pulsar.client.admin.Topics#deleteShadowTopic` calls
2799 /// the same `@DELETE @Path("/{tenant}/{namespace}/{topic}")` endpoint.
2800 pub async fn delete_shadow_topic(&self, shadow: &str, force: bool) -> Result<(), AdminError> {
2801 let (tenant, namespace, name) = split_topic(shadow)?;
2802 let mut url = self.url(&["persistent", tenant, namespace, name])?;
2803 url.query_pairs_mut()
2804 .append_pair("force", if force { "true" } else { "false" });
2805 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2806 empty_ok(resp).await
2807 }
2808
2809 /// List the shadow topics created on a source topic (PIP-180).
2810 ///
2811 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/shadowTopics`
2812 /// where `{topic}` is the **source** topic name. The broker returns a
2813 /// JSON array of fully-qualified shadow topic names.
2814 ///
2815 /// Java:
2816 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
2817 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/shadowTopics")`).
2818 ///
2819 /// Used by the runtime engine at consumer subscribe time: when the user
2820 /// subscribes to a topic the runtime cannot yet classify, a single
2821 /// `get_shadow_topics` lookup on every other topic in the namespace is
2822 /// expensive; instead the runtime calls `get_shadow_topics(subscribed)`
2823 /// on the topic itself — a non-shadow topic returns an empty array, a
2824 /// shadow topic surfaces nothing but the broker has already populated
2825 /// the consumer's `shadow_metadata` via the topic's policy.
2826 /// (See `crates/magnetar-runtime-tokio/src/client.rs::subscribe`.)
2827 pub async fn get_shadow_topics(&self, source: &str) -> Result<Vec<String>, AdminError> {
2828 let (tenant, namespace, name) = split_topic(source)?;
2829 let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
2830 let resp = self.send(self.http.request(Method::GET, url)).await?;
2831 json_ok_or_default(resp).await
2832 }
2833
2834 /// Resolve the **source** topic of a shadow topic (PIP-180).
2835 ///
2836 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/properties`.
2837 /// Returns the `PULSAR.SHADOW_SOURCE` property when the queried topic is a
2838 /// shadow; returns `None` when it is a regular topic. Used by the runtime
2839 /// at subscribe time to populate
2840 /// [`magnetar_proto::ShadowTopicMetadata::source_topic`] on the new
2841 /// consumer (so the receive path can emit
2842 /// [`magnetar_proto::ConnectionEvent::MessageReceivedFromShadow`]
2843 /// without an out-of-band lookup per message).
2844 ///
2845 /// Java: `org.apache.pulsar.client.admin.Topics#getShadowSource` —
2846 /// `TopicsImpl#getShadowSourceAsync` delegates to `getPropertiesAsync`.
2847 pub async fn get_shadow_source(&self, shadow: &str) -> Result<Option<String>, AdminError> {
2848 let (tenant, namespace, name) = split_topic(shadow)?;
2849 let url = self.url(&["persistent", tenant, namespace, name, "properties"])?;
2850 let resp = self.send(self.http.request(Method::GET, url)).await?;
2851 let mut properties: HashMap<String, String> = json_ok_or_default(resp).await?;
2852 Ok(properties
2853 .remove("PULSAR.SHADOW_SOURCE")
2854 .filter(|source| !source.is_empty()))
2855 }
2856
2857 // --- Pulsar IO Sources (/admin/v3/sources/...) ----------------------
2858
2859 /// List sources configured under a namespace.
2860 ///
2861 /// `GET /admin/v3/sources/{tenant}/{namespace}`. Returns the list of
2862 /// source names (the broker emits a JSON array of strings — one
2863 /// entry per declared source, regardless of running state).
2864 /// Java:
2865 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java`
2866 /// (`@Path("/sources")`) +
2867 /// `pulsar-broker/.../admin/impl/SourcesBase.java#listSources`.
2868 pub async fn sources_list_by_namespace(
2869 &self,
2870 tenant: &str,
2871 namespace: &str,
2872 ) -> Result<Vec<String>, AdminError> {
2873 validate_segment(tenant)?;
2874 validate_segment(namespace)?;
2875 let url = self.url_v3(&["sources", tenant, namespace])?;
2876 let resp = self.send(self.http.request(Method::GET, url)).await?;
2877 json_ok(resp).await
2878 }
2879
2880 /// Get one source's configuration.
2881 ///
2882 /// `GET /admin/v3/sources/{tenant}/{namespace}/{name}`. Returns the
2883 /// stored `SourceConfig` envelope as raw JSON — minor broker
2884 /// versions extend the shape (new connector knobs, secret refs)
2885 /// faster than a typed Rust DTO can keep up.
2886 /// Java: `SourcesBase#getSourceInfo`.
2887 pub async fn source_get(
2888 &self,
2889 tenant: &str,
2890 namespace: &str,
2891 name: &str,
2892 ) -> Result<serde_json::Value, AdminError> {
2893 validate_segment(tenant)?;
2894 validate_segment(namespace)?;
2895 validate_segment(name)?;
2896 let url = self.url_v3(&["sources", tenant, namespace, name])?;
2897 let resp = self.send(self.http.request(Method::GET, url)).await?;
2898 json_ok(resp).await
2899 }
2900
2901 /// Get a source's running status (per-instance worker telemetry).
2902 ///
2903 /// `GET /admin/v3/sources/{tenant}/{namespace}/{name}/status`.
2904 /// Returns the broker's `SourceStatus` envelope (`numInstances`,
2905 /// `numRunning`, per-instance `workerId` + `running` + last
2906 /// received timestamp). Exposed as raw JSON for forward-compat.
2907 /// Java: `SourcesBase#getSourceStatus`.
2908 pub async fn source_status(
2909 &self,
2910 tenant: &str,
2911 namespace: &str,
2912 name: &str,
2913 ) -> Result<serde_json::Value, AdminError> {
2914 validate_segment(tenant)?;
2915 validate_segment(namespace)?;
2916 validate_segment(name)?;
2917 let url = self.url_v3(&["sources", tenant, namespace, name, "status"])?;
2918 let resp = self.send(self.http.request(Method::GET, url)).await?;
2919 json_ok(resp).await
2920 }
2921
2922 /// Register a source from a remote package URL.
2923 ///
2924 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}` with
2925 /// `multipart/form-data` carrying two parts: a `url` text part (the
2926 /// package URL — `http(s)://`, `file://`, or `function://` per the
2927 /// broker's `WorkerUtils#downloadFileFromPackageUrl`) and a
2928 /// `sourceConfig` JSON part with the [`SourceConfig`] body.
2929 /// A sibling `source_create` (binary upload) is intentionally not
2930 /// yet exposed — this URL-based variant covers every
2931 /// CI/operator scenario that does not need to ship a JAR through
2932 /// the admin client itself.
2933 /// Java: `SourcesBase#registerSource`.
2934 pub async fn source_create_with_url(
2935 &self,
2936 tenant: &str,
2937 namespace: &str,
2938 name: &str,
2939 url: &str,
2940 config: SourceConfig,
2941 ) -> Result<(), AdminError> {
2942 validate_segment(tenant)?;
2943 validate_segment(namespace)?;
2944 validate_segment(name)?;
2945 let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
2946 let form = build_url_config_multipart(url, "sourceConfig", &config)?;
2947 let resp = self
2948 .send(
2949 self.http
2950 .request(Method::POST, endpoint)
2951 .multipart(form)
2952 .timeout(PACKAGE_REGISTER_TIMEOUT),
2953 )
2954 .await?;
2955 empty_ok(resp).await
2956 }
2957
2958 /// Update a source from a remote package URL.
2959 ///
2960 /// `PUT /admin/v3/sources/{tenant}/{namespace}/{name}` with the
2961 /// same multipart shape as [`Self::source_create_with_url`].
2962 /// Java: `SourcesBase#updateSource`.
2963 pub async fn source_update_with_url(
2964 &self,
2965 tenant: &str,
2966 namespace: &str,
2967 name: &str,
2968 url: &str,
2969 config: SourceConfig,
2970 ) -> Result<(), AdminError> {
2971 validate_segment(tenant)?;
2972 validate_segment(namespace)?;
2973 validate_segment(name)?;
2974 let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
2975 let form = build_url_config_multipart(url, "sourceConfig", &config)?;
2976 let resp = self
2977 .send(
2978 self.http
2979 .request(Method::PUT, endpoint)
2980 .multipart(form)
2981 .timeout(PACKAGE_REGISTER_TIMEOUT),
2982 )
2983 .await?;
2984 empty_ok(resp).await
2985 }
2986
2987 /// Delete a source.
2988 ///
2989 /// `DELETE /admin/v3/sources/{tenant}/{namespace}/{name}`. Removes
2990 /// the source declaration and tears the running instances down.
2991 /// Java: `SourcesBase#deregisterSource`.
2992 pub async fn source_delete(
2993 &self,
2994 tenant: &str,
2995 namespace: &str,
2996 name: &str,
2997 ) -> Result<(), AdminError> {
2998 validate_segment(tenant)?;
2999 validate_segment(namespace)?;
3000 validate_segment(name)?;
3001 let url = self.url_v3(&["sources", tenant, namespace, name])?;
3002 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3003 empty_ok(resp).await
3004 }
3005
3006 /// Start every instance of a source.
3007 ///
3008 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/start`.
3009 /// Java: `SourcesBase#startSource`.
3010 pub async fn source_start(
3011 &self,
3012 tenant: &str,
3013 namespace: &str,
3014 name: &str,
3015 ) -> Result<(), AdminError> {
3016 validate_segment(tenant)?;
3017 validate_segment(namespace)?;
3018 validate_segment(name)?;
3019 let url = self.url_v3(&["sources", tenant, namespace, name, "start"])?;
3020 let resp = self.send(self.http.request(Method::POST, url)).await?;
3021 empty_ok(resp).await
3022 }
3023
3024 /// Stop every instance of a source.
3025 ///
3026 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/stop`.
3027 /// Java: `SourcesBase#stopSource`.
3028 pub async fn source_stop(
3029 &self,
3030 tenant: &str,
3031 namespace: &str,
3032 name: &str,
3033 ) -> Result<(), AdminError> {
3034 validate_segment(tenant)?;
3035 validate_segment(namespace)?;
3036 validate_segment(name)?;
3037 let url = self.url_v3(&["sources", tenant, namespace, name, "stop"])?;
3038 let resp = self.send(self.http.request(Method::POST, url)).await?;
3039 empty_ok(resp).await
3040 }
3041
3042 /// Restart every instance of a source.
3043 ///
3044 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/restart`.
3045 /// Java: `SourcesBase#restartSource`.
3046 pub async fn source_restart(
3047 &self,
3048 tenant: &str,
3049 namespace: &str,
3050 name: &str,
3051 ) -> Result<(), AdminError> {
3052 validate_segment(tenant)?;
3053 validate_segment(namespace)?;
3054 validate_segment(name)?;
3055 let url = self.url_v3(&["sources", tenant, namespace, name, "restart"])?;
3056 let resp = self.send(self.http.request(Method::POST, url)).await?;
3057 empty_ok(resp).await
3058 }
3059
3060 // --- Pulsar IO Sinks (/admin/v3/sinks/...) --------------------------
3061
3062 /// List sinks configured under a namespace.
3063 ///
3064 /// `GET /admin/v3/sinks/{tenant}/{namespace}`. Returns the list of
3065 /// sink names. Mirrors [`Self::sources_list_by_namespace`] —
3066 /// Pulsar's Sources / Sinks REST surfaces are intentionally
3067 /// symmetric.
3068 /// Java:
3069 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java`
3070 /// (`@Path("/sinks")`) +
3071 /// `pulsar-broker/.../admin/impl/SinksBase.java#listSinks`.
3072 pub async fn sinks_list_by_namespace(
3073 &self,
3074 tenant: &str,
3075 namespace: &str,
3076 ) -> Result<Vec<String>, AdminError> {
3077 validate_segment(tenant)?;
3078 validate_segment(namespace)?;
3079 let url = self.url_v3(&["sinks", tenant, namespace])?;
3080 let resp = self.send(self.http.request(Method::GET, url)).await?;
3081 json_ok(resp).await
3082 }
3083
3084 /// Get one sink's configuration.
3085 ///
3086 /// `GET /admin/v3/sinks/{tenant}/{namespace}/{name}`. Returns the
3087 /// stored `SinkConfig` as raw JSON for the same forward-compat
3088 /// reason as [`Self::source_get`]. Java: `SinksBase#getSinkInfo`.
3089 pub async fn sink_get(
3090 &self,
3091 tenant: &str,
3092 namespace: &str,
3093 name: &str,
3094 ) -> Result<serde_json::Value, AdminError> {
3095 validate_segment(tenant)?;
3096 validate_segment(namespace)?;
3097 validate_segment(name)?;
3098 let url = self.url_v3(&["sinks", tenant, namespace, name])?;
3099 let resp = self.send(self.http.request(Method::GET, url)).await?;
3100 json_ok(resp).await
3101 }
3102
3103 /// Get a sink's running status (per-instance worker telemetry).
3104 ///
3105 /// `GET /admin/v3/sinks/{tenant}/{namespace}/{name}/status`.
3106 /// Same envelope shape as the Sources status. Raw JSON.
3107 /// Java: `SinksBase#getSinkStatus`.
3108 pub async fn sink_status(
3109 &self,
3110 tenant: &str,
3111 namespace: &str,
3112 name: &str,
3113 ) -> Result<serde_json::Value, AdminError> {
3114 validate_segment(tenant)?;
3115 validate_segment(namespace)?;
3116 validate_segment(name)?;
3117 let url = self.url_v3(&["sinks", tenant, namespace, name, "status"])?;
3118 let resp = self.send(self.http.request(Method::GET, url)).await?;
3119 json_ok(resp).await
3120 }
3121
3122 /// Register a sink from a remote package URL.
3123 ///
3124 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}` with
3125 /// `multipart/form-data` carrying a `url` text part and a
3126 /// `sinkConfig` JSON part. Mirrors
3127 /// [`Self::source_create_with_url`]; the only wire-level
3128 /// difference is the JSON-part field name (`sinkConfig` vs
3129 /// `sourceConfig`). Java: `SinksBase#registerSink`.
3130 pub async fn sink_create_with_url(
3131 &self,
3132 tenant: &str,
3133 namespace: &str,
3134 name: &str,
3135 url: &str,
3136 config: SinkConfig,
3137 ) -> Result<(), AdminError> {
3138 validate_segment(tenant)?;
3139 validate_segment(namespace)?;
3140 validate_segment(name)?;
3141 let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
3142 let form = build_url_config_multipart(url, "sinkConfig", &config)?;
3143 let resp = self
3144 .send(
3145 self.http
3146 .request(Method::POST, endpoint)
3147 .multipart(form)
3148 .timeout(PACKAGE_REGISTER_TIMEOUT),
3149 )
3150 .await?;
3151 empty_ok(resp).await
3152 }
3153
3154 /// Update a sink from a remote package URL.
3155 ///
3156 /// `PUT /admin/v3/sinks/{tenant}/{namespace}/{name}` with the same
3157 /// multipart shape as [`Self::sink_create_with_url`].
3158 /// Java: `SinksBase#updateSink`.
3159 pub async fn sink_update_with_url(
3160 &self,
3161 tenant: &str,
3162 namespace: &str,
3163 name: &str,
3164 url: &str,
3165 config: SinkConfig,
3166 ) -> Result<(), AdminError> {
3167 validate_segment(tenant)?;
3168 validate_segment(namespace)?;
3169 validate_segment(name)?;
3170 let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
3171 let form = build_url_config_multipart(url, "sinkConfig", &config)?;
3172 let resp = self
3173 .send(
3174 self.http
3175 .request(Method::PUT, endpoint)
3176 .multipart(form)
3177 .timeout(PACKAGE_REGISTER_TIMEOUT),
3178 )
3179 .await?;
3180 empty_ok(resp).await
3181 }
3182
3183 /// Delete a sink.
3184 ///
3185 /// `DELETE /admin/v3/sinks/{tenant}/{namespace}/{name}`.
3186 /// Java: `SinksBase#deregisterSink`.
3187 pub async fn sink_delete(
3188 &self,
3189 tenant: &str,
3190 namespace: &str,
3191 name: &str,
3192 ) -> Result<(), AdminError> {
3193 validate_segment(tenant)?;
3194 validate_segment(namespace)?;
3195 validate_segment(name)?;
3196 let url = self.url_v3(&["sinks", tenant, namespace, name])?;
3197 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3198 empty_ok(resp).await
3199 }
3200
3201 /// Start every instance of a sink.
3202 ///
3203 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/start`.
3204 /// Java: `SinksBase#startSink`.
3205 pub async fn sink_start(
3206 &self,
3207 tenant: &str,
3208 namespace: &str,
3209 name: &str,
3210 ) -> Result<(), AdminError> {
3211 validate_segment(tenant)?;
3212 validate_segment(namespace)?;
3213 validate_segment(name)?;
3214 let url = self.url_v3(&["sinks", tenant, namespace, name, "start"])?;
3215 let resp = self.send(self.http.request(Method::POST, url)).await?;
3216 empty_ok(resp).await
3217 }
3218
3219 /// Stop every instance of a sink.
3220 ///
3221 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/stop`.
3222 /// Java: `SinksBase#stopSink`.
3223 pub async fn sink_stop(
3224 &self,
3225 tenant: &str,
3226 namespace: &str,
3227 name: &str,
3228 ) -> Result<(), AdminError> {
3229 validate_segment(tenant)?;
3230 validate_segment(namespace)?;
3231 validate_segment(name)?;
3232 let url = self.url_v3(&["sinks", tenant, namespace, name, "stop"])?;
3233 let resp = self.send(self.http.request(Method::POST, url)).await?;
3234 empty_ok(resp).await
3235 }
3236
3237 /// Restart every instance of a sink.
3238 ///
3239 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/restart`.
3240 /// Java: `SinksBase#restartSink`.
3241 pub async fn sink_restart(
3242 &self,
3243 tenant: &str,
3244 namespace: &str,
3245 name: &str,
3246 ) -> Result<(), AdminError> {
3247 validate_segment(tenant)?;
3248 validate_segment(namespace)?;
3249 validate_segment(name)?;
3250 let url = self.url_v3(&["sinks", tenant, namespace, name, "restart"])?;
3251 let resp = self.send(self.http.request(Method::POST, url)).await?;
3252 empty_ok(resp).await
3253 }
3254
3255 // --- Pulsar Packages (/admin/v3/packages/...) -----------------------
3256
3257 /// List package names declared under (`type`, `tenant`, `namespace`).
3258 ///
3259 /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}`. Returns the
3260 /// list of package names — *not* versions; one entry per declared
3261 /// package. Use [`Self::package_versions_list`] to enumerate the
3262 /// versions of one package.
3263 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/
3264 /// admin/v3/Packages.java#listPackages`.
3265 pub async fn packages_list(
3266 &self,
3267 pkg_type: PackageType,
3268 tenant: &str,
3269 namespace: &str,
3270 ) -> Result<Vec<String>, AdminError> {
3271 validate_segment(tenant)?;
3272 validate_segment(namespace)?;
3273 let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace])?;
3274 let resp = self.send(self.http.request(Method::GET, url)).await?;
3275 json_ok(resp).await
3276 }
3277
3278 /// List the versions declared for one package.
3279 ///
3280 /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}`.
3281 /// Returns the list of version strings (Pulsar treats versions as
3282 /// opaque strings — `1.0.0`, `latest`, build hashes — and only the
3283 /// metadata endpoints understand them).
3284 /// Java: `PackagesBase#listPackageVersions`.
3285 pub async fn package_versions_list(
3286 &self,
3287 pkg_type: PackageType,
3288 tenant: &str,
3289 namespace: &str,
3290 name: &str,
3291 ) -> Result<Vec<String>, AdminError> {
3292 validate_segment(tenant)?;
3293 validate_segment(namespace)?;
3294 validate_segment(name)?;
3295 let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace, name])?;
3296 let resp = self.send(self.http.request(Method::GET, url)).await?;
3297 json_ok(resp).await
3298 }
3299
3300 /// Get the metadata envelope for one package version.
3301 ///
3302 /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3303 /// {version}/metadata`. Returns the `PackageMetadata` envelope as
3304 /// raw JSON for forward-compat — broker minor versions extend the
3305 /// shape with `tags`, `documentationUrl`, etc.
3306 /// Java: `PackagesBase#getPackageMetadata`.
3307 pub async fn package_metadata_get(
3308 &self,
3309 pkg_type: PackageType,
3310 tenant: &str,
3311 namespace: &str,
3312 name: &str,
3313 version: &str,
3314 ) -> Result<serde_json::Value, AdminError> {
3315 validate_segment(tenant)?;
3316 validate_segment(namespace)?;
3317 validate_segment(name)?;
3318 validate_segment(version)?;
3319 let url = self.url_v3(&[
3320 "packages",
3321 pkg_type.as_str(),
3322 tenant,
3323 namespace,
3324 name,
3325 version,
3326 "metadata",
3327 ])?;
3328 let resp = self.send(self.http.request(Method::GET, url)).await?;
3329 json_ok(resp).await
3330 }
3331
3332 /// Replace the metadata envelope for one package version.
3333 ///
3334 /// `PUT /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3335 /// {version}/metadata` with a [`PackageMetadata`] JSON body.
3336 /// The broker rejects this verb with 404 if the package version
3337 /// does not exist — `package_metadata_set` is *update*, never
3338 /// *create*. Java: `PackagesBase#updatePackageMetadata`.
3339 pub async fn package_metadata_set(
3340 &self,
3341 pkg_type: PackageType,
3342 tenant: &str,
3343 namespace: &str,
3344 name: &str,
3345 version: &str,
3346 metadata: PackageMetadata,
3347 ) -> Result<(), AdminError> {
3348 validate_segment(tenant)?;
3349 validate_segment(namespace)?;
3350 validate_segment(name)?;
3351 validate_segment(version)?;
3352 let url = self.url_v3(&[
3353 "packages",
3354 pkg_type.as_str(),
3355 tenant,
3356 namespace,
3357 name,
3358 version,
3359 "metadata",
3360 ])?;
3361 let resp = self
3362 .send(self.http.request(Method::PUT, url).json(&metadata))
3363 .await?;
3364 empty_ok(resp).await
3365 }
3366
3367 /// Delete one package version.
3368 ///
3369 /// `DELETE /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3370 /// {version}`. The broker drops the version's metadata + storage
3371 /// atomically; other versions of the same package are untouched.
3372 /// Java: `PackagesBase#deletePackage`.
3373 pub async fn package_delete(
3374 &self,
3375 pkg_type: PackageType,
3376 tenant: &str,
3377 namespace: &str,
3378 name: &str,
3379 version: &str,
3380 ) -> Result<(), AdminError> {
3381 validate_segment(tenant)?;
3382 validate_segment(namespace)?;
3383 validate_segment(name)?;
3384 validate_segment(version)?;
3385 let url = self.url_v3(&[
3386 "packages",
3387 pkg_type.as_str(),
3388 tenant,
3389 namespace,
3390 name,
3391 version,
3392 ])?;
3393 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3394 empty_ok(resp).await
3395 }
3396
3397 // --- Internal --------------------------------------------------------
3398
3399 /// Build a request URL by joining `segments` onto `base_url`. Each segment
3400 /// is percent-encoded for the URL path.
3401 fn url(&self, segments: &[&str]) -> Result<Url, AdminError> {
3402 Self::url_for(&self.base_url, segments)
3403 }
3404
3405 /// Build a request URL by joining `segments` onto the `/admin/v3/`
3406 /// base. Used by Pulsar Functions / IO Sources / IO Sinks /
3407 /// Packages, which Pulsar moved off of `/admin/v2/` historically.
3408 fn url_v3(&self, segments: &[&str]) -> Result<Url, AdminError> {
3409 Self::url_for(&self.base_url_v3, segments)
3410 }
3411
3412 /// Shared URL-builder body for the v2 / v3 helpers.
3413 fn url_for(base: &Url, segments: &[&str]) -> Result<Url, AdminError> {
3414 let mut url = base.clone();
3415 {
3416 // `Url::path_segments_mut` only fails for cannot-be-a-base URLs;
3417 // builder already rejected those.
3418 let mut path = url
3419 .path_segments_mut()
3420 .map_err(|()| AdminError::Builder("base url is cannot-be-a-base".into()))?;
3421 // Both `base_url` (anchored at `/admin/v2/`) and `base_url_v3`
3422 // (`/admin/v3/`) carry a trailing slash, which produces a
3423 // sentinel empty trailing segment in `path_segments_mut`. Drop
3424 // it before appending API segments — otherwise pushes land
3425 // after the empty, producing `/admin/v2//persistent/...`. Real
3426 // brokers tolerate the double slash; strict mocks (wiremock)
3427 // do not, and Java's `PulsarAdmin` emits the single-slash
3428 // form.
3429 path.pop_if_empty();
3430 for segment in segments {
3431 path.push(segment);
3432 }
3433 }
3434 Ok(url)
3435 }
3436
3437 /// Apply auth headers and dispatch.
3438 ///
3439 /// Returns an [`ApiResponse`] carrying the response alongside the
3440 /// resolved method + URL, so the decode helpers can attribute a
3441 /// non-JSON body to the exact request that produced it
3442 /// (see [`AdminError::Decode`] / [`AdminError::Status`]).
3443 async fn send(&self, req: RequestBuilder) -> Result<ApiResponse, AdminError> {
3444 let req = match &self.auth {
3445 AdminAuth::None => req,
3446 AdminAuth::Token(tok) => bearer(req, tok)?,
3447 AdminAuth::OAuth2(flow) => {
3448 // Refresh the cached token if it is missing or near expiry,
3449 // then attach it. `ensure_fresh` is a no-op when the cached
3450 // token is still valid, so the steady-state path is a single
3451 // mutex read.
3452 flow.ensure_fresh()
3453 .await
3454 .map_err(|err| AdminError::Auth(format!("oauth2 token refresh: {err}")))?;
3455 // A cache miss (None) and an IDP that handed back an empty
3456 // `access_token` (Some(empty)) are both "no usable token" —
3457 // collapse them into one guard.
3458 let token = flow
3459 .cached_access_token()
3460 .filter(|t| !t.is_empty())
3461 .ok_or_else(|| {
3462 AdminError::Auth("oauth2 returned an empty access token".to_owned())
3463 })?;
3464 // The access token is base64url JWT text — valid UTF-8 — but
3465 // guard the conversion rather than assume it.
3466 let tok = std::str::from_utf8(&token).map_err(|err| {
3467 AdminError::Auth(format!("oauth2 access token is not valid utf-8: {err}"))
3468 })?;
3469 bearer(req, tok)?
3470 }
3471 };
3472 // Build the request so the resolved method + URL are captured for
3473 // diagnostics, then execute on the shared client. This is
3474 // behaviorally identical to `req.send()`.
3475 let request = req.build()?;
3476 let method = request.method().clone();
3477 let url = request.url().clone();
3478 let resp = self.http.execute(request).await?;
3479 Ok(ApiResponse { method, url, resp })
3480 }
3481}
3482
3483/// A dispatched response paired with the request's resolved method + URL.
3484///
3485/// Threaded from [`AdminClient::send`] into the decode helpers so a
3486/// decode / status failure can name the exact request that produced it
3487/// (see [`AdminError::Decode`] and [`AdminError::Status`]).
3488struct ApiResponse {
3489 method: Method,
3490 url: Url,
3491 resp: Response,
3492}
3493
3494/// Pulsar Functions configuration — the subset of Java's
3495/// `org.apache.pulsar.common.functions.FunctionConfig` that the URL-based
3496/// `register` / `update` calls actually require. The Java type carries
3497/// ~30 fields (process / k8s runtime tuning, secrets, deadletter
3498/// topics, …); we expose the load-bearing ones an operator running a
3499/// pre-compiled JAR needs. Unknown fields on the wire are tolerated by
3500/// the broker, so adding extra knobs is an additive change later.
3501///
3502/// `tenant` / `namespace` / `name` are duplicated in the body because
3503/// the broker re-validates them against the URL path (Java's
3504/// `WorkerUtils.validateFunctionName`). The CLI fills these in from the
3505/// fully qualified name the operator passes on the command line.
3506///
3507/// `runtime` is the string `"JAVA"`, `"PYTHON"`, or `"GO"` — matches
3508/// Java's `org.apache.pulsar.common.functions.FunctionConfig.Runtime`
3509/// enum serialised by name.
3510#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3511#[serde(default, rename_all = "camelCase")]
3512pub struct FunctionConfig {
3513 /// Owning tenant.
3514 pub tenant: String,
3515 /// Owning namespace (bare name, not `tenant/namespace`).
3516 pub namespace: String,
3517 /// Function name (unique within the namespace).
3518 pub name: String,
3519 /// Fully qualified entry-point class name (`com.acme.MyFunction`
3520 /// for Java; `module.fn` for Python; `main` for Go).
3521 #[serde(rename = "className")]
3522 pub class_name: String,
3523 /// Input topics the function subscribes to.
3524 pub inputs: Vec<String>,
3525 /// Output topic the function produces to. Empty when the function
3526 /// has no output sink.
3527 pub output: String,
3528 /// Runtime — `"JAVA"`, `"PYTHON"`, or `"GO"`.
3529 pub runtime: String,
3530 /// Number of parallel instances the worker schedules.
3531 pub parallelism: i32,
3532 /// Optional opaque user-config map passed to the function's
3533 /// `Context#getUserConfigValue`. JSON object on the wire.
3534 #[serde(rename = "userConfig", skip_serializing_if = "Option::is_none")]
3535 pub user_config: Option<serde_json::Value>,
3536}
3537
3538/// Tenant policy info — admin roles and allowed clusters.
3539///
3540/// Mirrors Java's `org.apache.pulsar.common.policies.data.TenantInfoImpl` —
3541/// the JSON keys (`adminRoles`, `allowedClusters`) match upstream verbatim.
3542#[derive(Debug, Clone, Default, Serialize, Deserialize)]
3543pub struct TenantInfo {
3544 /// Roles permitted to administrate the tenant.
3545 #[serde(rename = "adminRoles")]
3546 pub admin_roles: Vec<String>,
3547 /// Cluster names the tenant may use.
3548 #[serde(rename = "allowedClusters")]
3549 pub allowed_clusters: Vec<String>,
3550}
3551
3552/// Wire shape of the PIP-415 `getMessageIdByIndex` response.
3553///
3554/// Mirrors Java's `MessageIdImpl` JSON shape (Jackson default property-name
3555/// serialisation): `{ledgerId, entryId, partitionIndex}`. See
3556/// `pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java`.
3557///
3558/// Kept as a deserialise-only DTO and converted into
3559/// [`magnetar_proto::MessageId`] at the boundary so callers do not see this
3560/// wire detail. Not exposed publicly.
3561#[derive(Debug, Clone, Deserialize)]
3562#[serde(rename_all = "camelCase")]
3563struct MessageIdResponse {
3564 ledger_id: i64,
3565 entry_id: i64,
3566 #[serde(default = "default_partition_index")]
3567 partition_index: i32,
3568}
3569
3570fn default_partition_index() -> i32 {
3571 -1
3572}
3573
3574impl MessageIdResponse {
3575 /// Convert the REST response into the canonical [`MessageId`]. The broker
3576 /// resolves at entry granularity, so `batch_index` / `batch_size` are not
3577 /// part of the JSON — they default to `-1` (the same sentinel
3578 /// `MessageId::from_pb` uses for `MessageIdData` without batch fields).
3579 ///
3580 /// Returns `AdminError::Protocol` if the broker emits a negative
3581 /// `ledgerId` or `entryId` — both fields are `u64` in the canonical type
3582 /// (matching the proto wire format) and Java's `MessageIdImpl` cannot
3583 /// represent negative values either, so a negative wire value is a
3584 /// broker bug we must surface rather than silently wrap.
3585 fn try_into_message_id(self) -> Result<MessageId, AdminError> {
3586 let ledger_id = u64::try_from(self.ledger_id).map_err(|_| {
3587 AdminError::Protocol(format!("negative ledgerId from broker: {}", self.ledger_id))
3588 })?;
3589 let entry_id = u64::try_from(self.entry_id).map_err(|_| {
3590 AdminError::Protocol(format!("negative entryId from broker: {}", self.entry_id))
3591 })?;
3592 Ok(MessageId {
3593 ledger_id,
3594 entry_id,
3595 partition: self.partition_index,
3596 batch_index: -1,
3597 batch_size: -1,
3598 // PIP-460 (ADR-0031): admin REST never resolves a scalable
3599 // segment id; the field only exists under `scalable-topics`.
3600 #[cfg(feature = "scalable-topics")]
3601 segment_id: None,
3602 })
3603 }
3604}
3605
3606/// Topic stats. Intentionally permissive: the Java
3607/// `PersistentTopicStatsImpl` shape is large and shifts between releases;
3608/// we extract the high-signal rates, throughput, sizes, and counters and
3609/// pass the rest through as raw JSON.
3610///
3611/// All scalar fields default to `0` / `0.0` when the broker omits them, so a
3612/// release that drops or renames one decodes cleanly instead of failing the
3613/// whole stats call.
3614#[derive(Debug, Clone, Default, Deserialize)]
3615#[serde(default)]
3616pub struct TopicStats {
3617 /// Current publish rate into the topic, in messages/sec, averaged over
3618 /// the broker's most recent stats window. Java: `PersistentTopicStats#msgRateIn`.
3619 #[serde(rename = "msgRateIn")]
3620 pub msg_rate_in: f64,
3621 /// Current dispatch rate out of the topic, in messages/sec.
3622 /// Java: `PersistentTopicStats#msgRateOut`.
3623 #[serde(rename = "msgRateOut")]
3624 pub msg_rate_out: f64,
3625 /// Current publish throughput into the topic, in bytes/sec.
3626 /// Java: `PersistentTopicStats#msgThroughputIn`.
3627 #[serde(rename = "msgThroughputIn")]
3628 pub msg_throughput_in: f64,
3629 /// Current dispatch throughput out of the topic, in bytes/sec.
3630 /// Java: `PersistentTopicStats#msgThroughputOut`.
3631 #[serde(rename = "msgThroughputOut")]
3632 pub msg_throughput_out: f64,
3633 /// Average message size, in bytes, over the most recent stats window.
3634 /// Java: `PersistentTopicStats#averageMsgSize`.
3635 #[serde(rename = "averageMsgSize")]
3636 pub average_msg_size: f64,
3637 /// Total messages received.
3638 #[serde(rename = "msgInCounter")]
3639 pub msg_in_counter: i64,
3640 /// Total bytes received.
3641 #[serde(rename = "bytesInCounter")]
3642 pub bytes_in_counter: i64,
3643 /// Total storage used by the topic's managed ledger, in bytes (includes
3644 /// replicas). Java: `PersistentTopicStats#storageSize`.
3645 #[serde(rename = "storageSize")]
3646 pub storage_size: i64,
3647 /// Current backlog size across all subscriptions, in bytes.
3648 /// Java: `PersistentTopicStats#backlogSize`.
3649 #[serde(rename = "backlogSize")]
3650 pub backlog_size: i64,
3651 /// Publishers, raw JSON because the schema is large and version-dependent.
3652 pub publishers: Vec<serde_json::Value>,
3653 /// Subscriptions map (raw JSON).
3654 pub subscriptions: serde_json::Value,
3655}
3656
3657/// Partitioned-topic metadata, as returned by
3658/// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`.
3659/// Java: `org.apache.pulsar.common.partition.PartitionedTopicMetadata`.
3660/// Only the partition count is consumed; broker-side extensions are ignored.
3661#[derive(Debug, Clone, Default, Deserialize)]
3662#[serde(default)]
3663struct PartitionedTopicMetadata {
3664 partitions: u32,
3665}
3666
3667/// Java `RetentionPolicies` — namespace-level retention policy. `-1` for
3668/// either dimension means infinite. The broker applies whichever quota
3669/// becomes binding first (time OR size).
3670#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
3671#[serde(default, rename_all = "camelCase")]
3672pub struct RetentionPolicies {
3673 /// Maximum retention time in minutes. `-1` = infinite, `0` = none.
3674 pub retention_time_in_minutes: i32,
3675 /// Maximum retention size in megabytes. `-1` = infinite, `0` = none.
3676 #[serde(rename = "retentionSizeInMB")]
3677 pub retention_size_in_mb: i64,
3678}
3679
3680/// Java `PersistencePolicies` — namespace-level `BookKeeper` layout +
3681/// managed-ledger write-shaping knobs. Maps to the broker's
3682/// `org.apache.pulsar.common.policies.data.PersistencePolicies`.
3683///
3684/// `Default::default()` returns the broker's documented default for a
3685/// new namespace (`2/2/2/0.0`), NOT all zeros — the broker rejects
3686/// ensemble values < 1 on `set`, so an all-zero policy is unusable
3687/// for a round-trip. Missing fields on decode default the same way:
3688/// a partial body where the broker omits one field round-trips with
3689/// the legal default, never with the illegal `0`.
3690#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3691#[serde(rename_all = "camelCase")]
3692pub struct PersistencePolicies {
3693 /// `BookKeeper` ensemble size — the number of bookies the managed
3694 /// ledger striping is spread across. Default: 2.
3695 #[serde(default = "default_bookkeeper_quorum")]
3696 pub bookkeeper_ensemble: i32,
3697 /// `BookKeeper` write quorum — the number of bookies each entry is
3698 /// written to. Default: 2.
3699 #[serde(default = "default_bookkeeper_quorum")]
3700 pub bookkeeper_write_quorum: i32,
3701 /// `BookKeeper` ack quorum — the number of acks required before an
3702 /// add is considered durable. Default: 2.
3703 #[serde(default = "default_bookkeeper_quorum")]
3704 pub bookkeeper_ack_quorum: i32,
3705 /// Managed-ledger mark-delete-rate cap (ops/sec). `0.0` disables
3706 /// the throttle. Default: 0.0 (disabled).
3707 #[serde(default)]
3708 pub managed_ledger_max_mark_delete_rate: f64,
3709}
3710
3711impl Default for PersistencePolicies {
3712 fn default() -> Self {
3713 Self {
3714 bookkeeper_ensemble: 2,
3715 bookkeeper_write_quorum: 2,
3716 bookkeeper_ack_quorum: 2,
3717 managed_ledger_max_mark_delete_rate: 0.0,
3718 }
3719 }
3720}
3721
3722#[inline]
3723fn default_bookkeeper_quorum() -> i32 {
3724 2
3725}
3726
3727/// Java `DispatchRate` — a sliding-window throttle (msg/sec + byte/sec
3728/// over a `ratePeriodInSecond` window). Shared shape between the
3729/// per-namespace consumer dispatch rate, the per-subscription dispatch
3730/// rate, and the cross-cluster replicator dispatch rate.
3731///
3732/// `-1` on either rate dimension disables that axis of the throttle —
3733/// missing fields default to `-1` (not `0`) so a broker-omitted
3734/// dimension round-trips as "no throttle", never as "throttle to
3735/// zero" (which would block consumer dispatch on the namespace).
3736#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3737#[serde(rename_all = "camelCase")]
3738pub struct DispatchRate {
3739 /// Throttle in messages/sec. `-1` = unlimited.
3740 #[serde(default = "neg_one_i32")]
3741 pub dispatch_throttling_rate_in_msg: i32,
3742 /// Throttle in bytes/sec. `-1` = unlimited.
3743 #[serde(default = "neg_one_i64")]
3744 pub dispatch_throttling_rate_in_byte: i64,
3745 /// Window size in seconds the throttle averages over.
3746 #[serde(default = "default_rate_period_seconds")]
3747 pub rate_period_in_second: i32,
3748 /// If `true`, dispatch rate is interpreted as an addend on top of
3749 /// the namespace publish rate rather than an absolute cap.
3750 #[serde(default)]
3751 pub relative_to_publish_rate: bool,
3752}
3753
3754impl Default for DispatchRate {
3755 fn default() -> Self {
3756 Self {
3757 dispatch_throttling_rate_in_msg: -1,
3758 dispatch_throttling_rate_in_byte: -1,
3759 rate_period_in_second: 1,
3760 relative_to_publish_rate: false,
3761 }
3762 }
3763}
3764
3765/// Java `PublishRate` — producer-side throttle (msg/sec + byte/sec).
3766/// `-1` on either dimension disables that axis of the throttle.
3767/// Missing fields default to `-1` (not `0`) — same sentinel semantics
3768/// as [`DispatchRate`]. Unlike `DispatchRate`, there is no
3769/// rate-period field; the broker uses a fixed 1-second window.
3770#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3771#[serde(rename_all = "camelCase")]
3772pub struct PublishRate {
3773 /// Throttle in messages/sec. `-1` = unlimited.
3774 #[serde(default = "neg_one_i32")]
3775 pub publish_throttling_rate_in_msg: i32,
3776 /// Throttle in bytes/sec. `-1` = unlimited.
3777 #[serde(default = "neg_one_i64")]
3778 pub publish_throttling_rate_in_byte: i64,
3779}
3780
3781impl Default for PublishRate {
3782 fn default() -> Self {
3783 Self {
3784 publish_throttling_rate_in_msg: -1,
3785 publish_throttling_rate_in_byte: -1,
3786 }
3787 }
3788}
3789
3790#[inline]
3791fn neg_one_i32() -> i32 {
3792 -1
3793}
3794#[inline]
3795fn neg_one_i64() -> i64 {
3796 -1
3797}
3798#[inline]
3799fn default_rate_period_seconds() -> i32 {
3800 1
3801}
3802
3803/// Java `DelayedDeliveryPolicies` — namespace-level switch + index-tick
3804/// granularity for the broker's delayed-message delivery tracker.
3805/// Maps to `org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies`.
3806/// `tick_time` controls how often the broker's delay-index buckets are
3807/// re-evaluated; smaller values give tighter delivery accuracy at a
3808/// higher tracker cost.
3809///
3810/// The Java field name is `tickTime` (carrying a `@JsonProperty("tickTime")`
3811/// annotation), **not** `tickTimeMillis`. The unit is documented as
3812/// milliseconds (see the upstream class doc), but the wire key drops
3813/// the unit suffix — Jackson on the broker only binds the literal
3814/// `tickTime`.
3815#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
3816#[serde(default, rename_all = "camelCase")]
3817pub struct DelayedDeliveryPolicies {
3818 /// Whether delayed delivery is enabled for the namespace.
3819 pub active: bool,
3820 /// Index-tick granularity in milliseconds. Wire key `tickTime`.
3821 #[serde(rename = "tickTime")]
3822 pub tick_time_millis: i64,
3823}
3824
3825/// Java `BacklogQuota` — one entry in the namespace-level backlog quota
3826/// map. `policy` is a string (`producer_request_hold`,
3827/// `producer_exception`, `consumer_backlog_eviction`) rather than a
3828/// closed Rust enum so new broker enum values forward-decode cleanly.
3829///
3830/// `-1` on either limit dimension disables that axis — missing fields
3831/// default to `-1` (not `0`) so a broker-omitted dimension round-trips
3832/// as "no quota", never as "expire-everything" or "block-everything".
3833#[derive(Debug, Clone, Deserialize, Serialize)]
3834#[serde(rename_all = "camelCase")]
3835pub struct BacklogQuota {
3836 /// Maximum allowed backlog in bytes (when type=`destination_storage`).
3837 /// `-1` = unlimited.
3838 #[serde(default = "neg_one_i64")]
3839 pub limit_size: i64,
3840 /// Maximum allowed backlog age in seconds (when type=`message_age`).
3841 /// `-1` = unlimited.
3842 #[serde(default = "neg_one_i32")]
3843 pub limit_time: i32,
3844 /// Action when the quota is exceeded.
3845 #[serde(default)]
3846 pub policy: String,
3847}
3848
3849impl Default for BacklogQuota {
3850 fn default() -> Self {
3851 Self {
3852 limit_size: -1,
3853 limit_time: -1,
3854 policy: String::new(),
3855 }
3856 }
3857}
3858
3859/// Java `BookieInfo` — a single bookie's rack assignment, as stored in
3860/// the `racks-info` metadata path and shipped on
3861/// [`AdminClient::bookies_set_rack`]. Field names are camelCase on the
3862/// wire (matching `org.apache.pulsar.common.policies.data.BookieInfo`,
3863/// which carries only `rack` and `hostname`).
3864///
3865/// The placement group is **not** part of this body — Pulsar's
3866/// `BookiesBase#updateBookieRackInfo` exposes `group` as a
3867/// `@QueryParam`, and the JSON body Jackson-binds only to
3868/// `{rack, hostname}`. Treating it as a body field silently drops the
3869/// operator's group choice on the wire.
3870#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3871#[serde(default, rename_all = "camelCase")]
3872pub struct BookieInfo {
3873 /// Rack identifier within the group — opaque to the broker, only
3874 /// the placement policy cares about it.
3875 pub rack: String,
3876 /// Resolved hostname for the bookie. The broker uses it for
3877 /// log lines; it does not have to match DNS.
3878 pub hostname: String,
3879}
3880
3881/// Java `PostSchemaPayload` — the request body for
3882/// [`AdminClient::schema_post`] and
3883/// [`AdminClient::schema_compatibility_check`]. The Java DTO has
3884/// (`type`, `schema`, `properties`); both keys travel as-is on the wire.
3885/// `schema` is the canonical-form blob for AVRO / JSON / PROTOBUF and
3886/// the protobuf descriptor for `PROTOBUF_NATIVE`.
3887#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3888#[serde(default)]
3889pub struct PostSchemaPayload {
3890 /// Schema type (`AVRO` / `JSON` / `PROTOBUF` / `PROTOBUF_NATIVE` /
3891 /// `KEY_VALUE` / `STRING` / `BYTES` / ...).
3892 #[serde(rename = "type")]
3893 pub schema_type: String,
3894 /// Schema definition, encoded per the type axis.
3895 pub schema: String,
3896 /// User-defined per-schema properties.
3897 pub properties: std::collections::HashMap<String, String>,
3898}
3899
3900/// Java `SourceConfig` — declarative description of a Pulsar IO
3901/// Source. Mirrors `org.apache.pulsar.common.io.SourceConfig` (Jackson
3902/// camelCase on the wire). Only the fields the JAX-RS `create` /
3903/// `update` paths require are typed; per-connector knobs ride along in
3904/// the open-ended `configs` map so a forward broker can add fields
3905/// without a magnetar release.
3906#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3907#[serde(default, rename_all = "camelCase")]
3908pub struct SourceConfig {
3909 /// Tenant owning the source. Must match the URL path tenant.
3910 pub tenant: String,
3911 /// Namespace owning the source. Must match the URL path namespace.
3912 pub namespace: String,
3913 /// Source name. Must match the URL path name.
3914 pub name: String,
3915 /// Fully-qualified connector class (e.g.
3916 /// `org.apache.pulsar.io.kafka.KafkaSource`).
3917 pub class_name: String,
3918 /// Destination topic the source writes to.
3919 pub topic_name: String,
3920 /// Number of source instances to schedule.
3921 pub parallelism: i32,
3922 /// Connector-specific configuration map. Skipped from JSON when
3923 /// `None` so a `null` does not override the broker default.
3924 #[serde(skip_serializing_if = "Option::is_none")]
3925 pub configs: Option<serde_json::Value>,
3926}
3927
3928/// Java `SinkConfig` — declarative description of a Pulsar IO Sink.
3929/// Mirrors `org.apache.pulsar.common.io.SinkConfig` (Jackson camelCase
3930/// on the wire). The `inputs` slot is the list of source topics the
3931/// sink reads from — the broker accepts either fully-qualified topic
3932/// names or `tenant/namespace/topic` shorthand. Per-connector knobs
3933/// ride in `configs` for the same forward-compat reason as
3934/// [`SourceConfig`].
3935#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3936#[serde(default, rename_all = "camelCase")]
3937pub struct SinkConfig {
3938 /// Tenant owning the sink. Must match the URL path tenant.
3939 pub tenant: String,
3940 /// Namespace owning the sink. Must match the URL path namespace.
3941 pub namespace: String,
3942 /// Sink name. Must match the URL path name.
3943 pub name: String,
3944 /// Fully-qualified connector class (e.g.
3945 /// `org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink`).
3946 pub class_name: String,
3947 /// Source topics the sink subscribes to.
3948 pub inputs: Vec<String>,
3949 /// Number of sink instances to schedule.
3950 pub parallelism: i32,
3951 /// Connector-specific configuration map. Skipped from JSON when
3952 /// `None` so a `null` does not override the broker default.
3953 #[serde(skip_serializing_if = "Option::is_none")]
3954 pub configs: Option<serde_json::Value>,
3955}
3956
3957/// Pulsar Packages namespace dimension — the `{type}` segment of the
3958/// `/admin/v3/packages/{type}/...` URL. Maps to upstream's
3959/// `PackageType` enum
3960/// (`pulsar-packages-management/pulsar-packages-management-core/.../
3961/// PackageType.java`): the broker only accepts the three lowercase
3962/// tokens `function`, `source`, `sink` and rejects everything else
3963/// with 400. Modelled as a closed Rust enum so the URL builder
3964/// cannot emit a value the broker will refuse.
3965#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3966pub enum PackageType {
3967 /// `function` — Pulsar Functions JAR.
3968 Function,
3969 /// `source` — Pulsar IO Source NAR.
3970 Source,
3971 /// `sink` — Pulsar IO Sink NAR.
3972 Sink,
3973}
3974
3975impl PackageType {
3976 /// Render as the lowercase token the broker URL surface expects.
3977 #[must_use]
3978 pub fn as_str(self) -> &'static str {
3979 match self {
3980 Self::Function => "function",
3981 Self::Source => "source",
3982 Self::Sink => "sink",
3983 }
3984 }
3985}
3986
3987impl std::str::FromStr for PackageType {
3988 type Err = AdminError;
3989
3990 /// Parse from the lowercase tokens the broker emits (`function` /
3991 /// `source` / `sink`). Hyphenated aliases are accepted to make the
3992 /// CLI feel idiomatic — `package-type=source` vs the broker's
3993 /// `source` are equivalent.
3994 fn from_str(s: &str) -> Result<Self, AdminError> {
3995 match s.to_ascii_lowercase().as_str() {
3996 "function" | "functions" => Ok(Self::Function),
3997 "source" | "sources" => Ok(Self::Source),
3998 "sink" | "sinks" => Ok(Self::Sink),
3999 other => Err(AdminError::InvalidName(format!(
4000 "unknown package type {other:?} (expected: function | source | sink)"
4001 ))),
4002 }
4003 }
4004}
4005
4006/// Java `PackageMetadata` — the metadata envelope Pulsar Packages
4007/// attaches to each `(type, tenant, namespace, name, version)` tuple.
4008/// Mirrors `org.apache.pulsar.packages.management.core.common.PackageMetadata`
4009/// (Jackson camelCase on the wire). `modification_time` is a
4010/// broker-side timestamp in milliseconds-since-epoch — the broker
4011/// emits it on `GET` and ignores caller-supplied values on `PUT`
4012/// (overwriting them with the receive timestamp).
4013#[derive(Debug, Clone, Default, Deserialize, Serialize)]
4014#[serde(default, rename_all = "camelCase")]
4015pub struct PackageMetadata {
4016 /// Free-form package description.
4017 pub description: String,
4018 /// Maintainer contact (typically an email or team handle).
4019 pub contact: String,
4020 /// Last-modification timestamp in ms-since-epoch. Read-only for
4021 /// callers — the broker overwrites the value on `PUT`.
4022 pub modification_time: i64,
4023 /// Arbitrary key/value labels (release notes, CI ids, etc.).
4024 pub properties: std::collections::HashMap<String, String>,
4025}
4026
4027/// Java `BacklogQuotaType` — selects which dimension a `BacklogQuota`
4028/// entry limits.
4029#[derive(Debug, Clone, Copy, PartialEq, Eq)]
4030pub enum BacklogQuotaType {
4031 /// Bytes-on-disk dimension. Uses `BacklogQuota::limit_size`.
4032 DestinationStorage,
4033 /// Message-age dimension. Uses `BacklogQuota::limit_time`.
4034 MessageAge,
4035}
4036
4037impl BacklogQuotaType {
4038 /// Render as the lowercase `snake_case` value the broker REST surface
4039 /// expects in the `backlogQuotaType` query parameter.
4040 #[must_use]
4041 pub fn as_query_value(self) -> &'static str {
4042 match self {
4043 Self::DestinationStorage => "destination_storage",
4044 Self::MessageAge => "message_age",
4045 }
4046 }
4047}
4048
4049/// Java `LongRunningProcessStatus` — the polling shape for triggered
4050/// background jobs (compaction, offload). The broker returns one of four
4051/// `status` values: `NOT_RUN` (never triggered), `RUNNING`, `SUCCESS`,
4052/// `ERROR`. `last_error` is populated only on `ERROR`.
4053#[derive(Debug, Clone, Default, Deserialize, Serialize)]
4054#[serde(default, rename_all = "camelCase")]
4055pub struct LongRunningProcessStatus {
4056 /// Job state — `NOT_RUN`, `RUNNING`, `SUCCESS`, or `ERROR`.
4057 pub status: String,
4058 /// Human-readable error message, present on `ERROR`.
4059 pub last_error: String,
4060}
4061
4062/// Request body for `POST .../subscription/{sub}/resetcursor` (Java
4063/// `ResetCursorData`). The CLI exposes `message_id` and `is_excluded`;
4064/// Pulsar's `batchIndexes` / `properties` fields are not currently set
4065/// — they exist for transactional dedup metadata and would require
4066/// txn-aware callers anyway.
4067///
4068/// `ledger_id` / `entry_id` are `i64` on the wire because Pulsar
4069/// Jackson-binds them to Java `long`. `MessageId` on the Rust side uses
4070/// `u64` (matching the wire-protocol envelope), with `u64::MAX` as the
4071/// `EARLIEST` / `LATEST` sentinel — the conversion below maps those
4072/// sentinels to `-1` (the Java sentinel) so a reset-to-earliest /
4073/// reset-to-latest doesn't overflow Java's `long` parser.
4074#[derive(Debug, Serialize)]
4075#[serde(rename_all = "camelCase")]
4076struct ResetCursorData {
4077 ledger_id: i64,
4078 entry_id: i64,
4079 partition_index: i32,
4080 batch_index: i32,
4081 #[serde(rename = "isExcluded")]
4082 is_excluded: bool,
4083}
4084
4085/// Map a `MessageId` u64 ledger/entry id onto Pulsar's Java-`long`
4086/// wire field, translating the Rust-side `u64::MAX` sentinel (used by
4087/// `MessageId::EARLIEST` / `LATEST`) to Java's `-1` sentinel.
4088/// Non-sentinel values are passed through verbatim — Pulsar's
4089/// `LedgerHandle` / `EntryId` indices fit in `i64::MAX` long before
4090/// overflowing.
4091#[inline]
4092fn message_id_field_for_wire(value: u64) -> i64 {
4093 if value == u64::MAX {
4094 -1
4095 } else {
4096 // Pulsar entry indices fit in i63 — `as i64` cannot overflow
4097 // for any legitimate broker-emitted value.
4098 value as i64
4099 }
4100}
4101
4102/// Builder for [`AdminClient`].
4103#[derive(Debug, Default)]
4104pub struct AdminClientBuilder {
4105 base_url: Option<Url>,
4106 auth: AdminAuth,
4107 timeout: Option<Duration>,
4108 /// Extra CA root, PEM-encoded, added to reqwest's trust store. Mirrors
4109 /// pulsarctl's `tls_trust_certs_file_path`.
4110 tls_trust_cert_pem: Option<Vec<u8>>,
4111 /// Disable certificate verification. Mirrors pulsarctl's
4112 /// `tls_allow_insecure_connection`. **Insecure** — defeats MITM
4113 /// protection; only for self-signed dev brokers.
4114 tls_allow_insecure: bool,
4115}
4116
4117impl AdminClientBuilder {
4118 /// Set the service URL — the base for `/admin/v2/...`. Required.
4119 #[must_use]
4120 pub fn service_url(mut self, url: Url) -> Self {
4121 self.base_url = Some(url);
4122 self
4123 }
4124
4125 /// Configure bearer-token auth (`Authorization: Bearer <token>`).
4126 #[must_use]
4127 pub fn token(mut self, token: String) -> Self {
4128 self.auth = AdminAuth::Token(token);
4129 self
4130 }
4131
4132 /// Configure `OAuth2` `client_credentials` auth. The shared flow's token
4133 /// cache is refreshed on demand at request time (see [`AdminAuth::OAuth2`]).
4134 #[must_use]
4135 pub fn oauth2(mut self, flow: Arc<ClientCredentialsFlow>) -> Self {
4136 self.auth = AdminAuth::OAuth2(flow);
4137 self
4138 }
4139
4140 /// Add a custom CA root (PEM bytes) to the HTTPS trust store. Mirrors
4141 /// pulsarctl's `tls_trust_certs_file_path`. The CLI reads the file and
4142 /// passes its bytes here.
4143 #[must_use]
4144 pub fn tls_trust_cert_pem(mut self, pem: Vec<u8>) -> Self {
4145 self.tls_trust_cert_pem = Some(pem);
4146 self
4147 }
4148
4149 /// Disable TLS certificate verification (`danger_accept_invalid_certs`).
4150 /// Mirrors pulsarctl's `tls_allow_insecure_connection`. **Insecure** —
4151 /// only for self-signed dev brokers; never in production.
4152 #[must_use]
4153 pub fn tls_allow_insecure(mut self, allow: bool) -> Self {
4154 self.tls_allow_insecure = allow;
4155 self
4156 }
4157
4158 /// Override the request timeout. Defaults to [`DEFAULT_TIMEOUT`].
4159 #[must_use]
4160 pub fn timeout(mut self, dur: Duration) -> Self {
4161 self.timeout = Some(dur);
4162 self
4163 }
4164
4165 /// Build the client.
4166 pub fn build(self) -> Result<AdminClient, AdminError> {
4167 let base_url = self
4168 .base_url
4169 .ok_or_else(|| AdminError::Builder("service_url is required".into()))?;
4170 if base_url.cannot_be_a_base() {
4171 return Err(AdminError::Builder(format!(
4172 "service_url cannot be a base url: {base_url}"
4173 )));
4174 }
4175 // Anchor every V2 API call below `/admin/v2/` and every V3 call
4176 // below `/admin/v3/`. We append the suffix here so callers pass
4177 // plain `http://broker:8080` rather than baking either prefix in.
4178 //
4179 // `Url::join` follows WHATWG semantics: if `base_url` has no
4180 // trailing slash, its last path segment is replaceable. So
4181 // `http://broker/pulsar` + `admin/v2/` would yield
4182 // `http://broker/admin/v2/` — the `pulsar` prefix silently
4183 // dropped (common for path-prefixed K8s ingresses). Normalise
4184 // the base to end with `/` first so the join always appends.
4185 let base_url = {
4186 let mut b = base_url.clone();
4187 if !b.path().ends_with('/') {
4188 b.set_path(&format!("{}/", b.path()));
4189 }
4190 b
4191 };
4192 let base_url_v3 = base_url.join("admin/v3/")?;
4193 let base_url = base_url.join("admin/v2/")?;
4194
4195 // reqwest 0.13 panics in `Client::builder().build()` when the active
4196 // `rustls` flavor is `rustls-no-provider` and no global
4197 // `CryptoProvider` is installed. That happens whenever more than one
4198 // `crypto-*` feature is unified (e.g. default `crypto-aws-lc-rs`
4199 // plus an explicit `crypto-ring`), so install the default here —
4200 // the shim is idempotent and a no-op once a provider is set, which
4201 // covers parallel callers and processes that also boot the tokio
4202 // engine.
4203 tls_crypto::install_default_provider();
4204
4205 let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT);
4206 let http_builder = reqwest::Client::builder().timeout(timeout);
4207 // The custom-CA / allow-insecure reqwest knobs only exist when a
4208 // rustls TLS feature (`__tls`) is compiled in, which every `crypto-*`
4209 // feature enables. The binary crate always selects one provider, so
4210 // this branch is live in production; a no-crypto library build that
4211 // asks for TLS options gets a clear builder error rather than a
4212 // silently-ignored option.
4213 let http = apply_tls_options(
4214 http_builder,
4215 self.tls_trust_cert_pem,
4216 self.tls_allow_insecure,
4217 )?
4218 .build()
4219 .map_err(AdminError::Http)?;
4220
4221 Ok(AdminClient {
4222 base_url,
4223 base_url_v3,
4224 http,
4225 auth: self.auth,
4226 })
4227 }
4228}
4229
4230/// Errors returned by the admin client.
4231#[derive(Debug, thiserror::Error)]
4232pub enum AdminError {
4233 /// Transport-layer error from `reqwest`.
4234 #[error("http error: {0}")]
4235 Http(#[from] reqwest::Error),
4236 /// API returned a non-success HTTP status.
4237 ///
4238 /// `method` and `url` identify the exact request that failed, so an
4239 /// operator can tell at a glance whether they hit the wrong endpoint,
4240 /// cluster, or proxy without re-deriving the URL from the call site.
4241 #[error("api error {code} from {method} {url}: {body}")]
4242 Status {
4243 /// HTTP method of the failed request (`GET`, `POST`, …).
4244 method: String,
4245 /// Full request URL the client dispatched.
4246 url: String,
4247 /// HTTP status code.
4248 code: u16,
4249 /// Response body (or a placeholder if reading the body failed).
4250 body: String,
4251 },
4252 /// A 2xx response body could not be decoded as the expected JSON.
4253 ///
4254 /// Carries the request `method` + `url`, the HTTP `status`, the
4255 /// response `content_type`, and a truncated body `snippet` so the
4256 /// failure is self-diagnosing: a `text/html` body or an empty
4257 /// `<none>` content-type on a 200 almost always means the request
4258 /// was answered by the wrong endpoint, a reverse proxy, or an auth
4259 /// redirect rather than the broker's admin API. Replaces the bare
4260 /// `serde_json` "expected value at line 1 column 1" message.
4261 #[error(
4262 "unexpected response from {method} {url}: HTTP {status}, content-type {content_type}, body: {snippet}"
4263 )]
4264 Decode {
4265 /// HTTP method of the request whose response failed to decode.
4266 method: String,
4267 /// Full request URL the client dispatched.
4268 url: String,
4269 /// HTTP status code of the response (always 2xx here).
4270 status: u16,
4271 /// Response `Content-Type` header, or `<none>` when absent.
4272 content_type: String,
4273 /// First ~256 bytes of the body (UTF-8 lossy), truncated with an
4274 /// ellipsis marker when longer.
4275 snippet: String,
4276 /// The underlying serde decode error.
4277 #[source]
4278 source: serde_json::Error,
4279 },
4280 /// JSON encode error — building a request body from a Rust value.
4281 ///
4282 /// Response-decode failures surface as [`AdminError::Decode`]; this
4283 /// variant covers the rare serialization-side failure (e.g.
4284 /// `build_url_config_multipart`).
4285 #[error("json encode: {0}")]
4286 Json(#[from] serde_json::Error),
4287 /// URL parse / construction error.
4288 #[error("invalid url: {0}")]
4289 Url(#[from] url::ParseError),
4290 /// Builder configuration error (missing service URL, invalid argument...).
4291 #[error("invalid builder: {0}")]
4292 Builder(String),
4293 /// Authentication failure — e.g. the `OAuth2` `client_credentials` exchange
4294 /// failed or returned an empty access token.
4295 #[error("auth error: {0}")]
4296 Auth(String),
4297 /// Caller passed a namespace or topic name that the client could not parse.
4298 #[error("invalid name: {0}")]
4299 InvalidName(String),
4300 /// Broker returned a response that violates the documented wire contract
4301 /// (e.g. negative `ledgerId` from `getMessageIdByIndex`, which Java
4302 /// `MessageIdImpl` cannot represent either).
4303 #[error("broker protocol violation: {0}")]
4304 Protocol(String),
4305}
4306
4307/// Apply the pulsarctl-derived TLS options (custom CA root + allow-insecure)
4308/// to a reqwest [`ClientBuilder`].
4309///
4310/// `add_root_certificate` / `danger_accept_invalid_certs` / `Certificate`
4311/// live behind reqwest's `__tls` cfg, enabled by every `crypto-*` feature.
4312/// The two-variant cfg split keeps a no-crypto library build compiling: with
4313/// no TLS feature, asking for either option is a hard builder error rather
4314/// than a silently-dropped knob.
4315#[cfg(any(
4316 feature = "crypto-aws-lc-rs",
4317 feature = "crypto-ring",
4318 feature = "crypto-openssl",
4319 feature = "crypto-fips",
4320))]
4321fn apply_tls_options(
4322 mut builder: reqwest::ClientBuilder,
4323 trust_cert_pem: Option<Vec<u8>>,
4324 allow_insecure: bool,
4325) -> Result<reqwest::ClientBuilder, AdminError> {
4326 // Custom CA trust (pulsarctl `tls_trust_certs_file_path`). reqwest's
4327 // `add_root_certificate` adds the cert *alongside* the platform roots,
4328 // matching Pulsar's "extra trust anchor" semantics.
4329 if let Some(pem) = trust_cert_pem {
4330 let cert = reqwest::Certificate::from_pem(&pem)
4331 .map_err(|err| AdminError::Builder(format!("invalid tls trust cert PEM: {err}")))?;
4332 builder = builder.add_root_certificate(cert);
4333 }
4334 // Allow-insecure (pulsarctl `tls_allow_insecure_connection`). Defeats
4335 // certificate verification — only meaningful against self-signed dev
4336 // brokers, hence gated behind the explicit opt-in flag.
4337 if allow_insecure {
4338 builder = builder.danger_accept_invalid_certs(true);
4339 }
4340 Ok(builder)
4341}
4342
4343/// No-TLS fallback: a library build with no `crypto-*` feature cannot honour
4344/// TLS options. Accept the builder unchanged when no option is requested;
4345/// error clearly when one is.
4346#[cfg(not(any(
4347 feature = "crypto-aws-lc-rs",
4348 feature = "crypto-ring",
4349 feature = "crypto-openssl",
4350 feature = "crypto-fips",
4351)))]
4352fn apply_tls_options(
4353 builder: reqwest::ClientBuilder,
4354 trust_cert_pem: Option<Vec<u8>>,
4355 allow_insecure: bool,
4356) -> Result<reqwest::ClientBuilder, AdminError> {
4357 if trust_cert_pem.is_some() || allow_insecure {
4358 return Err(AdminError::Builder(
4359 "TLS options (trust cert / allow-insecure) require a crypto-* feature".to_owned(),
4360 ));
4361 }
4362 Ok(builder)
4363}
4364
4365/// Attach `Authorization: Bearer <tok>` to a request builder.
4366///
4367/// Shared by the `Token` and `OAuth2` auth arms — both ultimately set a
4368/// bearer header; only the source of the token bytes differs.
4369fn bearer(req: RequestBuilder, tok: &str) -> Result<RequestBuilder, AdminError> {
4370 let value = format!("Bearer {tok}");
4371 let mut headers = HeaderMap::new();
4372 let header_value = HeaderValue::from_str(&value)
4373 .map_err(|err| AdminError::Builder(format!("invalid bearer token: {err}")))?;
4374 headers.insert(AUTHORIZATION, header_value);
4375 Ok(req.headers(headers))
4376}
4377
4378/// Maximum number of body bytes echoed back in [`AdminError::Decode`].
4379/// Enough to recognise an HTML error page or a plain-text proxy banner
4380/// without dumping a whole payload into the error message.
4381const DECODE_SNIPPET_LIMIT: usize = 256;
4382
4383/// Build an [`AdminError::Decode`] from a failed-to-decode response's
4384/// parts. Centralises content-type extraction and body-snippet
4385/// truncation so every JSON decoder reports the same enriched context.
4386fn decode_error(
4387 method: &Method,
4388 url: &Url,
4389 status: u16,
4390 headers: &HeaderMap,
4391 body: &[u8],
4392 source: serde_json::Error,
4393) -> AdminError {
4394 let content_type = headers
4395 .get(reqwest::header::CONTENT_TYPE)
4396 .and_then(|v| v.to_str().ok())
4397 .map_or_else(|| "<none>".to_owned(), ToOwned::to_owned);
4398 let snippet = if body.len() > DECODE_SNIPPET_LIMIT {
4399 format!(
4400 "{}… (truncated)",
4401 String::from_utf8_lossy(&body[..DECODE_SNIPPET_LIMIT])
4402 )
4403 } else {
4404 String::from_utf8_lossy(body).into_owned()
4405 };
4406 AdminError::Decode {
4407 method: method.to_string(),
4408 url: url.to_string(),
4409 status,
4410 content_type,
4411 snippet,
4412 source,
4413 }
4414}
4415
4416/// Decode a non-error JSON response body.
4417async fn json_ok<T>(api: ApiResponse) -> Result<T, AdminError>
4418where
4419 T: for<'de> Deserialize<'de>,
4420{
4421 let api = ensure_status(api).await?;
4422 let ApiResponse { method, url, resp } = api;
4423 let status = resp.status().as_u16();
4424 let headers = resp.headers().clone();
4425 let bytes = resp.bytes().await?;
4426 serde_json::from_slice(&bytes)
4427 .map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
4428}
4429
4430/// Decode a JSON response body, defaulting to `T::default()` when the
4431/// broker returns 204 / empty body / literal `null`. Pulsar's
4432/// "policy-unset" pattern collapses to one of those three encodings:
4433///
4434/// - `getRetention` / `getPersistence` / `getDispatchRate` / `getPublishRate` on a fresh namespace
4435/// surface `null` through Jersey's `resume(null)`, which travels as 204 No Content;
4436/// - `getTopicPolicies` on a topic with no explicit policy returns either 204 or the literal JSON
4437/// `null`.
4438///
4439/// `json_ok::<RetentionPolicies>` errors with `EOF while parsing a
4440/// value` for either case; this helper maps them to
4441/// `RetentionPolicies::default()` (broker semantic: missing fields ==
4442/// broker default). Callers asserting "post-remove returns the
4443/// default" stay correct without changing the public signature.
4444async fn json_ok_or_default<T>(api: ApiResponse) -> Result<T, AdminError>
4445where
4446 T: for<'de> Deserialize<'de> + Default,
4447{
4448 let api = ensure_status(api).await?;
4449 let ApiResponse { method, url, resp } = api;
4450 // Tolerant short-circuits MUST run before any serde attempt.
4451 if resp.status() == StatusCode::NO_CONTENT {
4452 return Ok(T::default());
4453 }
4454 let status = resp.status().as_u16();
4455 let headers = resp.headers().clone();
4456 let bytes = resp.bytes().await?;
4457 if bytes.is_empty() {
4458 return Ok(T::default());
4459 }
4460 // `null` body — broker says "policy unset" — also folds to default.
4461 if bytes.as_ref().trim_ascii() == b"null" {
4462 return Ok(T::default());
4463 }
4464 serde_json::from_slice::<T>(&bytes)
4465 .map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
4466}
4467
4468/// Decode a JSON response body that the broker may emit as an empty
4469/// payload to mean "no value here". Pulsar's pattern for "no override
4470/// set" on the policy GET endpoints is to return `204 No Content` (or
4471/// a 200 with an empty body) rather than the literal JSON `null`;
4472/// `serde_json::from_slice::<Option<T>>(b"")` errors with `EOF while
4473/// parsing a value`, so every `namespace_get_*` / `topic_get_*` that
4474/// returns `Option<T>` needs this tolerant decoder.
4475///
4476/// Decoding rules:
4477/// - 204 No Content → `Ok(None)`
4478/// - 2xx with empty body bytes → `Ok(None)`
4479/// - 2xx with the literal `null` → `Ok(None)`
4480/// - 2xx with a JSON value → `Ok(Some(value))` via serde
4481async fn json_ok_optional<T>(api: ApiResponse) -> Result<Option<T>, AdminError>
4482where
4483 T: for<'de> Deserialize<'de>,
4484{
4485 let api = ensure_status(api).await?;
4486 let ApiResponse { method, url, resp } = api;
4487 // Tolerant short-circuits MUST run before any serde attempt.
4488 if resp.status() == StatusCode::NO_CONTENT {
4489 return Ok(None);
4490 }
4491 let status = resp.status().as_u16();
4492 let headers = resp.headers().clone();
4493 let bytes = resp.bytes().await?;
4494 if bytes.is_empty() {
4495 return Ok(None);
4496 }
4497 serde_json::from_slice::<Option<T>>(&bytes)
4498 .map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
4499}
4500
4501/// Discard a successful no-content response body.
4502async fn empty_ok(api: ApiResponse) -> Result<(), AdminError> {
4503 let _ = ensure_status(api).await?;
4504 Ok(())
4505}
4506
4507/// Convert a non-success response into [`AdminError::Status`]. Returns the
4508/// original [`ApiResponse`] on 2xx so the caller can decode the body.
4509///
4510/// The `Status` error carries the request method + URL so a failure
4511/// names the exact endpoint hit, not just the status code and body.
4512async fn ensure_status(api: ApiResponse) -> Result<ApiResponse, AdminError> {
4513 let status = api.resp.status();
4514 if status.is_success() || status == StatusCode::NO_CONTENT {
4515 return Ok(api);
4516 }
4517 let code = status.as_u16();
4518 let ApiResponse { method, url, resp } = api;
4519 let body = resp
4520 .text()
4521 .await
4522 .unwrap_or_else(|err| format!("<failed to read body: {err}>"));
4523 Err(AdminError::Status {
4524 method: method.to_string(),
4525 url: url.to_string(),
4526 code,
4527 body,
4528 })
4529}
4530
4531/// Split a `tenant/namespace` string into its two segments.
4532/// Reject path segments the `url` crate would silently rewrite. `.` and `..`
4533/// disappear under RFC 3986 dot-segment normalisation; percent-encoded slash
4534/// (`%2F` / `%2f`) lets a hostile name escape its segment; NUL / ASCII
4535/// control bytes have no place in an admin path. Refusing all of these at
4536/// the input boundary keeps the URL the client builds in lock-step with the
4537/// path the broker eventually parses.
4538fn validate_segment(segment: &str) -> Result<(), AdminError> {
4539 if segment.is_empty() {
4540 return Err(AdminError::InvalidName("empty path segment".into()));
4541 }
4542 if segment == "." || segment == ".." {
4543 return Err(AdminError::InvalidName(format!(
4544 "dot segment is not a valid name: {segment:?}",
4545 )));
4546 }
4547 if segment.contains("%2F") || segment.contains("%2f") {
4548 return Err(AdminError::InvalidName(format!(
4549 "percent-encoded slash in segment: {segment:?}",
4550 )));
4551 }
4552 if segment.bytes().any(|b| b < 0x20 || b == 0x7f) {
4553 return Err(AdminError::InvalidName(format!(
4554 "control byte in segment: {segment:?}",
4555 )));
4556 }
4557 Ok(())
4558}
4559
4560/// Split a `tenant/namespace` string into its two segments.
4561///
4562/// Exposed for the CLI (and any other admin-client wrapper) so the
4563/// `tenant/namespace` shape used by the namespace-scoped list verbs
4564/// (`functions list`, `sources list`, `sinks list`, `packages list`)
4565/// validates against the same `validate_segment` rules every admin
4566/// method enforces internally — no parallel parsers, no divergent
4567/// error categories.
4568pub fn split_namespace(ns: &str) -> Result<(&str, &str), AdminError> {
4569 let (tenant, namespace) = ns.split_once('/').ok_or_else(|| {
4570 AdminError::InvalidName(format!("expected tenant/namespace, got {ns:?} (no '/')"))
4571 })?;
4572 if tenant.is_empty() || namespace.is_empty() || namespace.contains('/') {
4573 return Err(AdminError::InvalidName(format!(
4574 "expected tenant/namespace, got {ns:?}"
4575 )));
4576 }
4577 validate_segment(tenant)?;
4578 validate_segment(namespace)?;
4579 Ok((tenant, namespace))
4580}
4581
4582/// Split a `persistent://tenant/namespace/topic` (or `tenant/namespace/topic`)
4583/// into its three path segments. The scheme is optional; if present it must
4584/// be `persistent://`.
4585fn split_topic(topic: &str) -> Result<(&str, &str, &str), AdminError> {
4586 let rest = topic.strip_prefix("persistent://").unwrap_or(topic);
4587 let mut parts = rest.splitn(3, '/');
4588 let tenant = parts.next().unwrap_or("");
4589 let namespace = parts.next().unwrap_or("");
4590 let name = parts.next().unwrap_or("");
4591 if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
4592 return Err(AdminError::InvalidName(format!(
4593 "expected [persistent://]tenant/namespace/topic, got {topic:?}"
4594 )));
4595 }
4596 validate_segment(tenant)?;
4597 validate_segment(namespace)?;
4598 validate_segment(name)?;
4599 Ok((tenant, namespace, name))
4600}
4601
4602/// Build the two-part `multipart/form-data` envelope the broker expects
4603/// on URL-based function register / update calls. Order is fixed (`url`
4604/// then `functionConfig`) so wire-level tests can pin the body shape.
4605/// The `functionConfig` part carries an explicit `application/json`
4606/// content type; without it the broker's Jersey
4607/// `FormDataMultiPartFeature` falls back to `text/plain` and refuses
4608/// the JSON.
4609fn function_pkg_form(
4610 pkg_url: &str,
4611 config: &FunctionConfig,
4612) -> Result<reqwest::multipart::Form, AdminError> {
4613 build_url_config_multipart(pkg_url, "functionConfig", config)
4614}
4615
4616/// Split a `tenant/namespace/name` Functions / IO identifier into its
4617/// three segments. Pulsar Functions never carry a `persistent://`
4618/// scheme prefix (functions are not topics), so the parser is stricter
4619/// than the internal `split_topic` (rustdoc cannot resolve the bare
4620/// identifier because `split_topic` is module-private).
4621///
4622/// Exposed for the CLI, which parses the fully qualified name out of a
4623/// single positional argument before calling the admin methods (which
4624/// take `tenant`, `namespace`, `name` separately so the broker's
4625/// per-segment validation maps 1:1 to the URL path).
4626pub fn split_function_id(id: &str) -> Result<(&str, &str, &str), AdminError> {
4627 let mut parts = id.splitn(3, '/');
4628 let tenant = parts.next().unwrap_or("");
4629 let namespace = parts.next().unwrap_or("");
4630 let name = parts.next().unwrap_or("");
4631 if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
4632 return Err(AdminError::InvalidName(format!(
4633 "expected tenant/namespace/name, got {id:?}"
4634 )));
4635 }
4636 validate_segment(tenant)?;
4637 validate_segment(namespace)?;
4638 validate_segment(name)?;
4639 Ok((tenant, namespace, name))
4640}
4641
4642/// Build the two-part `multipart/form-data` body Pulsar IO's
4643/// `register*` / `update*` endpoints expect when the package is
4644/// referenced by URL (`http(s)://`, `file://`, `function://`): a `url`
4645/// text part and a `<config_field>` JSON part. The broker enforces
4646/// both parts at the dispatcher boundary
4647/// (`SourcesBase#registerSource`, `SinksBase#registerSink`) — missing
4648/// either yields 400. Generic over the config type so Functions,
4649/// Sources, and Sinks share one helper (`functionConfig` /
4650/// `sourceConfig` / `sinkConfig` via the `config_field` argument).
4651fn build_url_config_multipart<T: Serialize>(
4652 pkg_url: &str,
4653 config_field: &str,
4654 config: &T,
4655) -> Result<reqwest::multipart::Form, AdminError> {
4656 let body = serde_json::to_string(config)?;
4657 // `mime_str` only fails on a malformed string; the literal we pass
4658 // is well-formed so the `expect` is on a never-taken branch.
4659 let config_part = reqwest::multipart::Part::text(body)
4660 .mime_str("application/json")
4661 .expect("application/json is a well-formed media type");
4662 let form = reqwest::multipart::Form::new()
4663 .text("url", pkg_url.to_owned())
4664 .part(config_field.to_owned(), config_part);
4665 Ok(form)
4666}
4667
4668#[cfg(test)]
4669mod tests {
4670 use super::*;
4671
4672 #[test]
4673 fn builder_requires_service_url() {
4674 let err = AdminClient::builder().build().unwrap_err();
4675 assert!(matches!(err, AdminError::Builder(_)));
4676 }
4677
4678 #[test]
4679 fn builder_appends_admin_v2_prefix() {
4680 let client = AdminClient::builder()
4681 .service_url("http://localhost:8080".parse().unwrap())
4682 .build()
4683 .unwrap();
4684 assert_eq!(
4685 client.base_url().as_str(),
4686 "http://localhost:8080/admin/v2/"
4687 );
4688 }
4689
4690 #[test]
4691 fn builder_carries_token() {
4692 let client = AdminClient::builder()
4693 .service_url("http://localhost:8080".parse().unwrap())
4694 .token("abc".into())
4695 .build()
4696 .unwrap();
4697 assert!(matches!(client.auth(), AdminAuth::Token(t) if t == "abc"));
4698 }
4699
4700 #[test]
4701 fn admin_auth_token_debug_redacts_secret() {
4702 let auth = AdminAuth::Token("super-secret-jwt".to_owned());
4703 let rendered = format!("{auth:?}");
4704 assert!(
4705 !rendered.contains("super-secret-jwt"),
4706 "raw token leaked through Debug: {rendered}",
4707 );
4708 assert!(
4709 rendered.contains("<redacted>"),
4710 "expected redaction sentinel in {rendered}"
4711 );
4712 assert!(
4713 rendered.contains("Token"),
4714 "expected variant name in {rendered}"
4715 );
4716
4717 let none_rendered = format!("{:?}", AdminAuth::None);
4718 assert_eq!(none_rendered, "None");
4719 }
4720
4721 #[test]
4722 fn admin_client_debug_does_not_leak_token() {
4723 let client = AdminClient::builder()
4724 .service_url("http://localhost:8080".parse().unwrap())
4725 .token("leaky-token".into())
4726 .build()
4727 .unwrap();
4728 let rendered = format!("{client:?}");
4729 assert!(
4730 !rendered.contains("leaky-token"),
4731 "raw token leaked through AdminClient Debug: {rendered}",
4732 );
4733 }
4734
4735 #[test]
4736 fn split_namespace_ok() {
4737 assert_eq!(
4738 split_namespace("public/default").unwrap(),
4739 ("public", "default")
4740 );
4741 }
4742
4743 #[test]
4744 fn split_namespace_rejects_missing_slash() {
4745 assert!(matches!(
4746 split_namespace("public"),
4747 Err(AdminError::InvalidName(_))
4748 ));
4749 }
4750
4751 #[test]
4752 fn split_namespace_rejects_extra_segment() {
4753 assert!(matches!(
4754 split_namespace("public/default/extra"),
4755 Err(AdminError::InvalidName(_))
4756 ));
4757 }
4758
4759 #[test]
4760 fn split_topic_with_scheme() {
4761 let (t, n, name) = split_topic("persistent://acme/svc/orders").unwrap();
4762 assert_eq!((t, n, name), ("acme", "svc", "orders"));
4763 }
4764
4765 #[test]
4766 fn split_topic_without_scheme() {
4767 let (t, n, name) = split_topic("acme/svc/orders").unwrap();
4768 assert_eq!((t, n, name), ("acme", "svc", "orders"));
4769 }
4770
4771 #[test]
4772 fn split_topic_rejects_short_name() {
4773 assert!(matches!(
4774 split_topic("acme/svc"),
4775 Err(AdminError::InvalidName(_))
4776 ));
4777 }
4778
4779 #[test]
4780 fn split_function_id_ok() {
4781 let (t, n, name) = split_function_id("public/default/my-fn").unwrap();
4782 assert_eq!((t, n, name), ("public", "default", "my-fn"));
4783 }
4784
4785 #[test]
4786 fn split_function_id_rejects_short_name() {
4787 assert!(matches!(
4788 split_function_id("public/default"),
4789 Err(AdminError::InvalidName(_))
4790 ));
4791 }
4792
4793 #[test]
4794 fn split_function_id_rejects_persistent_scheme() {
4795 // Functions never carry a `persistent://` prefix — the parser
4796 // refuses one rather than silently treating the scheme as the
4797 // tenant name (which `validate_segment` would later catch via
4798 // the percent-encoded slash rule, but we want a clearer error
4799 // up front).
4800 assert!(matches!(
4801 split_function_id("persistent://acme/svc/fn"),
4802 Err(AdminError::InvalidName(_))
4803 ));
4804 }
4805
4806 #[test]
4807 fn message_id_response_deserialises_java_camelcase() {
4808 // The exact body shape upstream PIP-415 §"Success Response" advertises.
4809 let json = r#"{"ledgerId":12345,"entryId":67890,"partitionIndex":0}"#;
4810 let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
4811 let msg = dto.try_into_message_id().unwrap();
4812 assert_eq!(msg.ledger_id, 12345);
4813 assert_eq!(msg.entry_id, 67890);
4814 assert_eq!(msg.partition, 0);
4815 // The broker resolves at entry granularity — batch fields are absent
4816 // from the JSON and must default to -1 to match the canonical sentinel.
4817 assert_eq!(msg.batch_index, -1);
4818 assert_eq!(msg.batch_size, -1);
4819 }
4820
4821 #[test]
4822 fn message_id_response_defaults_partition_for_non_partitioned_topic() {
4823 // PIP-415 §"Success Response": `partitionIndex: -1` for non-partitioned
4824 // topics. Some broker versions omit the field entirely on
4825 // non-partitioned topics; serde default keeps us correct in either case.
4826 let json = r#"{"ledgerId":1,"entryId":2}"#;
4827 let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
4828 assert_eq!(dto.try_into_message_id().unwrap().partition, -1);
4829 }
4830
4831 #[test]
4832 fn url_helper_emits_single_slash_after_admin_v2() {
4833 // Regression guard: the previous url() helper appended segments after
4834 // the trailing-slash sentinel of /admin/v2/, producing
4835 // /admin/v2//persistent/... — real brokers tolerated it but strict
4836 // mocks (and Java's PulsarAdmin) emit the single-slash form. Pin the
4837 // current behaviour so we notice any future regression.
4838 let client = AdminClient::builder()
4839 .service_url("http://broker.example:8080".parse().unwrap())
4840 .build()
4841 .unwrap();
4842 let url = client.url(&["clusters"]).unwrap();
4843 assert_eq!(url.as_str(), "http://broker.example:8080/admin/v2/clusters");
4844 let url2 = client
4845 .url(&["persistent", "public", "default", "topic", "stats"])
4846 .unwrap();
4847 assert_eq!(
4848 url2.as_str(),
4849 "http://broker.example:8080/admin/v2/persistent/public/default/topic/stats"
4850 );
4851 }
4852
4853 #[test]
4854 fn url_helper_preserves_path_prefix_without_trailing_slash() {
4855 // Regression guard: per WHATWG URL semantics, `Url::join("admin/v2/")`
4856 // on a base whose path has no trailing slash REPLACES the last segment
4857 // — `http://broker/pulsar` + `admin/v2/` becomes
4858 // `http://broker/admin/v2/`, silently dropping `/pulsar`. The builder
4859 // normalises the base to end in `/` before joining so any operator
4860 // running behind a path-prefixed K8s ingress (`--admin-url
4861 // http://gw/pulsar`) gets the right URLs on both V2 and V3 endpoints.
4862 let client = AdminClient::builder()
4863 .service_url("http://broker.example:8080/pulsar".parse().unwrap())
4864 .build()
4865 .unwrap();
4866 let url = client.url(&["clusters"]).unwrap();
4867 assert_eq!(
4868 url.as_str(),
4869 "http://broker.example:8080/pulsar/admin/v2/clusters"
4870 );
4871 let url_v3 = client.url_v3(&["functions", "a", "b"]).unwrap();
4872 assert_eq!(
4873 url_v3.as_str(),
4874 "http://broker.example:8080/pulsar/admin/v3/functions/a/b"
4875 );
4876 }
4877
4878 #[test]
4879 fn split_topic_rejects_dot_segments() {
4880 // LISA-001: `..` / `.` in any segment would silently normalise out via
4881 // url::Url::path_segments_mut, producing a client/server URL parser
4882 // differential. Refuse them at the input boundary.
4883 assert!(matches!(
4884 split_topic("persistent://../foo/bar"),
4885 Err(AdminError::InvalidName(_))
4886 ));
4887 assert!(matches!(
4888 split_topic("./foo/bar"),
4889 Err(AdminError::InvalidName(_))
4890 ));
4891 assert!(matches!(
4892 split_topic("tenant/./topic"),
4893 Err(AdminError::InvalidName(_))
4894 ));
4895 }
4896
4897 #[test]
4898 fn split_topic_rejects_control_bytes_and_percent_encoded_slash() {
4899 assert!(matches!(
4900 split_topic("tenant/ns/topic%2Fevil"),
4901 Err(AdminError::InvalidName(_))
4902 ));
4903 assert!(matches!(
4904 split_topic("tenant/ns/top\0ic"),
4905 Err(AdminError::InvalidName(_))
4906 ));
4907 }
4908}