magnetar_admin/lib.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! Apache Pulsar admin REST client (`/admin/v2/...`).
4//!
5//! Thin async wrapper around [`reqwest`] for the broker's JAX-RS admin API.
6//! TLS is via `rustls-tls`. There are no channels and no background tasks: every
7//! call is a one-shot `await` that resolves to a [`Result`].
8//!
9//! Endpoint paths mirror the broker. Each method's rustdoc cites the Java
10//! endpoint class (file + relevant `@Path` annotation) in `apache/pulsar` so a
11//! reader can confirm the URL and HTTP verb against upstream.
12//!
13//! ## Quick start
14//!
15//! ```no_run
16//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
17//! use magnetar_admin::{AdminClient, TenantInfo};
18//!
19//! let admin = AdminClient::builder()
20//! .service_url("http://localhost:8080".parse()?)
21//! .build()?;
22//!
23//! let tenants = admin.tenants_list().await?;
24//! println!("{tenants:?}");
25//!
26//! admin
27//! .tenant_create(
28//! "acme",
29//! TenantInfo {
30//! admin_roles: vec!["admin".into()],
31//! allowed_clusters: vec!["standalone".into()],
32//! },
33//! )
34//! .await?;
35//! # Ok(()) }
36//! ```
37
38#![warn(unreachable_pub)]
39#![forbid(unsafe_code)]
40
41mod tls_crypto;
42
43use std::collections::HashMap;
44use std::sync::Arc;
45use std::time::Duration;
46
47use magnetar_auth_oauth2::ClientCredentialsFlow;
48use magnetar_proto::MessageId;
49use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
50use reqwest::{Method, RequestBuilder, Response, StatusCode};
51use serde::{Deserialize, Serialize};
52use url::Url;
53
54/// Default request timeout. Mirrors `PulsarAdminBuilder` Java default of 60s
55/// (see `pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/
56/// PulsarAdminBuilderImpl.java`).
57pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
58
59/// Longer per-call timeout for `*_create_with_url` / `*_update_with_url`
60/// endpoints. The broker fetches the package from the supplied URL
61/// before responding, so the round-trip can comfortably exceed 60 s
62/// against a slow internal registry (S3 / GCS / function://) or a
63/// large `.nar` / `.jar` artifact. Overrides the client-level timeout
64/// for those calls only — every other admin verb keeps the 60 s
65/// budget [`DEFAULT_TIMEOUT`] provides.
66///
67/// 5 min matches the Java admin client's read-timeout for register
68/// paths.
69pub const PACKAGE_REGISTER_TIMEOUT: Duration = Duration::from_secs(300);
70
71/// Authentication strategy used by the admin client.
72///
73/// `Token(...)` adds `Authorization: Bearer <token>` to every request.
74/// Mirrors Java's `AuthenticationToken` provider. `OAuth2(...)` performs a
75/// `client_credentials` exchange against the IDP and attaches the resulting
76/// access token as a bearer credential — mirrors Java's
77/// `AuthenticationOAuth2`.
78#[derive(Clone, Default)]
79pub enum AdminAuth {
80 /// No authentication.
81 #[default]
82 None,
83 /// Bearer token. The string is the raw token; the `Bearer ` prefix is added
84 /// at request time.
85 Token(String),
86 /// `OAuth2` `client_credentials` flow. The cached access token is refreshed
87 /// (when missing or near expiry) and attached as `Authorization: Bearer
88 /// <access-token>` at request time. The flow is shared (`Arc`) so its
89 /// token cache is reused across every admin call.
90 OAuth2(Arc<ClientCredentialsFlow>),
91}
92
93impl std::fmt::Debug for AdminAuth {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 // Redact the token body so calling `Debug` on the admin client never
96 // spills the bearer credential to tracing or stdout. Mirrors the
97 // `Credentials`/`ClientCredentialsFlow` Debug redaction in
98 // `magnetar-auth-oauth2`.
99 match self {
100 Self::None => f.write_str("None"),
101 Self::Token(_) => f.debug_tuple("Token").field(&"<redacted>").finish(),
102 // `ClientCredentialsFlow`'s own `Debug` already redacts the secret
103 // and cached token, so forwarding to it is safe.
104 Self::OAuth2(flow) => f.debug_tuple("OAuth2").field(flow).finish(),
105 }
106 }
107}
108
109/// Apache Pulsar admin REST client.
110///
111/// Holds two pre-computed base URLs: `base_url` anchored at
112/// `/admin/v2/` (clusters / tenants / namespaces / topics /
113/// subscriptions / brokers / bookies / schemas) and `base_url_v3`
114/// anchored at `/admin/v3/` (Pulsar Functions / IO Sources / IO Sinks
115/// / Packages). The split mirrors Pulsar's own routing — Java's
116/// `PulsarAdmin` keeps them separate too. Both URLs are derived from
117/// the same broker root at builder time, so a caller never has to
118/// know which version family an endpoint belongs to.
119#[derive(Debug, Clone)]
120pub struct AdminClient {
121 base_url: Url,
122 base_url_v3: Url,
123 http: reqwest::Client,
124 auth: AdminAuth,
125}
126
127impl AdminClient {
128 /// Start building an admin client.
129 #[must_use]
130 pub fn builder() -> AdminClientBuilder {
131 AdminClientBuilder::default()
132 }
133
134 /// Return the base URL the client targets (with the trailing `/admin/v2/`
135 /// component already appended). Exposed for tests and diagnostics.
136 #[must_use]
137 pub fn base_url(&self) -> &Url {
138 &self.base_url
139 }
140
141 /// Return the configured auth strategy. Exposed for tests and diagnostics.
142 #[must_use]
143 pub fn auth(&self) -> &AdminAuth {
144 &self.auth
145 }
146
147 // --- Cluster ---------------------------------------------------------
148
149 /// List clusters.
150 ///
151 /// `GET /admin/v2/clusters`.
152 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Clusters.java`
153 /// (`@Path("/clusters")`) + `admin/impl/ClustersBase.java#getClusters`.
154 pub async fn cluster_list(&self) -> Result<Vec<String>, AdminError> {
155 let url = self.url(&["clusters"])?;
156 let resp = self.send(self.http.request(Method::GET, url)).await?;
157 json_ok(resp).await
158 }
159
160 /// List failure-domains configured on a cluster.
161 ///
162 /// `GET /admin/v2/clusters/{cluster}/failureDomains`. The broker returns
163 /// a `Map<String, FailureDomain>` keyed by domain name; each value
164 /// carries a `brokers: Set<String>` member. The map is exposed as a
165 /// raw `serde_json::Value` for forward-compat — broker minor versions
166 /// add fields.
167 /// Java: `ClustersBase#getFailureDomains`.
168 pub async fn cluster_failure_domains_list(
169 &self,
170 cluster: &str,
171 ) -> Result<serde_json::Value, AdminError> {
172 validate_segment(cluster)?;
173 let url = self.url(&["clusters", cluster, "failureDomains"])?;
174 let resp = self.send(self.http.request(Method::GET, url)).await?;
175 json_ok(resp).await
176 }
177
178 /// Get one failure-domain by name.
179 ///
180 /// `GET /admin/v2/clusters/{cluster}/failureDomains/{domain}`.
181 /// Java: `ClustersBase#getDomain`.
182 pub async fn cluster_failure_domain_get(
183 &self,
184 cluster: &str,
185 domain: &str,
186 ) -> Result<serde_json::Value, AdminError> {
187 validate_segment(cluster)?;
188 validate_segment(domain)?;
189 let url = self.url(&["clusters", cluster, "failureDomains", domain])?;
190 let resp = self.send(self.http.request(Method::GET, url)).await?;
191 json_ok(resp).await
192 }
193
194 /// List namespace-isolation policies configured on a cluster.
195 ///
196 /// `GET /admin/v2/clusters/{cluster}/namespaceIsolationPolicies`. The
197 /// broker returns a `Map<String, NamespaceIsolationData>` carrying
198 /// the namespace regex, primary/secondary broker lists, and the
199 /// auto-failover policy. Exposed as raw JSON for forward-compat.
200 ///
201 /// A cluster with no isolation policies configured surfaces as
202 /// `404 NamespaceIsolationPolicies for cluster ... does not exist`
203 /// (Pulsar 4.0.x) rather than an empty map; we mirror the Java
204 /// client's `Map<String, _>` surface by mapping that specific 404 to
205 /// an empty `{}` object. Other 404s (auth, wrong cluster) still
206 /// surface as `AdminError::Status`.
207 /// Java: `ClustersBase#getNamespaceIsolationPolicies`.
208 pub async fn namespace_isolation_policies_list(
209 &self,
210 cluster: &str,
211 ) -> Result<serde_json::Value, AdminError> {
212 validate_segment(cluster)?;
213 let url = self.url(&["clusters", cluster, "namespaceIsolationPolicies"])?;
214 let resp = self.send(self.http.request(Method::GET, url)).await?;
215 match json_ok::<serde_json::Value>(resp).await {
216 Ok(v) => Ok(v),
217 Err(AdminError::Status {
218 code: 404, body, ..
219 }) if body.contains("NamespaceIsolationPolicies") => {
220 Ok(serde_json::Value::Object(serde_json::Map::new()))
221 }
222 Err(e) => Err(e),
223 }
224 }
225
226 // --- Brokers ---------------------------------------------------------
227
228 /// List active brokers in a cluster.
229 ///
230 /// `GET /admin/v2/brokers/{cluster}`. Returns a list of `host:port`
231 /// strings — one entry per broker that's currently registered with
232 /// the cluster's metadata store. Java: `BrokersBase#getActiveBrokers`.
233 pub async fn brokers_list(&self, cluster: &str) -> Result<Vec<String>, AdminError> {
234 validate_segment(cluster)?;
235 let url = self.url(&["brokers", cluster])?;
236 let resp = self.send(self.http.request(Method::GET, url)).await?;
237 json_ok(resp).await
238 }
239
240 /// Get the current leader broker for the cluster.
241 ///
242 /// `GET /admin/v2/brokers/leaderBroker`. Returns `{ serviceUrl,
243 /// brokerId }`. Exposed as raw JSON for forward-compat — newer
244 /// brokers add `clusterName` and similar fields.
245 /// Java: `BrokersBase#getLeaderBroker`.
246 pub async fn brokers_leader(&self) -> Result<serde_json::Value, AdminError> {
247 let url = self.url(&["brokers", "leaderBroker"])?;
248 let resp = self.send(self.http.request(Method::GET, url)).await?;
249 json_ok(resp).await
250 }
251
252 /// List the names of all dynamic-config keys the broker exposes.
253 ///
254 /// `GET /admin/v2/brokers/configuration`. Returns the bare list of
255 /// `ServiceConfiguration` fields tagged `@FieldContext(dynamic = true)`
256 /// — the set of keys that `brokers_set_dynamic_config` accepts. Use
257 /// [`Self::brokers_dynamic_config_overrides`] for the current values.
258 ///
259 /// Pulsar 4 normally returns a JSON array (`List<String>` from
260 /// `BrokerService#getDynamicConfiguration`), but some packaging /
261 /// proxy paths surface the underlying `Map<String, ConfigField>`
262 /// shape instead. We accept both — array → values, object → keys —
263 /// to stay version-tolerant. Java: `BrokersBase#getDynamicConfigurationName`.
264 pub async fn brokers_dynamic_config_keys(&self) -> Result<Vec<String>, AdminError> {
265 let url = self.url(&["brokers", "configuration"])?;
266 let resp = self.send(self.http.request(Method::GET, url)).await?;
267 let v: serde_json::Value = json_ok(resp).await?;
268 match v {
269 serde_json::Value::Array(items) => Ok(items
270 .into_iter()
271 .filter_map(|x| x.as_str().map(str::to_owned))
272 .collect()),
273 serde_json::Value::Object(map) => Ok(map.into_iter().map(|(k, _)| k).collect()),
274 other => Err(AdminError::Protocol(format!(
275 "brokers/configuration returned unexpected shape: {other}"
276 ))),
277 }
278 }
279
280 /// Get the currently-overridden dynamic configuration values.
281 ///
282 /// `GET /admin/v2/brokers/configuration/values`. Returns a
283 /// `Map<String, String>` of every dynamic key the operator has set
284 /// (the broker omits keys still on their static / default value).
285 /// Exposed as raw JSON because broker minor versions add keys.
286 /// Java: `BrokersBase#getAllDynamicConfigurations`.
287 pub async fn brokers_dynamic_config_overrides(&self) -> Result<serde_json::Value, AdminError> {
288 let url = self.url(&["brokers", "configuration", "values"])?;
289 let resp = self.send(self.http.request(Method::GET, url)).await?;
290 json_ok(resp).await
291 }
292
293 /// Get the broker's runtime (merged static + dynamic) configuration.
294 ///
295 /// `GET /admin/v2/brokers/configuration/runtime`. Returns the full
296 /// `Map<String, String>` of `ServiceConfiguration` values as they
297 /// currently apply on the broker process — static defaults
298 /// overlaid with any `brokers_set_dynamic_config` overrides. Raw
299 /// JSON for forward-compat. Java: `BrokersBase#getRuntimeConfiguration`.
300 pub async fn brokers_runtime_config(&self) -> Result<serde_json::Value, AdminError> {
301 let url = self.url(&["brokers", "configuration", "runtime"])?;
302 let resp = self.send(self.http.request(Method::GET, url)).await?;
303 json_ok(resp).await
304 }
305
306 /// Get the broker's internal-stack endpoints.
307 ///
308 /// `GET /admin/v2/brokers/internal-configuration`. Returns the
309 /// `InternalConfigurationData` envelope — metadata-store URLs
310 /// (`zookeeperServers`, `configurationMetadataStoreUrl`),
311 /// `BookKeeper` metadata service URI, ledger root paths. Raw JSON
312 /// for forward-compat; the shape rolls between releases as the
313 /// metadata layer evolves.
314 /// Java: `BrokersBase#getInternalConfigurationData`.
315 pub async fn brokers_internal_config(&self) -> Result<serde_json::Value, AdminError> {
316 let url = self.url(&["brokers", "internal-configuration"])?;
317 let resp = self.send(self.http.request(Method::GET, url)).await?;
318 json_ok(resp).await
319 }
320
321 /// Probe broker health — produces and consumes one heartbeat message
322 /// on an internal topic.
323 ///
324 /// `GET /admin/v2/brokers/health`. The broker returns the plain-text
325 /// string `"ok"` on success; non-200 surfaces as `AdminError::Status`.
326 /// Java: `BrokersBase#healthCheck`.
327 pub async fn brokers_health_check(&self) -> Result<String, AdminError> {
328 let url = self.url(&["brokers", "health"])?;
329 let resp = self.send(self.http.request(Method::GET, url)).await?;
330 let resp = ensure_status(resp).await?;
331 Ok(resp.resp.text().await?)
332 }
333
334 /// List the namespaces a specific broker currently owns.
335 ///
336 /// `GET /admin/v2/brokers/{cluster}/{broker}/ownedNamespaces`. The
337 /// `broker` argument must be the broker's `host:port` (matching the
338 /// strings [`Self::brokers_list`] returns). Returns a
339 /// `Map<String, NamespaceOwnershipStatus>` keyed by namespace name —
340 /// raw JSON for forward-compat.
341 /// Java: `BrokersBase#getOwnedNamespaces`.
342 pub async fn brokers_owned_namespaces(
343 &self,
344 cluster: &str,
345 broker: &str,
346 ) -> Result<serde_json::Value, AdminError> {
347 validate_segment(cluster)?;
348 validate_segment(broker)?;
349 let url = self.url(&["brokers", cluster, broker, "ownedNamespaces"])?;
350 let resp = self.send(self.http.request(Method::GET, url)).await?;
351 json_ok(resp).await
352 }
353
354 /// Override a dynamic broker configuration value.
355 ///
356 /// `POST /admin/v2/brokers/configuration/{name}/{value}`. Both the
357 /// key and the value travel in the URL path — there is no request
358 /// body — matching the broker's `updateDynamicConfiguration(@PathParam
359 /// String configName, @PathParam String configValue)` signature.
360 /// The key must be one of those returned by
361 /// [`Self::brokers_dynamic_config_keys`]; unknown keys yield 412.
362 /// Java: `BrokersBase#updateDynamicConfiguration`.
363 pub async fn brokers_set_dynamic_config(
364 &self,
365 name: &str,
366 value: &str,
367 ) -> Result<(), AdminError> {
368 validate_segment(name)?;
369 validate_segment(value)?;
370 let url = self.url(&["brokers", "configuration", name, value])?;
371 let resp = self.send(self.http.request(Method::POST, url)).await?;
372 empty_ok(resp).await
373 }
374
375 /// Drop a dynamic configuration override, reverting to the static value.
376 ///
377 /// `DELETE /admin/v2/brokers/configuration/{name}`. After the call
378 /// the key disappears from [`Self::brokers_dynamic_config_overrides`]
379 /// and [`Self::brokers_runtime_config`] reflects the underlying
380 /// static / default value again.
381 /// Java: `BrokersBase#deleteDynamicConfiguration`.
382 pub async fn brokers_delete_dynamic_config(&self, name: &str) -> Result<(), AdminError> {
383 validate_segment(name)?;
384 let url = self.url(&["brokers", "configuration", name])?;
385 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
386 empty_ok(resp).await
387 }
388
389 // --- Bookies ---------------------------------------------------------
390
391 /// List every bookie the broker knows about — both writable and
392 /// read-only — as registered in `BookKeeper` metadata.
393 ///
394 /// `GET /admin/v2/bookies/all`. Returns the broker's
395 /// `BookiesClusterInfo` envelope — a `bookies: [{ address: "host:port" }]`
396 /// array. Raw JSON for forward-compat.
397 /// Java: `BookiesBase#getAllAvailableBookies`.
398 pub async fn bookies_list_all(&self) -> Result<serde_json::Value, AdminError> {
399 let url = self.url(&["bookies", "all"])?;
400 let resp = self.send(self.http.request(Method::GET, url)).await?;
401 json_ok(resp).await
402 }
403
404 /// Get every bookie's group + rack assignment, as configured for the
405 /// rack-aware placement policy.
406 ///
407 /// `GET /admin/v2/bookies/racks-info`. Returns the nested
408 /// `Map<group, Map<bookieAddress, BookieInfo>>` shape Pulsar
409 /// persists in metadata. Raw JSON because the wire shape exposes
410 /// nested maps that change between releases (the `default` group
411 /// is implicit on older brokers).
412 /// Java: `BookiesBase#getBookieRackInfo`.
413 pub async fn bookies_racks_info(&self) -> Result<serde_json::Value, AdminError> {
414 let url = self.url(&["bookies", "racks-info"])?;
415 let resp = self.send(self.http.request(Method::GET, url)).await?;
416 json_ok(resp).await
417 }
418
419 /// Set (or update) a bookie's rack assignment.
420 ///
421 /// `POST /admin/v2/bookies/racks-info/{bookie}?group={group}` with
422 /// a JSON [`BookieInfo`] body carrying only `{rack, hostname}`.
423 /// `bookie` is the `host:port` registered in `BookKeeper` metadata.
424 /// The placement policy picks up the new rack on its next
425 /// reconciliation tick.
426 ///
427 /// `group` is **a query parameter**, not a body field — Pulsar's
428 /// `BookiesBase#updateBookieRackInfo(@PathParam("bookie") String,
429 /// @QueryParam("group") String, BookieInfo)` Jackson-binds the body
430 /// to `{rack, hostname}` only; an unknown `group` body field is
431 /// silently ignored and the query param defaults to `null`,
432 /// dropping the operator's group choice on the wire.
433 /// Java: `BookiesBase#updateBookieRackInfo`.
434 pub async fn bookies_set_rack(
435 &self,
436 bookie: &str,
437 group: &str,
438 info: BookieInfo,
439 ) -> Result<(), AdminError> {
440 validate_segment(bookie)?;
441 let mut url = self.url(&["bookies", "racks-info", bookie])?;
442 url.query_pairs_mut().append_pair("group", group);
443 let resp = self
444 .send(self.http.request(Method::POST, url).json(&info))
445 .await?;
446 empty_ok(resp).await
447 }
448
449 /// Remove a bookie's rack assignment.
450 ///
451 /// `DELETE /admin/v2/bookies/racks-info/{bookie}`. The bookie falls
452 /// back to the placement policy's default group / rack until
453 /// [`Self::bookies_set_rack`] is called again.
454 /// Java: `BookiesBase#deleteBookieRackInfo`.
455 pub async fn bookies_delete_rack(&self, bookie: &str) -> Result<(), AdminError> {
456 validate_segment(bookie)?;
457 let url = self.url(&["bookies", "racks-info", bookie])?;
458 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
459 empty_ok(resp).await
460 }
461
462 // --- Schemas ---------------------------------------------------------
463
464 /// Get the latest schema attached to a topic.
465 ///
466 /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema`. Returns
467 /// `{ version, type, schema, properties, timestamp }`; raw JSON
468 /// because the `type` axis (`AVRO` / `JSON` / `PROTOBUF` /
469 /// `PROTOBUF_NATIVE` / `KEY_VALUE` / `STRING` / `BYTES` / …) is
470 /// open-ended and broker minor versions add keys (deletion
471 /// tombstones surface as `type: "DELETE"` on the GET, for
472 /// instance). Java: `SchemasResourceBase#getSchema`.
473 pub async fn schema_get_latest(&self, topic: &str) -> Result<serde_json::Value, AdminError> {
474 let (tenant, namespace, name) = split_topic(topic)?;
475 let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
476 let resp = self.send(self.http.request(Method::GET, url)).await?;
477 json_ok(resp).await
478 }
479
480 /// Get a specific schema version attached to a topic.
481 ///
482 /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schema/{version}`.
483 /// `version` is the monotonically-increasing integer the broker
484 /// assigns at registration. Same wire shape as
485 /// [`Self::schema_get_latest`].
486 /// Java: `SchemasResourceBase#getSchema` (with version path param).
487 pub async fn schema_get_version(
488 &self,
489 topic: &str,
490 version: i64,
491 ) -> Result<serde_json::Value, AdminError> {
492 let (tenant, namespace, name) = split_topic(topic)?;
493 let v = version.to_string();
494 let url = self.url(&["schemas", tenant, namespace, name, "schema", &v])?;
495 let resp = self.send(self.http.request(Method::GET, url)).await?;
496 json_ok(resp).await
497 }
498
499 /// List every schema version registered for a topic.
500 ///
501 /// `GET /admin/v2/schemas/{tenant}/{ns}/{topic}/schemas`. Pulsar 4
502 /// wraps the per-version entries in a
503 /// `GetAllVersionsSchemaResponse { getSchemaResponses: [...] }`
504 /// envelope (verified against `apache/pulsar@v4.0.4`
505 /// `SchemasResourceBase#convertToAllVersionsSchemaResponse`). We
506 /// unwrap that envelope at the boundary so callers see the flat
507 /// `Vec<Value>` they expect; a bare-array shape (older or
508 /// alternative serialisations) is still accepted. Raw JSON
509 /// per-entry for forward-compat. Java:
510 /// `SchemasResourceBase#getAllSchemas`.
511 pub async fn schema_list_versions(
512 &self,
513 topic: &str,
514 ) -> Result<Vec<serde_json::Value>, AdminError> {
515 let (tenant, namespace, name) = split_topic(topic)?;
516 let url = self.url(&["schemas", tenant, namespace, name, "schemas"])?;
517 let resp = self.send(self.http.request(Method::GET, url)).await?;
518 let v: serde_json::Value = json_ok(resp).await?;
519 match v {
520 serde_json::Value::Array(items) => Ok(items),
521 serde_json::Value::Object(mut envelope) => {
522 if let Some(serde_json::Value::Array(items)) = envelope.remove("getSchemaResponses")
523 {
524 return Ok(items);
525 }
526 Err(AdminError::Protocol(format!(
527 "schemas/.../schemas envelope missing `getSchemaResponses` array: {}",
528 serde_json::Value::Object(envelope)
529 )))
530 }
531 other => Err(AdminError::Protocol(format!(
532 "schemas/.../schemas returned unexpected shape: {other}"
533 ))),
534 }
535 }
536
537 /// Register a new schema version on a topic.
538 ///
539 /// `POST /admin/v2/schemas/{tenant}/{ns}/{topic}/schema` with a JSON
540 /// [`PostSchemaPayload`] body. The broker returns `{ version: N }`;
541 /// raw JSON because the upstream response envelope wraps the
542 /// version under `data` on some 4.x point releases. Compatibility
543 /// is enforced server-side per the namespace's
544 /// `schemaCompatibilityStrategy` — incompatible posts fail with
545 /// 409. Java: `SchemasResourceBase#postSchema`.
546 pub async fn schema_post(
547 &self,
548 topic: &str,
549 payload: PostSchemaPayload,
550 ) -> Result<serde_json::Value, AdminError> {
551 let (tenant, namespace, name) = split_topic(topic)?;
552 let url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
553 let resp = self
554 .send(self.http.request(Method::POST, url).json(&payload))
555 .await?;
556 json_ok(resp).await
557 }
558
559 /// Delete a topic's schema.
560 ///
561 /// `DELETE /admin/v2/schemas/{tenant}/{ns}/{topic}/schema?force={force}`.
562 /// `force = true` skips the broker's "is the schema in use"
563 /// guard — equivalent to `pulsar-admin schemas delete --force`.
564 /// Java: `SchemasResourceBase#deleteSchema`.
565 pub async fn schema_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
566 let (tenant, namespace, name) = split_topic(topic)?;
567 let mut url = self.url(&["schemas", tenant, namespace, name, "schema"])?;
568 url.query_pairs_mut()
569 .append_pair("force", if force { "true" } else { "false" });
570 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
571 empty_ok(resp).await
572 }
573
574 /// Check whether a candidate schema would be compatible with the
575 /// topic's current schema.
576 ///
577 /// `POST /admin/v2/schemas/{tenant}/{ns}/{topic}/compatibility` with
578 /// a JSON [`PostSchemaPayload`] body — the same shape
579 /// [`Self::schema_post`] sends, but the broker only evaluates
580 /// compatibility and never persists. Returns `{ isCompatible:
581 /// bool, schemaCompatibilityStrategy: "..." }`; raw JSON for
582 /// forward-compat.
583 /// Java: `SchemasResourceBase#testCompatibility`.
584 pub async fn schema_compatibility_check(
585 &self,
586 topic: &str,
587 payload: PostSchemaPayload,
588 ) -> Result<serde_json::Value, AdminError> {
589 let (tenant, namespace, name) = split_topic(topic)?;
590 let url = self.url(&["schemas", tenant, namespace, name, "compatibility"])?;
591 let resp = self
592 .send(self.http.request(Method::POST, url).json(&payload))
593 .await?;
594 json_ok(resp).await
595 }
596
597 // --- Pulsar Functions (read) ----------------------------------------
598
599 /// List every function registered under a namespace.
600 ///
601 /// `GET /admin/v3/functions/{tenant}/{namespace}`. Returns a JSON
602 /// array of bare function names (no tenant / namespace prefix) —
603 /// matches Java's `FunctionsBase#listFunctions` body shape.
604 pub async fn functions_list_by_namespace(
605 &self,
606 tenant: &str,
607 namespace: &str,
608 ) -> Result<Vec<String>, AdminError> {
609 validate_segment(tenant)?;
610 validate_segment(namespace)?;
611 let url = self.url_v3(&["functions", tenant, namespace])?;
612 let resp = self.send(self.http.request(Method::GET, url)).await?;
613 json_ok(resp).await
614 }
615
616 /// Get a function's registered `FunctionConfig`.
617 ///
618 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}`. The
619 /// `FunctionConfig` Java type has ~30 fields and grows on every
620 /// minor release — return raw JSON for forward-compat. Java:
621 /// `FunctionsBase#getFunctionInfo`.
622 pub async fn function_get(
623 &self,
624 tenant: &str,
625 namespace: &str,
626 name: &str,
627 ) -> Result<serde_json::Value, AdminError> {
628 validate_segment(tenant)?;
629 validate_segment(namespace)?;
630 validate_segment(name)?;
631 let url = self.url_v3(&["functions", tenant, namespace, name])?;
632 let resp = self.send(self.http.request(Method::GET, url)).await?;
633 json_ok(resp).await
634 }
635
636 /// Get a function's aggregate status (all instances).
637 ///
638 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/status`.
639 /// Returns Java's `FunctionStatus` envelope:
640 /// `{numInstances, numRunning, instances: [...]}`. Raw JSON because
641 /// the per-instance shape carries broker-version-dependent fields.
642 /// Java: `FunctionsBase#getFunctionStatus`.
643 pub async fn function_status(
644 &self,
645 tenant: &str,
646 namespace: &str,
647 name: &str,
648 ) -> Result<serde_json::Value, AdminError> {
649 validate_segment(tenant)?;
650 validate_segment(namespace)?;
651 validate_segment(name)?;
652 let url = self.url_v3(&["functions", tenant, namespace, name, "status"])?;
653 let resp = self.send(self.http.request(Method::GET, url)).await?;
654 json_ok(resp).await
655 }
656
657 /// Get a function's aggregate runtime statistics (all instances).
658 ///
659 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/stats`.
660 /// Returns Java's `FunctionStats` envelope — message rates,
661 /// processed counts, average latency, per-instance breakdown. Raw
662 /// JSON for forward-compat. Java: `FunctionsBase#getFunctionStats`.
663 pub async fn function_stats(
664 &self,
665 tenant: &str,
666 namespace: &str,
667 name: &str,
668 ) -> Result<serde_json::Value, AdminError> {
669 validate_segment(tenant)?;
670 validate_segment(namespace)?;
671 validate_segment(name)?;
672 let url = self.url_v3(&["functions", tenant, namespace, name, "stats"])?;
673 let resp = self.send(self.http.request(Method::GET, url)).await?;
674 json_ok(resp).await
675 }
676
677 /// Get one instance's status.
678 ///
679 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/status`.
680 /// `instance_id` is the integer index the broker assigns at
681 /// schedule time (`0..parallelism`). Java:
682 /// `FunctionsBase#getFunctionInstanceStatus`.
683 pub async fn function_instance_status(
684 &self,
685 tenant: &str,
686 namespace: &str,
687 name: &str,
688 instance_id: i32,
689 ) -> Result<serde_json::Value, AdminError> {
690 validate_segment(tenant)?;
691 validate_segment(namespace)?;
692 validate_segment(name)?;
693 let id = instance_id.to_string();
694 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "status"])?;
695 let resp = self.send(self.http.request(Method::GET, url)).await?;
696 json_ok(resp).await
697 }
698
699 /// Get one instance's runtime statistics.
700 ///
701 /// `GET /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stats`.
702 /// Java: `FunctionsBase#getFunctionInstanceStats`.
703 pub async fn function_instance_stats(
704 &self,
705 tenant: &str,
706 namespace: &str,
707 name: &str,
708 instance_id: i32,
709 ) -> Result<serde_json::Value, AdminError> {
710 validate_segment(tenant)?;
711 validate_segment(namespace)?;
712 validate_segment(name)?;
713 let id = instance_id.to_string();
714 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stats"])?;
715 let resp = self.send(self.http.request(Method::GET, url)).await?;
716 json_ok(resp).await
717 }
718
719 // --- Pulsar Functions (URL-based register / update) ----------------
720
721 /// Register a function from a remote package URL.
722 ///
723 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}` with a
724 /// `multipart/form-data` body carrying two parts: `url=<pkg-url>`
725 /// (the broker-resolvable HTTP / `function://` / `file://` URL of
726 /// the compiled package) and `functionConfig=<json>` (the
727 /// serialised [`FunctionConfig`]). The local-file upload path
728 /// (Java's `FunctionsBase#registerFunction` with a
729 /// `FormDataMultiPart` `data` part) is intentionally out of scope
730 /// for this method — operators with a pre-built JAR served over
731 /// HTTP / S3 / GCS use the URL path.
732 ///
733 /// The two-part envelope matches Java's
734 /// `FunctionsBase#registerFunction(@PathParam tenant, ...,
735 /// @FormDataParam("url") String functionPkgUrl,
736 /// @FormDataParam("functionConfig") FunctionConfig functionConfig)`
737 /// — when `data` is null and `url` is non-null the broker takes the
738 /// URL fast-path and skips the upload step.
739 pub async fn function_create_with_url(
740 &self,
741 tenant: &str,
742 namespace: &str,
743 name: &str,
744 url: &str,
745 config: FunctionConfig,
746 ) -> Result<(), AdminError> {
747 validate_segment(tenant)?;
748 validate_segment(namespace)?;
749 validate_segment(name)?;
750 let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
751 let form = function_pkg_form(url, &config)?;
752 let resp = self
753 .send(
754 self.http
755 .request(Method::POST, endpoint)
756 .multipart(form)
757 .timeout(PACKAGE_REGISTER_TIMEOUT),
758 )
759 .await?;
760 empty_ok(resp).await
761 }
762
763 /// Update an existing function from a remote package URL.
764 ///
765 /// `PUT /admin/v3/functions/{tenant}/{namespace}/{name}` with the
766 /// same two-part `multipart/form-data` shape as
767 /// [`Self::function_create_with_url`]. Java:
768 /// `FunctionsBase#updateFunction` with non-null `pkgUrl`.
769 pub async fn function_update_with_url(
770 &self,
771 tenant: &str,
772 namespace: &str,
773 name: &str,
774 url: &str,
775 config: FunctionConfig,
776 ) -> Result<(), AdminError> {
777 validate_segment(tenant)?;
778 validate_segment(namespace)?;
779 validate_segment(name)?;
780 let endpoint = self.url_v3(&["functions", tenant, namespace, name])?;
781 let form = function_pkg_form(url, &config)?;
782 let resp = self
783 .send(
784 self.http
785 .request(Method::PUT, endpoint)
786 .multipart(form)
787 .timeout(PACKAGE_REGISTER_TIMEOUT),
788 )
789 .await?;
790 empty_ok(resp).await
791 }
792
793 // --- Pulsar Functions (lifecycle) -----------------------------------
794
795 /// Deregister (delete) a function.
796 ///
797 /// `DELETE /admin/v3/functions/{tenant}/{namespace}/{name}`. The
798 /// broker stops every running instance and drops the
799 /// `FunctionConfig` from metadata. Java:
800 /// `FunctionsBase#deregisterFunction`.
801 pub async fn function_delete(
802 &self,
803 tenant: &str,
804 namespace: &str,
805 name: &str,
806 ) -> Result<(), AdminError> {
807 validate_segment(tenant)?;
808 validate_segment(namespace)?;
809 validate_segment(name)?;
810 let url = self.url_v3(&["functions", tenant, namespace, name])?;
811 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
812 empty_ok(resp).await
813 }
814
815 /// Start every instance of a function (idempotent).
816 ///
817 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/start`. No
818 /// body. Java: `FunctionsBase#startFunction`.
819 pub async fn function_start(
820 &self,
821 tenant: &str,
822 namespace: &str,
823 name: &str,
824 ) -> Result<(), AdminError> {
825 validate_segment(tenant)?;
826 validate_segment(namespace)?;
827 validate_segment(name)?;
828 let url = self.url_v3(&["functions", tenant, namespace, name, "start"])?;
829 let resp = self.send(self.http.request(Method::POST, url)).await?;
830 empty_ok(resp).await
831 }
832
833 /// Stop every instance of a function.
834 ///
835 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/stop`. The
836 /// broker leaves the `FunctionConfig` in metadata; a subsequent
837 /// `function_start` brings it back. Java:
838 /// `FunctionsBase#stopFunction`.
839 pub async fn function_stop(
840 &self,
841 tenant: &str,
842 namespace: &str,
843 name: &str,
844 ) -> Result<(), AdminError> {
845 validate_segment(tenant)?;
846 validate_segment(namespace)?;
847 validate_segment(name)?;
848 let url = self.url_v3(&["functions", tenant, namespace, name, "stop"])?;
849 let resp = self.send(self.http.request(Method::POST, url)).await?;
850 empty_ok(resp).await
851 }
852
853 /// Restart every instance of a function.
854 ///
855 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/restart`.
856 /// Java: `FunctionsBase#restartFunction`.
857 pub async fn function_restart(
858 &self,
859 tenant: &str,
860 namespace: &str,
861 name: &str,
862 ) -> Result<(), AdminError> {
863 validate_segment(tenant)?;
864 validate_segment(namespace)?;
865 validate_segment(name)?;
866 let url = self.url_v3(&["functions", tenant, namespace, name, "restart"])?;
867 let resp = self.send(self.http.request(Method::POST, url)).await?;
868 empty_ok(resp).await
869 }
870
871 /// Start one specific instance.
872 ///
873 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/start`.
874 /// Java: `FunctionsBase#startFunctionInstance`.
875 pub async fn function_start_instance(
876 &self,
877 tenant: &str,
878 namespace: &str,
879 name: &str,
880 instance_id: i32,
881 ) -> Result<(), AdminError> {
882 validate_segment(tenant)?;
883 validate_segment(namespace)?;
884 validate_segment(name)?;
885 let id = instance_id.to_string();
886 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "start"])?;
887 let resp = self.send(self.http.request(Method::POST, url)).await?;
888 empty_ok(resp).await
889 }
890
891 /// Stop one specific instance.
892 ///
893 /// `POST /admin/v3/functions/{tenant}/{namespace}/{name}/{instance_id}/stop`.
894 /// Java: `FunctionsBase#stopFunctionInstance`.
895 pub async fn function_stop_instance(
896 &self,
897 tenant: &str,
898 namespace: &str,
899 name: &str,
900 instance_id: i32,
901 ) -> Result<(), AdminError> {
902 validate_segment(tenant)?;
903 validate_segment(namespace)?;
904 validate_segment(name)?;
905 let id = instance_id.to_string();
906 let url = self.url_v3(&["functions", tenant, namespace, name, &id, "stop"])?;
907 let resp = self.send(self.http.request(Method::POST, url)).await?;
908 empty_ok(resp).await
909 }
910
911 // --- Tenants ---------------------------------------------------------
912
913 /// List tenants.
914 ///
915 /// `GET /admin/v2/tenants`.
916 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Tenants.java`
917 /// (`@Path("/tenants")`) + `admin/impl/TenantsBase.java#getTenants`.
918 pub async fn tenants_list(&self) -> Result<Vec<String>, AdminError> {
919 let url = self.url(&["tenants"])?;
920 let resp = self.send(self.http.request(Method::GET, url)).await?;
921 json_ok(resp).await
922 }
923
924 /// Create a tenant.
925 ///
926 /// `PUT /admin/v2/tenants/{tenant}` with a JSON [`TenantInfo`] body.
927 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#
928 /// createTenant`.
929 pub async fn tenant_create(&self, name: &str, info: TenantInfo) -> Result<(), AdminError> {
930 let url = self.url(&["tenants", name])?;
931 let resp = self
932 .send(self.http.request(Method::PUT, url).json(&info))
933 .await?;
934 empty_ok(resp).await
935 }
936
937 /// Delete a tenant.
938 ///
939 /// `DELETE /admin/v2/tenants/{tenant}`.
940 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java#
941 /// deleteTenant`.
942 pub async fn tenant_delete(&self, name: &str) -> Result<(), AdminError> {
943 let url = self.url(&["tenants", name])?;
944 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
945 empty_ok(resp).await
946 }
947
948 // --- Namespaces ------------------------------------------------------
949
950 /// List namespaces under a tenant.
951 ///
952 /// `GET /admin/v2/namespaces/{tenant}`.
953 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
954 /// (`@Path("/namespaces")` + `@Path("/{tenant}")`).
955 pub async fn namespaces_list(&self, tenant: &str) -> Result<Vec<String>, AdminError> {
956 let url = self.url(&["namespaces", tenant])?;
957 let resp = self.send(self.http.request(Method::GET, url)).await?;
958 json_ok(resp).await
959 }
960
961 /// Create a namespace.
962 ///
963 /// `PUT /admin/v2/namespaces/{tenant}/{namespace}`. The namespace argument
964 /// is `tenant/namespace`, matching how Pulsar expresses fully qualified
965 /// namespace names on the wire and CLI.
966 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
967 /// (`@PUT @Path("/{tenant}/{namespace}")`).
968 pub async fn namespace_create(&self, ns: &str) -> Result<(), AdminError> {
969 let (tenant, namespace) = split_namespace(ns)?;
970 let url = self.url(&["namespaces", tenant, namespace])?;
971 let resp = self.send(self.http.request(Method::PUT, url)).await?;
972 empty_ok(resp).await
973 }
974
975 /// Delete a namespace.
976 ///
977 /// `DELETE /admin/v2/namespaces/{tenant}/{namespace}`.
978 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java`
979 /// (`@DELETE @Path("/{tenant}/{namespace}")`).
980 pub async fn namespace_delete(&self, ns: &str) -> Result<(), AdminError> {
981 let (tenant, namespace) = split_namespace(ns)?;
982 let url = self.url(&["namespaces", tenant, namespace])?;
983 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
984 empty_ok(resp).await
985 }
986
987 /// Get a namespace's retention policy.
988 ///
989 /// `GET /admin/v2/namespaces/{tenant}/{ns}/retention`.
990 /// Returns `RetentionPolicies { retentionTimeInMinutes, retentionSizeInMB }`.
991 /// A fresh namespace, or a namespace whose retention was just
992 /// removed, surfaces as 204 / empty body / `null` — we fold those
993 /// to `RetentionPolicies::default()` (broker semantic).
994 /// Java: `NamespacesBase#getRetention`.
995 pub async fn namespace_get_retention(&self, ns: &str) -> Result<RetentionPolicies, AdminError> {
996 let (tenant, namespace) = split_namespace(ns)?;
997 let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
998 let resp = self.send(self.http.request(Method::GET, url)).await?;
999 json_ok_or_default(resp).await
1000 }
1001
1002 /// Set a namespace's retention policy.
1003 ///
1004 /// `POST /admin/v2/namespaces/{tenant}/{ns}/retention` with a JSON
1005 /// `RetentionPolicies` body. `-1` means infinite (size or time).
1006 /// Java: `NamespacesBase#setRetention`.
1007 pub async fn namespace_set_retention(
1008 &self,
1009 ns: &str,
1010 policy: RetentionPolicies,
1011 ) -> Result<(), AdminError> {
1012 let (tenant, namespace) = split_namespace(ns)?;
1013 let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
1014 let resp = self
1015 .send(self.http.request(Method::POST, url).json(&policy))
1016 .await?;
1017 empty_ok(resp).await
1018 }
1019
1020 /// Remove a namespace's retention policy (fall back to broker default).
1021 ///
1022 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/retention`.
1023 /// Java: `NamespacesBase#removeRetention`.
1024 pub async fn namespace_remove_retention(&self, ns: &str) -> Result<(), AdminError> {
1025 let (tenant, namespace) = split_namespace(ns)?;
1026 let url = self.url(&["namespaces", tenant, namespace, "retention"])?;
1027 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1028 empty_ok(resp).await
1029 }
1030
1031 /// Get all backlog-quota policies on a namespace.
1032 ///
1033 /// `GET /admin/v2/namespaces/{tenant}/{ns}/backlogQuotaMap`. Returns
1034 /// `Map<BacklogQuotaType, BacklogQuota>` — kept as raw JSON because
1035 /// broker versions add quota types (`message_age` since 2.10).
1036 /// Java: `NamespacesBase#getBacklogQuotaMap`.
1037 pub async fn namespace_get_backlog_quotas(
1038 &self,
1039 ns: &str,
1040 ) -> Result<serde_json::Value, AdminError> {
1041 let (tenant, namespace) = split_namespace(ns)?;
1042 let url = self.url(&["namespaces", tenant, namespace, "backlogQuotaMap"])?;
1043 let resp = self.send(self.http.request(Method::GET, url)).await?;
1044 json_ok(resp).await
1045 }
1046
1047 /// Set a backlog-quota policy on a namespace.
1048 ///
1049 /// `POST /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}`
1050 /// with a JSON `BacklogQuota` body. `backlog_quota_type` selects which
1051 /// dimension to limit (`destination_storage` for byte size, `message_age`
1052 /// for wall-clock TTL).
1053 /// Java: `NamespacesBase#setBacklogQuota`.
1054 pub async fn namespace_set_backlog_quota(
1055 &self,
1056 ns: &str,
1057 backlog_quota_type: BacklogQuotaType,
1058 quota: BacklogQuota,
1059 ) -> Result<(), AdminError> {
1060 let (tenant, namespace) = split_namespace(ns)?;
1061 let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
1062 url.query_pairs_mut()
1063 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
1064 let resp = self
1065 .send(self.http.request(Method::POST, url).json("a))
1066 .await?;
1067 empty_ok(resp).await
1068 }
1069
1070 /// Remove a backlog-quota policy from a namespace.
1071 ///
1072 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/backlogQuota?backlogQuotaType={type}`.
1073 /// Java: `NamespacesBase#removeBacklogQuota`.
1074 pub async fn namespace_remove_backlog_quota(
1075 &self,
1076 ns: &str,
1077 backlog_quota_type: BacklogQuotaType,
1078 ) -> Result<(), AdminError> {
1079 let (tenant, namespace) = split_namespace(ns)?;
1080 let mut url = self.url(&["namespaces", tenant, namespace, "backlogQuota"])?;
1081 url.query_pairs_mut()
1082 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
1083 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1084 empty_ok(resp).await
1085 }
1086
1087 /// Get a namespace's message-TTL (seconds).
1088 ///
1089 /// `GET /admin/v2/namespaces/{tenant}/{ns}/messageTTL`. Returns a
1090 /// bare integer (or `null` if no TTL is set — which decodes as
1091 /// `Option::None`).
1092 /// Java: `NamespacesBase#getNamespaceMessageTTL`.
1093 pub async fn namespace_get_message_ttl(&self, ns: &str) -> Result<Option<i32>, AdminError> {
1094 let (tenant, namespace) = split_namespace(ns)?;
1095 let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1096 let resp = self.send(self.http.request(Method::GET, url)).await?;
1097 json_ok_optional(resp).await
1098 }
1099
1100 /// Set a namespace's message-TTL (seconds).
1101 ///
1102 /// `POST /admin/v2/namespaces/{tenant}/{ns}/messageTTL` with a bare
1103 /// integer body. `0` disables (broker treats as no TTL).
1104 /// Java: `NamespacesBase#setNamespaceMessageTTL`.
1105 pub async fn namespace_set_message_ttl(
1106 &self,
1107 ns: &str,
1108 ttl_seconds: i32,
1109 ) -> Result<(), AdminError> {
1110 let (tenant, namespace) = split_namespace(ns)?;
1111 let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1112 let resp = self
1113 .send(self.http.request(Method::POST, url).json(&ttl_seconds))
1114 .await?;
1115 empty_ok(resp).await
1116 }
1117
1118 /// Remove a namespace's message-TTL (fall back to broker default).
1119 ///
1120 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/messageTTL`.
1121 /// Java: `NamespacesBase#removeNamespaceMessageTTL`.
1122 pub async fn namespace_remove_message_ttl(&self, ns: &str) -> Result<(), AdminError> {
1123 let (tenant, namespace) = split_namespace(ns)?;
1124 let url = self.url(&["namespaces", tenant, namespace, "messageTTL"])?;
1125 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1126 empty_ok(resp).await
1127 }
1128
1129 // --- Namespace policies — persistence + rates ----------------------
1130
1131 /// Get a namespace's persistence policy.
1132 ///
1133 /// `GET /admin/v2/namespaces/{tenant}/{ns}/persistence`. Returns the
1134 /// `BookKeeper` ensemble / write-quorum / ack-quorum triple plus the
1135 /// managed-ledger mark-delete rate cap. `null` body decodes to
1136 /// `PersistencePolicies::default()` via `#[serde(default)]`.
1137 /// Java: `NamespacesBase#getPersistence`.
1138 pub async fn namespace_get_persistence(
1139 &self,
1140 ns: &str,
1141 ) -> Result<PersistencePolicies, AdminError> {
1142 let (tenant, namespace) = split_namespace(ns)?;
1143 let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1144 let resp = self.send(self.http.request(Method::GET, url)).await?;
1145 json_ok_or_default(resp).await
1146 }
1147
1148 /// Set a namespace's persistence policy.
1149 ///
1150 /// `POST /admin/v2/namespaces/{tenant}/{ns}/persistence` with a JSON
1151 /// `PersistencePolicies` body.
1152 /// Java: `NamespacesBase#setPersistence`.
1153 pub async fn namespace_set_persistence(
1154 &self,
1155 ns: &str,
1156 policy: PersistencePolicies,
1157 ) -> Result<(), AdminError> {
1158 let (tenant, namespace) = split_namespace(ns)?;
1159 let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1160 let resp = self
1161 .send(self.http.request(Method::POST, url).json(&policy))
1162 .await?;
1163 empty_ok(resp).await
1164 }
1165
1166 /// Remove a namespace's persistence policy (fall back to broker default).
1167 ///
1168 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/persistence`.
1169 /// Java: `NamespacesBase#deletePersistence`.
1170 pub async fn namespace_remove_persistence(&self, ns: &str) -> Result<(), AdminError> {
1171 let (tenant, namespace) = split_namespace(ns)?;
1172 let url = self.url(&["namespaces", tenant, namespace, "persistence"])?;
1173 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1174 empty_ok(resp).await
1175 }
1176
1177 /// Get a namespace's consumer dispatch-rate policy.
1178 ///
1179 /// `GET /admin/v2/namespaces/{tenant}/{ns}/dispatchRate`. Returns
1180 /// the per-namespace consumer-dispatch throttle (msg/sec, byte/sec,
1181 /// window in seconds). `-1` on either dimension means unlimited.
1182 /// Java: `NamespacesBase#getDispatchRate`.
1183 pub async fn namespace_get_dispatch_rate(&self, ns: &str) -> Result<DispatchRate, AdminError> {
1184 let (tenant, namespace) = split_namespace(ns)?;
1185 let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1186 let resp = self.send(self.http.request(Method::GET, url)).await?;
1187 json_ok_or_default(resp).await
1188 }
1189
1190 /// Set a namespace's consumer dispatch-rate policy.
1191 ///
1192 /// `POST /admin/v2/namespaces/{tenant}/{ns}/dispatchRate` with a
1193 /// JSON `DispatchRate` body.
1194 /// Java: `NamespacesBase#setDispatchRate`.
1195 pub async fn namespace_set_dispatch_rate(
1196 &self,
1197 ns: &str,
1198 rate: DispatchRate,
1199 ) -> Result<(), AdminError> {
1200 let (tenant, namespace) = split_namespace(ns)?;
1201 let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1202 let resp = self
1203 .send(self.http.request(Method::POST, url).json(&rate))
1204 .await?;
1205 empty_ok(resp).await
1206 }
1207
1208 /// Remove a namespace's consumer dispatch-rate policy.
1209 ///
1210 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/dispatchRate`.
1211 /// Java: `NamespacesBase#deleteDispatchRate`.
1212 pub async fn namespace_remove_dispatch_rate(&self, ns: &str) -> Result<(), AdminError> {
1213 let (tenant, namespace) = split_namespace(ns)?;
1214 let url = self.url(&["namespaces", tenant, namespace, "dispatchRate"])?;
1215 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1216 empty_ok(resp).await
1217 }
1218
1219 /// Get a namespace's per-subscription dispatch-rate policy.
1220 ///
1221 /// `GET /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`.
1222 /// Reuses the [`DispatchRate`] body shape — the policy applies per
1223 /// subscription rather than aggregated across all consumers.
1224 /// Java: `NamespacesBase#getSubscriptionDispatchRate`.
1225 pub async fn namespace_get_subscription_dispatch_rate(
1226 &self,
1227 ns: &str,
1228 ) -> Result<DispatchRate, AdminError> {
1229 let (tenant, namespace) = split_namespace(ns)?;
1230 let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1231 let resp = self.send(self.http.request(Method::GET, url)).await?;
1232 json_ok(resp).await
1233 }
1234
1235 /// Set a namespace's per-subscription dispatch-rate policy.
1236 ///
1237 /// `POST /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`
1238 /// with a JSON `DispatchRate` body.
1239 /// Java: `NamespacesBase#setSubscriptionDispatchRate`.
1240 pub async fn namespace_set_subscription_dispatch_rate(
1241 &self,
1242 ns: &str,
1243 rate: DispatchRate,
1244 ) -> Result<(), AdminError> {
1245 let (tenant, namespace) = split_namespace(ns)?;
1246 let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1247 let resp = self
1248 .send(self.http.request(Method::POST, url).json(&rate))
1249 .await?;
1250 empty_ok(resp).await
1251 }
1252
1253 /// Remove a namespace's per-subscription dispatch-rate policy.
1254 ///
1255 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/subscriptionDispatchRate`.
1256 /// Java: `NamespacesBase#deleteSubscriptionDispatchRate`.
1257 pub async fn namespace_remove_subscription_dispatch_rate(
1258 &self,
1259 ns: &str,
1260 ) -> Result<(), AdminError> {
1261 let (tenant, namespace) = split_namespace(ns)?;
1262 let url = self.url(&["namespaces", tenant, namespace, "subscriptionDispatchRate"])?;
1263 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1264 empty_ok(resp).await
1265 }
1266
1267 /// Get a namespace's cross-cluster replicator dispatch-rate policy.
1268 ///
1269 /// `GET /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`.
1270 /// Reuses the [`DispatchRate`] body shape — the policy throttles
1271 /// outbound geo-replication traffic from this cluster.
1272 /// Java: `NamespacesBase#getReplicatorDispatchRate`.
1273 pub async fn namespace_get_replicator_dispatch_rate(
1274 &self,
1275 ns: &str,
1276 ) -> Result<DispatchRate, AdminError> {
1277 let (tenant, namespace) = split_namespace(ns)?;
1278 let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1279 let resp = self.send(self.http.request(Method::GET, url)).await?;
1280 json_ok(resp).await
1281 }
1282
1283 /// Set a namespace's cross-cluster replicator dispatch-rate policy.
1284 ///
1285 /// `POST /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`
1286 /// with a JSON `DispatchRate` body.
1287 /// Java: `NamespacesBase#setReplicatorDispatchRate`.
1288 pub async fn namespace_set_replicator_dispatch_rate(
1289 &self,
1290 ns: &str,
1291 rate: DispatchRate,
1292 ) -> Result<(), AdminError> {
1293 let (tenant, namespace) = split_namespace(ns)?;
1294 let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1295 let resp = self
1296 .send(self.http.request(Method::POST, url).json(&rate))
1297 .await?;
1298 empty_ok(resp).await
1299 }
1300
1301 /// Remove a namespace's cross-cluster replicator dispatch-rate policy.
1302 ///
1303 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/replicatorDispatchRate`.
1304 /// Java: `NamespacesBase#removeReplicatorDispatchRate`.
1305 pub async fn namespace_remove_replicator_dispatch_rate(
1306 &self,
1307 ns: &str,
1308 ) -> Result<(), AdminError> {
1309 let (tenant, namespace) = split_namespace(ns)?;
1310 let url = self.url(&["namespaces", tenant, namespace, "replicatorDispatchRate"])?;
1311 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1312 empty_ok(resp).await
1313 }
1314
1315 /// Get a namespace's publish-rate policy.
1316 ///
1317 /// `GET /admin/v2/namespaces/{tenant}/{ns}/publishRate`. Returns
1318 /// the producer-side throttle (msg/sec + byte/sec). `-1` on either
1319 /// dimension means unlimited.
1320 /// Java: `NamespacesBase#getPublishRate`.
1321 pub async fn namespace_get_publish_rate(&self, ns: &str) -> Result<PublishRate, AdminError> {
1322 let (tenant, namespace) = split_namespace(ns)?;
1323 let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1324 let resp = self.send(self.http.request(Method::GET, url)).await?;
1325 json_ok_or_default(resp).await
1326 }
1327
1328 /// Set a namespace's publish-rate policy.
1329 ///
1330 /// `POST /admin/v2/namespaces/{tenant}/{ns}/publishRate` with a JSON
1331 /// `PublishRate` body.
1332 /// Java: `NamespacesBase#setPublishRate`.
1333 pub async fn namespace_set_publish_rate(
1334 &self,
1335 ns: &str,
1336 rate: PublishRate,
1337 ) -> Result<(), AdminError> {
1338 let (tenant, namespace) = split_namespace(ns)?;
1339 let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1340 let resp = self
1341 .send(self.http.request(Method::POST, url).json(&rate))
1342 .await?;
1343 empty_ok(resp).await
1344 }
1345
1346 /// Remove a namespace's publish-rate policy.
1347 ///
1348 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/publishRate`.
1349 /// Java: `NamespacesBase#removePublishRate`.
1350 pub async fn namespace_remove_publish_rate(&self, ns: &str) -> Result<(), AdminError> {
1351 let (tenant, namespace) = split_namespace(ns)?;
1352 let url = self.url(&["namespaces", tenant, namespace, "publishRate"])?;
1353 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1354 empty_ok(resp).await
1355 }
1356
1357 // --- Namespace policies — limits + dedup + delayed delivery -----
1358
1359 /// Get a namespace's broker-side message deduplication flag.
1360 ///
1361 /// `GET /admin/v2/namespaces/{tenant}/{ns}/deduplication`. Returns a
1362 /// bare JSON boolean, or `null` (decoded as `None`) when the policy
1363 /// is unset and the broker default applies.
1364 /// Java: `NamespacesBase#getDeduplication`.
1365 pub async fn namespace_get_deduplication(&self, ns: &str) -> Result<Option<bool>, AdminError> {
1366 let (tenant, namespace) = split_namespace(ns)?;
1367 let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1368 let resp = self.send(self.http.request(Method::GET, url)).await?;
1369 json_ok_optional(resp).await
1370 }
1371
1372 /// Set a namespace's broker-side message deduplication flag.
1373 ///
1374 /// `POST /admin/v2/namespaces/{tenant}/{ns}/deduplication` with a
1375 /// bare JSON boolean body.
1376 /// Java: `NamespacesBase#modifyDeduplication`.
1377 pub async fn namespace_set_deduplication(
1378 &self,
1379 ns: &str,
1380 enabled: bool,
1381 ) -> Result<(), AdminError> {
1382 let (tenant, namespace) = split_namespace(ns)?;
1383 let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1384 let resp = self
1385 .send(self.http.request(Method::POST, url).json(&enabled))
1386 .await?;
1387 empty_ok(resp).await
1388 }
1389
1390 /// Remove a namespace's deduplication flag (fall back to broker default).
1391 ///
1392 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplication`.
1393 /// Java: `NamespacesBase#removeDeduplication`.
1394 pub async fn namespace_remove_deduplication(&self, ns: &str) -> Result<(), AdminError> {
1395 let (tenant, namespace) = split_namespace(ns)?;
1396 let url = self.url(&["namespaces", tenant, namespace, "deduplication"])?;
1397 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1398 empty_ok(resp).await
1399 }
1400
1401 /// Get a namespace's deduplication-snapshot interval (entries).
1402 ///
1403 /// `GET /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`.
1404 /// Returns a bare integer (the entry count between dedup cursor
1405 /// snapshots), or `null` (decoded as `None`) when the broker default
1406 /// applies.
1407 /// Java: `NamespacesBase#getDeduplicationSnapshotInterval`.
1408 pub async fn namespace_get_deduplication_snapshot_interval(
1409 &self,
1410 ns: &str,
1411 ) -> Result<Option<i32>, AdminError> {
1412 let (tenant, namespace) = split_namespace(ns)?;
1413 let url = self.url(&[
1414 "namespaces",
1415 tenant,
1416 namespace,
1417 "deduplicationSnapshotInterval",
1418 ])?;
1419 let resp = self.send(self.http.request(Method::GET, url)).await?;
1420 json_ok_optional(resp).await
1421 }
1422
1423 /// Set a namespace's deduplication-snapshot interval (entries).
1424 ///
1425 /// `POST /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`
1426 /// with a bare JSON integer body.
1427 /// Java: `NamespacesBase#setDeduplicationSnapshotInterval`.
1428 pub async fn namespace_set_deduplication_snapshot_interval(
1429 &self,
1430 ns: &str,
1431 interval_entries: i32,
1432 ) -> Result<(), AdminError> {
1433 let (tenant, namespace) = split_namespace(ns)?;
1434 let url = self.url(&[
1435 "namespaces",
1436 tenant,
1437 namespace,
1438 "deduplicationSnapshotInterval",
1439 ])?;
1440 let resp = self
1441 .send(self.http.request(Method::POST, url).json(&interval_entries))
1442 .await?;
1443 empty_ok(resp).await
1444 }
1445
1446 /// Remove a namespace's deduplication-snapshot interval override.
1447 ///
1448 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/deduplicationSnapshotInterval`.
1449 /// Java: `NamespacesBase#deleteDeduplicationSnapshotInterval`.
1450 pub async fn namespace_remove_deduplication_snapshot_interval(
1451 &self,
1452 ns: &str,
1453 ) -> Result<(), AdminError> {
1454 let (tenant, namespace) = split_namespace(ns)?;
1455 let url = self.url(&[
1456 "namespaces",
1457 tenant,
1458 namespace,
1459 "deduplicationSnapshotInterval",
1460 ])?;
1461 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1462 empty_ok(resp).await
1463 }
1464
1465 /// Get a namespace's compaction threshold (bytes).
1466 ///
1467 /// `GET /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold`. Returns
1468 /// a bare integer (bytes of accumulated topic backlog above which the
1469 /// broker triggers automatic compaction), or `null` (decoded as `None`)
1470 /// when the broker default applies.
1471 /// Java: `NamespacesBase#getCompactionThreshold`.
1472 pub async fn namespace_get_compaction_threshold(
1473 &self,
1474 ns: &str,
1475 ) -> Result<Option<i64>, AdminError> {
1476 let (tenant, namespace) = split_namespace(ns)?;
1477 let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1478 let resp = self.send(self.http.request(Method::GET, url)).await?;
1479 json_ok_optional(resp).await
1480 }
1481
1482 /// Set a namespace's compaction threshold (bytes).
1483 ///
1484 /// `PUT /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold` with
1485 /// a bare JSON long body. `0` disables automatic compaction.
1486 ///
1487 /// Note: this endpoint is `@PUT` in Pulsar 4 (`Namespaces.java`
1488 /// declares `@PUT @Path("/{tenant}/{namespace}/compactionThreshold")`),
1489 /// not POST — using POST yields a `405 Method Not Allowed`.
1490 /// Java: `NamespacesBase#setCompactionThreshold`.
1491 pub async fn namespace_set_compaction_threshold(
1492 &self,
1493 ns: &str,
1494 threshold_bytes: i64,
1495 ) -> Result<(), AdminError> {
1496 let (tenant, namespace) = split_namespace(ns)?;
1497 let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1498 let resp = self
1499 .send(self.http.request(Method::PUT, url).json(&threshold_bytes))
1500 .await?;
1501 empty_ok(resp).await
1502 }
1503
1504 /// Remove a namespace's compaction threshold override.
1505 ///
1506 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/compactionThreshold`.
1507 /// Java: `NamespacesBase#deleteCompactionThreshold`.
1508 pub async fn namespace_remove_compaction_threshold(&self, ns: &str) -> Result<(), AdminError> {
1509 let (tenant, namespace) = split_namespace(ns)?;
1510 let url = self.url(&["namespaces", tenant, namespace, "compactionThreshold"])?;
1511 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1512 empty_ok(resp).await
1513 }
1514
1515 /// Get a namespace's delayed-delivery policy.
1516 ///
1517 /// `GET /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery`. Returns
1518 /// the active flag + tick time (the broker's index-tick granularity
1519 /// for delivering delayed messages). `null` decodes as `None`.
1520 /// Java: `NamespacesBase#getDelayedDeliveryPolicies`.
1521 pub async fn namespace_get_delayed_delivery(
1522 &self,
1523 ns: &str,
1524 ) -> Result<Option<DelayedDeliveryPolicies>, AdminError> {
1525 let (tenant, namespace) = split_namespace(ns)?;
1526 let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1527 let resp = self.send(self.http.request(Method::GET, url)).await?;
1528 json_ok_optional(resp).await
1529 }
1530
1531 /// Set a namespace's delayed-delivery policy.
1532 ///
1533 /// `POST /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery` with a
1534 /// JSON `DelayedDeliveryPolicies` body.
1535 /// Java: `NamespacesBase#setDelayedDeliveryPolicies`.
1536 pub async fn namespace_set_delayed_delivery(
1537 &self,
1538 ns: &str,
1539 policy: DelayedDeliveryPolicies,
1540 ) -> Result<(), AdminError> {
1541 let (tenant, namespace) = split_namespace(ns)?;
1542 let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1543 let resp = self
1544 .send(self.http.request(Method::POST, url).json(&policy))
1545 .await?;
1546 empty_ok(resp).await
1547 }
1548
1549 /// Remove a namespace's delayed-delivery policy override.
1550 ///
1551 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/delayedDelivery`.
1552 /// Java: `NamespacesBase#removeDelayedDeliveryPolicies`.
1553 pub async fn namespace_remove_delayed_delivery(&self, ns: &str) -> Result<(), AdminError> {
1554 let (tenant, namespace) = split_namespace(ns)?;
1555 let url = self.url(&["namespaces", tenant, namespace, "delayedDelivery"])?;
1556 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1557 empty_ok(resp).await
1558 }
1559
1560 /// Get a namespace's max-producers-per-topic limit.
1561 ///
1562 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic`. Returns
1563 /// a bare integer (the per-topic ceiling on concurrent producer
1564 /// connections), or `null` (decoded as `None`) when the broker default
1565 /// applies.
1566 /// Java: `NamespacesBase#getMaxProducersPerTopic`.
1567 pub async fn namespace_get_max_producers_per_topic(
1568 &self,
1569 ns: &str,
1570 ) -> Result<Option<i32>, AdminError> {
1571 let (tenant, namespace) = split_namespace(ns)?;
1572 let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1573 let resp = self.send(self.http.request(Method::GET, url)).await?;
1574 json_ok_optional(resp).await
1575 }
1576
1577 /// Set a namespace's max-producers-per-topic limit.
1578 ///
1579 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic` with
1580 /// a bare JSON integer body. `0` disables the limit.
1581 /// Java: `NamespacesBase#setMaxProducersPerTopic`.
1582 pub async fn namespace_set_max_producers_per_topic(
1583 &self,
1584 ns: &str,
1585 max_producers: i32,
1586 ) -> Result<(), AdminError> {
1587 let (tenant, namespace) = split_namespace(ns)?;
1588 let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1589 let resp = self
1590 .send(self.http.request(Method::POST, url).json(&max_producers))
1591 .await?;
1592 empty_ok(resp).await
1593 }
1594
1595 /// Remove a namespace's max-producers-per-topic limit override.
1596 ///
1597 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxProducersPerTopic`.
1598 /// Java: `NamespacesBase#removeMaxProducersPerTopic`.
1599 pub async fn namespace_remove_max_producers_per_topic(
1600 &self,
1601 ns: &str,
1602 ) -> Result<(), AdminError> {
1603 let (tenant, namespace) = split_namespace(ns)?;
1604 let url = self.url(&["namespaces", tenant, namespace, "maxProducersPerTopic"])?;
1605 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1606 empty_ok(resp).await
1607 }
1608
1609 /// Get a namespace's max-consumers-per-topic limit.
1610 ///
1611 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic`. Returns
1612 /// a bare integer (the per-topic ceiling on concurrent consumer
1613 /// connections across all subscriptions), or `null` (decoded as `None`)
1614 /// when the broker default applies.
1615 /// Java: `NamespacesBase#getMaxConsumersPerTopic`.
1616 pub async fn namespace_get_max_consumers_per_topic(
1617 &self,
1618 ns: &str,
1619 ) -> Result<Option<i32>, AdminError> {
1620 let (tenant, namespace) = split_namespace(ns)?;
1621 let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1622 let resp = self.send(self.http.request(Method::GET, url)).await?;
1623 json_ok_optional(resp).await
1624 }
1625
1626 /// Set a namespace's max-consumers-per-topic limit.
1627 ///
1628 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic` with
1629 /// a bare JSON integer body. `0` disables the limit.
1630 /// Java: `NamespacesBase#setMaxConsumersPerTopic`.
1631 pub async fn namespace_set_max_consumers_per_topic(
1632 &self,
1633 ns: &str,
1634 max_consumers: i32,
1635 ) -> Result<(), AdminError> {
1636 let (tenant, namespace) = split_namespace(ns)?;
1637 let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1638 let resp = self
1639 .send(self.http.request(Method::POST, url).json(&max_consumers))
1640 .await?;
1641 empty_ok(resp).await
1642 }
1643
1644 /// Remove a namespace's max-consumers-per-topic limit override.
1645 ///
1646 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxConsumersPerTopic`.
1647 /// Java: `NamespacesBase#removeMaxConsumersPerTopic`.
1648 pub async fn namespace_remove_max_consumers_per_topic(
1649 &self,
1650 ns: &str,
1651 ) -> Result<(), AdminError> {
1652 let (tenant, namespace) = split_namespace(ns)?;
1653 let url = self.url(&["namespaces", tenant, namespace, "maxConsumersPerTopic"])?;
1654 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1655 empty_ok(resp).await
1656 }
1657
1658 /// Get a namespace's max-unacked-messages-per-consumer limit.
1659 ///
1660 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`.
1661 /// Returns a bare integer (the broker's per-consumer permit-pool cap
1662 /// before it stops dispatching), or `null` (decoded as `None`) when
1663 /// the broker default applies.
1664 /// Java: `NamespacesBase#getMaxUnackedMessagesPerConsumer`.
1665 pub async fn namespace_get_max_unacked_messages_per_consumer(
1666 &self,
1667 ns: &str,
1668 ) -> Result<Option<i32>, AdminError> {
1669 let (tenant, namespace) = split_namespace(ns)?;
1670 let url = self.url(&[
1671 "namespaces",
1672 tenant,
1673 namespace,
1674 "maxUnackedMessagesPerConsumer",
1675 ])?;
1676 let resp = self.send(self.http.request(Method::GET, url)).await?;
1677 json_ok_optional(resp).await
1678 }
1679
1680 /// Set a namespace's max-unacked-messages-per-consumer limit.
1681 ///
1682 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`
1683 /// with a bare JSON integer body. `0` disables the limit.
1684 /// Java: `NamespacesBase#setMaxUnackedMessagesPerConsumer`.
1685 pub async fn namespace_set_max_unacked_messages_per_consumer(
1686 &self,
1687 ns: &str,
1688 max_unacked: i32,
1689 ) -> Result<(), AdminError> {
1690 let (tenant, namespace) = split_namespace(ns)?;
1691 let url = self.url(&[
1692 "namespaces",
1693 tenant,
1694 namespace,
1695 "maxUnackedMessagesPerConsumer",
1696 ])?;
1697 let resp = self
1698 .send(self.http.request(Method::POST, url).json(&max_unacked))
1699 .await?;
1700 empty_ok(resp).await
1701 }
1702
1703 /// Remove a namespace's max-unacked-messages-per-consumer override.
1704 ///
1705 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerConsumer`.
1706 /// Java: `NamespacesBase#removeMaxUnackedMessagesPerConsumer`.
1707 pub async fn namespace_remove_max_unacked_messages_per_consumer(
1708 &self,
1709 ns: &str,
1710 ) -> Result<(), AdminError> {
1711 let (tenant, namespace) = split_namespace(ns)?;
1712 let url = self.url(&[
1713 "namespaces",
1714 tenant,
1715 namespace,
1716 "maxUnackedMessagesPerConsumer",
1717 ])?;
1718 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1719 empty_ok(resp).await
1720 }
1721
1722 /// Get a namespace's max-unacked-messages-per-subscription limit.
1723 ///
1724 /// `GET /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`.
1725 /// Returns a bare integer (the broker's per-subscription unacked
1726 /// ceiling — once exceeded the broker stops dispatching to every
1727 /// consumer on that subscription), or `null` (decoded as `None`)
1728 /// when the broker default applies.
1729 /// Java: `NamespacesBase#getMaxUnackedMessagesPerSubscription`.
1730 pub async fn namespace_get_max_unacked_messages_per_subscription(
1731 &self,
1732 ns: &str,
1733 ) -> Result<Option<i32>, AdminError> {
1734 let (tenant, namespace) = split_namespace(ns)?;
1735 let url = self.url(&[
1736 "namespaces",
1737 tenant,
1738 namespace,
1739 "maxUnackedMessagesPerSubscription",
1740 ])?;
1741 let resp = self.send(self.http.request(Method::GET, url)).await?;
1742 json_ok_optional(resp).await
1743 }
1744
1745 /// Set a namespace's max-unacked-messages-per-subscription limit.
1746 ///
1747 /// `POST /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`
1748 /// with a bare JSON integer body. `0` disables the limit.
1749 /// Java: `NamespacesBase#setMaxUnackedMessagesPerSubscription`.
1750 pub async fn namespace_set_max_unacked_messages_per_subscription(
1751 &self,
1752 ns: &str,
1753 max_unacked: i32,
1754 ) -> Result<(), AdminError> {
1755 let (tenant, namespace) = split_namespace(ns)?;
1756 let url = self.url(&[
1757 "namespaces",
1758 tenant,
1759 namespace,
1760 "maxUnackedMessagesPerSubscription",
1761 ])?;
1762 let resp = self
1763 .send(self.http.request(Method::POST, url).json(&max_unacked))
1764 .await?;
1765 empty_ok(resp).await
1766 }
1767
1768 /// Remove a namespace's max-unacked-messages-per-subscription override.
1769 ///
1770 /// `DELETE /admin/v2/namespaces/{tenant}/{ns}/maxUnackedMessagesPerSubscription`.
1771 /// Java: `NamespacesBase#removeMaxUnackedMessagesPerSubscription`.
1772 pub async fn namespace_remove_max_unacked_messages_per_subscription(
1773 &self,
1774 ns: &str,
1775 ) -> Result<(), AdminError> {
1776 let (tenant, namespace) = split_namespace(ns)?;
1777 let url = self.url(&[
1778 "namespaces",
1779 tenant,
1780 namespace,
1781 "maxUnackedMessagesPerSubscription",
1782 ])?;
1783 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1784 empty_ok(resp).await
1785 }
1786
1787 // --- Topics ----------------------------------------------------------
1788
1789 /// List persistent topics in a namespace.
1790 ///
1791 /// `GET /admin/v2/persistent/{tenant}/{namespace}`.
1792 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
1793 /// (`@Path("/persistent")` + `@GET @Path("/{tenant}/{namespace}")`).
1794 pub async fn topics_list(&self, namespace: &str) -> Result<Vec<String>, AdminError> {
1795 let (tenant, namespace) = split_namespace(namespace)?;
1796 let url = self.url(&["persistent", tenant, namespace])?;
1797 let resp = self.send(self.http.request(Method::GET, url)).await?;
1798 json_ok(resp).await
1799 }
1800
1801 /// Create a non-partitioned persistent topic.
1802 ///
1803 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}`.
1804 /// Java: `PersistentTopics.java#createNonPartitionedTopic`
1805 /// (`@PUT @Path("/{tenant}/{namespace}/{topic}")`).
1806 pub async fn topic_create_non_partitioned(&self, topic: &str) -> Result<(), AdminError> {
1807 self.topic_create_non_partitioned_with_properties(topic, &HashMap::new())
1808 .await
1809 }
1810
1811 async fn topic_create_non_partitioned_with_properties(
1812 &self,
1813 topic: &str,
1814 properties: &HashMap<String, String>,
1815 ) -> Result<(), AdminError> {
1816 let (tenant, namespace, name) = split_topic(topic)?;
1817 let url = self.url(&["persistent", tenant, namespace, name])?;
1818 let resp = self
1819 .send(self.http.request(Method::PUT, url).json(properties))
1820 .await?;
1821 empty_ok(resp).await
1822 }
1823
1824 /// Create a partitioned topic with `partitions` partitions.
1825 ///
1826 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`
1827 /// with the partition count as a JSON integer body.
1828 /// Java: `PersistentTopics.java#createPartitionedTopic`
1829 /// (`@PUT @Path("/{tenant}/{namespace}/{topic}/partitions")`).
1830 pub async fn topic_create_partitioned(
1831 &self,
1832 topic: &str,
1833 partitions: u32,
1834 ) -> Result<(), AdminError> {
1835 let (tenant, namespace, name) = split_topic(topic)?;
1836 let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
1837 let resp = self
1838 .send(self.http.request(Method::PUT, url).json(&partitions))
1839 .await?;
1840 empty_ok(resp).await
1841 }
1842
1843 /// Delete a topic, auto-detecting partitioned vs non-partitioned.
1844 ///
1845 /// Pulsar exposes two distinct delete endpoints — the partitioned
1846 /// parent uses `DELETE .../{topic}/partitions?force=…` and the
1847 /// non-partitioned topic uses `DELETE .../{topic}?force=…`. Hitting
1848 /// the partitioned endpoint on a non-partitioned topic returns 404
1849 /// ("Topic is not partitioned"), which used to surface as "the
1850 /// topic doesn't exist" to operators using `magnetar admin topics
1851 /// delete`.
1852 ///
1853 /// Probe via `topic_partitions_count` first (`GET .../partitions`
1854 /// returns `partitions: 0` for non-partitioned topics, `> 0` for a
1855 /// partitioned parent) and route to the matching endpoint. Same
1856 /// behaviour as pulsarctl's `topics delete`.
1857 ///
1858 /// Java: `PersistentTopics.java#deletePartitionedTopic` /
1859 /// `PersistentTopics.java#deleteTopic`.
1860 pub async fn topic_delete(&self, topic: &str, force: bool) -> Result<(), AdminError> {
1861 let (tenant, namespace, name) = split_topic(topic)?;
1862 let partitions = self.topic_partitions_count(topic).await?;
1863 let force_str = if force { "true" } else { "false" };
1864 let mut url = if partitions > 0 {
1865 self.url(&["persistent", tenant, namespace, name, "partitions"])?
1866 } else {
1867 self.url(&["persistent", tenant, namespace, name])?
1868 };
1869 url.query_pairs_mut().append_pair("force", force_str);
1870 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
1871 empty_ok(resp).await
1872 }
1873
1874 /// Get topic stats.
1875 ///
1876 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/stats`.
1877 /// Java: `PersistentTopics.java#getStats`
1878 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/stats")`,
1879 /// response shape `PersistentTopicStats`).
1880 ///
1881 /// For a **partitioned** topic, the broker returns 404 on this endpoint
1882 /// because there is no ledger backing the parent name. Call
1883 /// [`Self::topic_partitioned_stats`] instead, or look up the count via
1884 /// [`Self::topic_partitions_count`] first.
1885 pub async fn topic_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
1886 let (tenant, namespace, name) = split_topic(topic)?;
1887 let url = self.url(&["persistent", tenant, namespace, name, "stats"])?;
1888 let resp = self.send(self.http.request(Method::GET, url)).await?;
1889 json_ok(resp).await
1890 }
1891
1892 /// Get aggregated stats for a partitioned topic.
1893 ///
1894 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitioned-stats?
1895 /// perPartition=false`. Java: `PersistentTopics.java#getPartitionedStats`
1896 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/partitioned-stats")`,
1897 /// response shape `PartitionedTopicStats` which extends
1898 /// `PersistentTopicStats` with `partitions: Map<String, TopicStats>`
1899 /// and `metadata: PartitionedTopicMetadata`).
1900 ///
1901 /// magnetar exposes only the aggregated top-level counters through the
1902 /// same [`TopicStats`] shape — the broker populates `msgInCounter`,
1903 /// `bytesInCounter`, `publishers`, `subscriptions` at the response root
1904 /// summed across partitions. The `partitions` and `metadata` fields are
1905 /// dropped on deserialisation; for per-partition detail call
1906 /// [`Self::topic_stats`] on each `<topic>-partition-N` instead. We pass
1907 /// `perPartition=false` to keep the wire response small.
1908 pub async fn topic_partitioned_stats(&self, topic: &str) -> Result<TopicStats, AdminError> {
1909 let (tenant, namespace, name) = split_topic(topic)?;
1910 let mut url = self.url(&["persistent", tenant, namespace, name, "partitioned-stats"])?;
1911 url.query_pairs_mut().append_pair("perPartition", "false");
1912 let resp = self.send(self.http.request(Method::GET, url)).await?;
1913 json_ok(resp).await
1914 }
1915
1916 /// Resolve the partition count of a topic.
1917 ///
1918 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`.
1919 /// Java: `PersistentTopics.java#getPartitionedMetadata`
1920 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/partitions")`,
1921 /// response shape `PartitionedTopicMetadata{ partitions: int }`).
1922 ///
1923 /// Returns `0` for non-partitioned topics; lets a caller disambiguate
1924 /// between [`Self::topic_stats`] and [`Self::topic_partitioned_stats`]
1925 /// when the topology is not known in advance.
1926 pub async fn topic_partitions_count(&self, topic: &str) -> Result<u32, AdminError> {
1927 let (tenant, namespace, name) = split_topic(topic)?;
1928 let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
1929 let resp = self.send(self.http.request(Method::GET, url)).await?;
1930 let meta: PartitionedTopicMetadata = json_ok(resp).await?;
1931 Ok(meta.partitions)
1932 }
1933
1934 /// Resolve a broker-entry-metadata `index` to a [`MessageId`] (PIP-415).
1935 ///
1936 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/getMessageIdByIndex?index={index}`.
1937 /// Per [PIP-415](https://github.com/apache/pulsar/blob/master/pip/pip-415.md)
1938 /// this is **REST-only** — the spec's "Binary protocol" section is
1939 /// intentionally empty and the canonical implementation PR
1940 /// [`apache/pulsar#24222`](https://github.com/apache/pulsar/pull/24222)
1941 /// (merged 2025-06-23) touches only admin / broker / CLI Java code.
1942 ///
1943 /// Java:
1944 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
1945 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex")`,
1946 /// `@QueryParam("index") long`); admin-client side is
1947 /// `pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/
1948 /// TopicsImpl.java#getMessageIdByIndexAsync` which deserialises the
1949 /// response into `MessageIdImpl` (i.e. `{ledgerId, entryId, partitionIndex}`).
1950 ///
1951 /// `topic` follows the same rule as every other topic-scoped method:
1952 /// either `persistent://tenant/ns/topic` or `tenant/ns/topic`. For a
1953 /// partitioned topic, pass the specific partition (`my-topic-partition-0`).
1954 ///
1955 /// The response carries only `(ledgerId, entryId, partitionIndex)`. The
1956 /// returned [`MessageId`] sets `batch_index = -1` and `batch_size = -1`
1957 /// because the broker resolves at entry granularity — see PIP-415 §"Why
1958 /// Precise Index Matching Isn't Implemented on the Broker Side".
1959 pub async fn topic_get_message_id_by_index(
1960 &self,
1961 topic: &str,
1962 index: i64,
1963 ) -> Result<MessageId, AdminError> {
1964 let (tenant, namespace, name) = split_topic(topic)?;
1965 let mut url = self.url(&["persistent", tenant, namespace, name, "getMessageIdByIndex"])?;
1966 url.query_pairs_mut()
1967 .append_pair("index", &index.to_string());
1968 let resp = self.send(self.http.request(Method::GET, url)).await?;
1969 let dto: MessageIdResponse = json_ok(resp).await?;
1970 dto.try_into_message_id()
1971 }
1972
1973 /// Trigger ledger compaction for a topic.
1974 ///
1975 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction`.
1976 /// Returns 204 on success; the broker queues the work asynchronously —
1977 /// poll [`Self::topic_compaction_status`] to observe progress.
1978 /// Java: `PersistentTopics#triggerCompaction`.
1979 pub async fn topic_compact(&self, topic: &str) -> Result<(), AdminError> {
1980 let (tenant, namespace, name) = split_topic(topic)?;
1981 let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
1982 let resp = self.send(self.http.request(Method::PUT, url)).await?;
1983 empty_ok(resp).await
1984 }
1985
1986 /// Get the current compaction status for a topic.
1987 ///
1988 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/compaction`.
1989 /// Returns Java's `LongRunningProcessStatus`: `status` ∈ {`NOT_RUN`,
1990 /// `RUNNING`, `SUCCESS`, `ERROR`} plus an optional `lastError` string.
1991 /// Java: `PersistentTopics#compactionStatus`.
1992 pub async fn topic_compaction_status(
1993 &self,
1994 topic: &str,
1995 ) -> Result<LongRunningProcessStatus, AdminError> {
1996 let (tenant, namespace, name) = split_topic(topic)?;
1997 let url = self.url(&["persistent", tenant, namespace, name, "compaction"])?;
1998 let resp = self.send(self.http.request(Method::GET, url)).await?;
1999 json_ok(resp).await
2000 }
2001
2002 /// Unload a topic from its current broker (forces rebalancing).
2003 ///
2004 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/unload`.
2005 /// Operators use this to drain a hot broker or to re-elect ownership
2006 /// after a configuration change. Java: `PersistentTopics#unloadTopic`.
2007 pub async fn topic_unload(&self, topic: &str) -> Result<(), AdminError> {
2008 let (tenant, namespace, name) = split_topic(topic)?;
2009 let url = self.url(&["persistent", tenant, namespace, name, "unload"])?;
2010 let resp = self.send(self.http.request(Method::PUT, url)).await?;
2011 empty_ok(resp).await
2012 }
2013
2014 /// Terminate (seal) a topic — no further produces succeed.
2015 ///
2016 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/terminate`.
2017 /// Returns the [`MessageId`] of the last message that landed before
2018 /// the seal, or `None` when the broker reports the
2019 /// `MessageIdImpl(-1, -1)` sentinel — meaning the topic was sealed
2020 /// before any confirmed entry was written (a freshly-created topic,
2021 /// or a topic whose owner was just re-elected). The Java client
2022 /// surfaces that case as `MessageId.earliest`; we use `Option` so
2023 /// callers don't have to special-case a magic value.
2024 /// Java: `PersistentTopics#terminate`.
2025 pub async fn topic_terminate(&self, topic: &str) -> Result<Option<MessageId>, AdminError> {
2026 let (tenant, namespace, name) = split_topic(topic)?;
2027 let url = self.url(&["persistent", tenant, namespace, name, "terminate"])?;
2028 let resp = self.send(self.http.request(Method::POST, url)).await?;
2029 let dto: MessageIdResponse = json_ok(resp).await?;
2030 if dto.ledger_id < 0 && dto.entry_id < 0 {
2031 return Ok(None);
2032 }
2033 dto.try_into_message_id().map(Some)
2034 }
2035
2036 /// Grow a partitioned topic's partition count.
2037 ///
2038 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`
2039 /// with a bare JSON integer body. Only forward (grow) is supported by
2040 /// the broker — shrinking returns 409. Java:
2041 /// `PersistentTopics#updatePartitionedTopic`.
2042 pub async fn topic_update_partitions(
2043 &self,
2044 topic: &str,
2045 new_partitions: u32,
2046 ) -> Result<(), AdminError> {
2047 let (tenant, namespace, name) = split_topic(topic)?;
2048 let url = self.url(&["persistent", tenant, namespace, name, "partitions"])?;
2049 let resp = self
2050 .send(self.http.request(Method::POST, url).json(&new_partitions))
2051 .await?;
2052 empty_ok(resp).await
2053 }
2054
2055 // --- Topic policies — per-topic overrides ---------------------------
2056
2057 /// Get a topic's retention policy.
2058 ///
2059 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/retention`.
2060 /// Returns the per-topic [`RetentionPolicies`] override; the broker
2061 /// emits a `RetentionPolicies` JSON when the policy is set and a bare
2062 /// `null` (decoded as `RetentionPolicies::default()` via `#[serde(default)]`)
2063 /// when no override is in place — callers fall back to the namespace
2064 /// policy in that case. Java: `PersistentTopicsBase#getRetention`.
2065 pub async fn topic_get_retention(&self, topic: &str) -> Result<RetentionPolicies, AdminError> {
2066 let (tenant, namespace, name) = split_topic(topic)?;
2067 let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2068 let resp = self.send(self.http.request(Method::GET, url)).await?;
2069 json_ok_or_default(resp).await
2070 }
2071
2072 /// Set a topic's retention policy (overrides the namespace default).
2073 ///
2074 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/retention` with a
2075 /// JSON `RetentionPolicies` body. `-1` means infinite (size or time).
2076 /// Java: `PersistentTopicsBase#setRetention`.
2077 pub async fn topic_set_retention(
2078 &self,
2079 topic: &str,
2080 policy: RetentionPolicies,
2081 ) -> Result<(), AdminError> {
2082 let (tenant, namespace, name) = split_topic(topic)?;
2083 let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2084 let resp = self
2085 .send(self.http.request(Method::POST, url).json(&policy))
2086 .await?;
2087 empty_ok(resp).await
2088 }
2089
2090 /// Remove a topic's retention policy (fall back to namespace default).
2091 ///
2092 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/retention`.
2093 /// Java: `PersistentTopicsBase#removeRetention`.
2094 pub async fn topic_remove_retention(&self, topic: &str) -> Result<(), AdminError> {
2095 let (tenant, namespace, name) = split_topic(topic)?;
2096 let url = self.url(&["persistent", tenant, namespace, name, "retention"])?;
2097 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2098 empty_ok(resp).await
2099 }
2100
2101 /// Get all backlog-quota policies on a topic.
2102 ///
2103 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuotaMap`.
2104 /// Returns `Map<BacklogQuotaType, BacklogQuota>` — kept as raw JSON
2105 /// for the same reason as [`Self::namespace_get_backlog_quotas`]:
2106 /// broker minor versions add quota types.
2107 /// Java: `PersistentTopicsBase#getBacklogQuotaMap`.
2108 pub async fn topic_get_backlog_quotas(
2109 &self,
2110 topic: &str,
2111 ) -> Result<serde_json::Value, AdminError> {
2112 let (tenant, namespace, name) = split_topic(topic)?;
2113 let url = self.url(&["persistent", tenant, namespace, name, "backlogQuotaMap"])?;
2114 let resp = self.send(self.http.request(Method::GET, url)).await?;
2115 json_ok(resp).await
2116 }
2117
2118 /// Set a backlog-quota policy on a topic (overrides the namespace
2119 /// default for the matching `backlogQuotaType`).
2120 ///
2121 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota
2122 /// ?backlogQuotaType={type}` with a JSON `BacklogQuota` body.
2123 /// Java: `PersistentTopicsBase#setBacklogQuota`.
2124 pub async fn topic_set_backlog_quota(
2125 &self,
2126 topic: &str,
2127 backlog_quota_type: BacklogQuotaType,
2128 quota: BacklogQuota,
2129 ) -> Result<(), AdminError> {
2130 let (tenant, namespace, name) = split_topic(topic)?;
2131 let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
2132 url.query_pairs_mut()
2133 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
2134 let resp = self
2135 .send(self.http.request(Method::POST, url).json("a))
2136 .await?;
2137 empty_ok(resp).await
2138 }
2139
2140 /// Remove a backlog-quota policy from a topic.
2141 ///
2142 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/backlogQuota
2143 /// ?backlogQuotaType={type}`.
2144 /// Java: `PersistentTopicsBase#removeBacklogQuota`.
2145 pub async fn topic_remove_backlog_quota(
2146 &self,
2147 topic: &str,
2148 backlog_quota_type: BacklogQuotaType,
2149 ) -> Result<(), AdminError> {
2150 let (tenant, namespace, name) = split_topic(topic)?;
2151 let mut url = self.url(&["persistent", tenant, namespace, name, "backlogQuota"])?;
2152 url.query_pairs_mut()
2153 .append_pair("backlogQuotaType", backlog_quota_type.as_query_value());
2154 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2155 empty_ok(resp).await
2156 }
2157
2158 /// Get a topic's message-TTL (seconds, or `null` if unset).
2159 ///
2160 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL`. Returns
2161 /// a bare integer when the override is set, `null` (decoded as
2162 /// `Option::None`) when no topic-level override is in place.
2163 /// Java: `PersistentTopicsBase#getMessageTTL`.
2164 pub async fn topic_get_message_ttl(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2165 let (tenant, namespace, name) = split_topic(topic)?;
2166 let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2167 let resp = self.send(self.http.request(Method::GET, url)).await?;
2168 json_ok_optional(resp).await
2169 }
2170
2171 /// Set a topic's message-TTL (seconds).
2172 ///
2173 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL?messageTTL={n}`.
2174 /// `0` disables (broker treats as no TTL).
2175 ///
2176 /// Note: unlike the **namespace**-level `setNamespaceMessageTTL` (which
2177 /// takes the TTL as a JSON int body), the topic-level setter binds
2178 /// `@QueryParam("messageTTL") Integer messageTTL` on both Pulsar 4.0 and
2179 /// 4.2 (`pulsar-broker/.../v2/PersistentTopics.java#setMessageTTL`).
2180 /// Sending the value as a JSON body returns 204 No Content but the
2181 /// broker reads the query param as `null` and silently treats the call
2182 /// as "no override" — the topic policy never persists, and a subsequent
2183 /// `topic_get_message_ttl` surfaces `Ok(None)`. Older Pulsar releases
2184 /// tolerated both encodings; 4.2 enforces the query-param shape.
2185 /// Java: `PersistentTopicsBase#setMessageTTL`.
2186 pub async fn topic_set_message_ttl(
2187 &self,
2188 topic: &str,
2189 ttl_seconds: i32,
2190 ) -> Result<(), AdminError> {
2191 let (tenant, namespace, name) = split_topic(topic)?;
2192 let mut url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2193 url.query_pairs_mut()
2194 .append_pair("messageTTL", &ttl_seconds.to_string());
2195 let resp = self.send(self.http.request(Method::POST, url)).await?;
2196 empty_ok(resp).await
2197 }
2198
2199 /// Remove a topic's message-TTL (fall back to namespace default).
2200 ///
2201 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/messageTTL`.
2202 /// Java: `PersistentTopicsBase#removeMessageTTL`.
2203 pub async fn topic_remove_message_ttl(&self, topic: &str) -> Result<(), AdminError> {
2204 let (tenant, namespace, name) = split_topic(topic)?;
2205 let url = self.url(&["persistent", tenant, namespace, name, "messageTTL"])?;
2206 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2207 empty_ok(resp).await
2208 }
2209
2210 /// Get a topic's persistence policy.
2211 ///
2212 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence`. The
2213 /// broker emits a `PersistencePolicies` JSON when the topic override
2214 /// is set and `null` (decoded as `Option::None`) when no override is
2215 /// in place — callers fall back to the namespace policy in that case.
2216 /// Java: `PersistentTopicsBase#getPersistence`.
2217 pub async fn topic_get_persistence(
2218 &self,
2219 topic: &str,
2220 ) -> Result<Option<PersistencePolicies>, AdminError> {
2221 let (tenant, namespace, name) = split_topic(topic)?;
2222 let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2223 let resp = self.send(self.http.request(Method::GET, url)).await?;
2224 json_ok_optional(resp).await
2225 }
2226
2227 /// Set a topic's persistence policy (overrides the namespace default).
2228 ///
2229 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence` with a
2230 /// JSON `PersistencePolicies` body.
2231 /// Java: `PersistentTopicsBase#setPersistence`.
2232 pub async fn topic_set_persistence(
2233 &self,
2234 topic: &str,
2235 policy: PersistencePolicies,
2236 ) -> Result<(), AdminError> {
2237 let (tenant, namespace, name) = split_topic(topic)?;
2238 let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2239 let resp = self
2240 .send(self.http.request(Method::POST, url).json(&policy))
2241 .await?;
2242 empty_ok(resp).await
2243 }
2244
2245 /// Remove a topic's persistence policy (fall back to namespace default).
2246 ///
2247 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/persistence`.
2248 /// Java: `PersistentTopicsBase#removePersistence`.
2249 pub async fn topic_remove_persistence(&self, topic: &str) -> Result<(), AdminError> {
2250 let (tenant, namespace, name) = split_topic(topic)?;
2251 let url = self.url(&["persistent", tenant, namespace, name, "persistence"])?;
2252 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2253 empty_ok(resp).await
2254 }
2255
2256 /// Get a topic's consumer dispatch-rate policy (or `null` if no override).
2257 ///
2258 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate`. The
2259 /// broker emits the per-topic [`DispatchRate`] override or `null` when
2260 /// no override is set; callers fall back to the namespace policy in the
2261 /// `None` case. Java: `PersistentTopicsBase#getDispatchRate`.
2262 pub async fn topic_get_dispatch_rate(
2263 &self,
2264 topic: &str,
2265 ) -> Result<Option<DispatchRate>, AdminError> {
2266 let (tenant, namespace, name) = split_topic(topic)?;
2267 let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2268 let resp = self.send(self.http.request(Method::GET, url)).await?;
2269 json_ok_optional(resp).await
2270 }
2271
2272 /// Set a topic's consumer dispatch-rate policy (overrides namespace default).
2273 ///
2274 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate` with a
2275 /// JSON `DispatchRate` body. Java: `PersistentTopicsBase#setDispatchRate`.
2276 pub async fn topic_set_dispatch_rate(
2277 &self,
2278 topic: &str,
2279 rate: DispatchRate,
2280 ) -> Result<(), AdminError> {
2281 let (tenant, namespace, name) = split_topic(topic)?;
2282 let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2283 let resp = self
2284 .send(self.http.request(Method::POST, url).json(&rate))
2285 .await?;
2286 empty_ok(resp).await
2287 }
2288
2289 /// Remove a topic's consumer dispatch-rate policy.
2290 ///
2291 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/dispatchRate`.
2292 /// Java: `PersistentTopicsBase#removeDispatchRate`.
2293 pub async fn topic_remove_dispatch_rate(&self, topic: &str) -> Result<(), AdminError> {
2294 let (tenant, namespace, name) = split_topic(topic)?;
2295 let url = self.url(&["persistent", tenant, namespace, name, "dispatchRate"])?;
2296 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2297 empty_ok(resp).await
2298 }
2299
2300 /// Get a topic's per-subscription dispatch-rate policy (or `null`).
2301 ///
2302 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`.
2303 /// Reuses the [`DispatchRate`] body shape — the policy applies per
2304 /// subscription rather than aggregated across all consumers.
2305 /// Java: `PersistentTopicsBase#getSubscriptionDispatchRate`.
2306 pub async fn topic_get_subscription_dispatch_rate(
2307 &self,
2308 topic: &str,
2309 ) -> Result<Option<DispatchRate>, AdminError> {
2310 let (tenant, namespace, name) = split_topic(topic)?;
2311 let url = self.url(&[
2312 "persistent",
2313 tenant,
2314 namespace,
2315 name,
2316 "subscriptionDispatchRate",
2317 ])?;
2318 let resp = self.send(self.http.request(Method::GET, url)).await?;
2319 json_ok_optional(resp).await
2320 }
2321
2322 /// Set a topic's per-subscription dispatch-rate policy (overrides namespace default).
2323 ///
2324 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`
2325 /// with a JSON `DispatchRate` body.
2326 /// Java: `PersistentTopicsBase#setSubscriptionDispatchRate`.
2327 pub async fn topic_set_subscription_dispatch_rate(
2328 &self,
2329 topic: &str,
2330 rate: DispatchRate,
2331 ) -> Result<(), AdminError> {
2332 let (tenant, namespace, name) = split_topic(topic)?;
2333 let url = self.url(&[
2334 "persistent",
2335 tenant,
2336 namespace,
2337 name,
2338 "subscriptionDispatchRate",
2339 ])?;
2340 let resp = self
2341 .send(self.http.request(Method::POST, url).json(&rate))
2342 .await?;
2343 empty_ok(resp).await
2344 }
2345
2346 /// Remove a topic's per-subscription dispatch-rate policy.
2347 ///
2348 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/subscriptionDispatchRate`.
2349 /// Java: `PersistentTopicsBase#removeSubscriptionDispatchRate`.
2350 pub async fn topic_remove_subscription_dispatch_rate(
2351 &self,
2352 topic: &str,
2353 ) -> Result<(), AdminError> {
2354 let (tenant, namespace, name) = split_topic(topic)?;
2355 let url = self.url(&[
2356 "persistent",
2357 tenant,
2358 namespace,
2359 name,
2360 "subscriptionDispatchRate",
2361 ])?;
2362 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2363 empty_ok(resp).await
2364 }
2365
2366 /// Get a topic's cross-cluster replicator dispatch-rate policy (or `null`).
2367 ///
2368 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`.
2369 /// Reuses the [`DispatchRate`] body shape — the policy throttles
2370 /// outbound geo-replication traffic from this cluster.
2371 /// Java: `PersistentTopicsBase#getReplicatorDispatchRate`.
2372 pub async fn topic_get_replicator_dispatch_rate(
2373 &self,
2374 topic: &str,
2375 ) -> Result<Option<DispatchRate>, AdminError> {
2376 let (tenant, namespace, name) = split_topic(topic)?;
2377 let url = self.url(&[
2378 "persistent",
2379 tenant,
2380 namespace,
2381 name,
2382 "replicatorDispatchRate",
2383 ])?;
2384 let resp = self.send(self.http.request(Method::GET, url)).await?;
2385 json_ok_optional(resp).await
2386 }
2387
2388 /// Set a topic's cross-cluster replicator dispatch-rate policy.
2389 ///
2390 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`
2391 /// with a JSON `DispatchRate` body.
2392 /// Java: `PersistentTopicsBase#setReplicatorDispatchRate`.
2393 pub async fn topic_set_replicator_dispatch_rate(
2394 &self,
2395 topic: &str,
2396 rate: DispatchRate,
2397 ) -> Result<(), AdminError> {
2398 let (tenant, namespace, name) = split_topic(topic)?;
2399 let url = self.url(&[
2400 "persistent",
2401 tenant,
2402 namespace,
2403 name,
2404 "replicatorDispatchRate",
2405 ])?;
2406 let resp = self
2407 .send(self.http.request(Method::POST, url).json(&rate))
2408 .await?;
2409 empty_ok(resp).await
2410 }
2411
2412 /// Remove a topic's cross-cluster replicator dispatch-rate policy.
2413 ///
2414 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/replicatorDispatchRate`.
2415 /// Java: `PersistentTopicsBase#removeReplicatorDispatchRate`.
2416 pub async fn topic_remove_replicator_dispatch_rate(
2417 &self,
2418 topic: &str,
2419 ) -> Result<(), AdminError> {
2420 let (tenant, namespace, name) = split_topic(topic)?;
2421 let url = self.url(&[
2422 "persistent",
2423 tenant,
2424 namespace,
2425 name,
2426 "replicatorDispatchRate",
2427 ])?;
2428 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2429 empty_ok(resp).await
2430 }
2431
2432 /// Get a topic's publish-rate policy (or `null` if no override).
2433 ///
2434 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate`. Returns
2435 /// the per-topic [`PublishRate`] producer-side throttle (msg/sec +
2436 /// byte/sec). `-1` on either dimension means unlimited.
2437 /// Java: `PersistentTopicsBase#getPublishRate`.
2438 pub async fn topic_get_publish_rate(
2439 &self,
2440 topic: &str,
2441 ) -> Result<Option<PublishRate>, AdminError> {
2442 let (tenant, namespace, name) = split_topic(topic)?;
2443 let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2444 let resp = self.send(self.http.request(Method::GET, url)).await?;
2445 json_ok_optional(resp).await
2446 }
2447
2448 /// Set a topic's publish-rate policy (overrides namespace default).
2449 ///
2450 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate` with a
2451 /// JSON `PublishRate` body. Java: `PersistentTopicsBase#setPublishRate`.
2452 pub async fn topic_set_publish_rate(
2453 &self,
2454 topic: &str,
2455 rate: PublishRate,
2456 ) -> Result<(), AdminError> {
2457 let (tenant, namespace, name) = split_topic(topic)?;
2458 let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2459 let resp = self
2460 .send(self.http.request(Method::POST, url).json(&rate))
2461 .await?;
2462 empty_ok(resp).await
2463 }
2464
2465 /// Remove a topic's publish-rate policy.
2466 ///
2467 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/publishRate`.
2468 /// Java: `PersistentTopicsBase#removePublishRate`.
2469 pub async fn topic_remove_publish_rate(&self, topic: &str) -> Result<(), AdminError> {
2470 let (tenant, namespace, name) = split_topic(topic)?;
2471 let url = self.url(&["persistent", tenant, namespace, name, "publishRate"])?;
2472 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2473 empty_ok(resp).await
2474 }
2475
2476 /// Get a topic's max-producers cap (or `null` if no override).
2477 ///
2478 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers`. Returns
2479 /// a bare integer when the override is set, `null` (decoded as
2480 /// `Option::None`) when no topic-level cap is in place.
2481 /// Java: `PersistentTopicsBase#getMaxProducers`.
2482 pub async fn topic_get_max_producers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2483 let (tenant, namespace, name) = split_topic(topic)?;
2484 let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2485 let resp = self.send(self.http.request(Method::GET, url)).await?;
2486 json_ok_optional(resp).await
2487 }
2488
2489 /// Set a topic's max-producers cap.
2490 ///
2491 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers` with
2492 /// a bare integer body. `0` disables (broker treats as unlimited).
2493 /// Java: `PersistentTopicsBase#setMaxProducers`.
2494 pub async fn topic_set_max_producers(
2495 &self,
2496 topic: &str,
2497 max_producers: i32,
2498 ) -> Result<(), AdminError> {
2499 let (tenant, namespace, name) = split_topic(topic)?;
2500 let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2501 let resp = self
2502 .send(self.http.request(Method::POST, url).json(&max_producers))
2503 .await?;
2504 empty_ok(resp).await
2505 }
2506
2507 /// Remove a topic's max-producers cap (fall back to namespace / broker default).
2508 ///
2509 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxProducers`.
2510 /// Java: `PersistentTopicsBase#removeMaxProducers`.
2511 pub async fn topic_remove_max_producers(&self, topic: &str) -> Result<(), AdminError> {
2512 let (tenant, namespace, name) = split_topic(topic)?;
2513 let url = self.url(&["persistent", tenant, namespace, name, "maxProducers"])?;
2514 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2515 empty_ok(resp).await
2516 }
2517
2518 /// Get a topic's max-consumers cap (or `null` if no override).
2519 ///
2520 /// `GET /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers`. Returns
2521 /// a bare integer when the override is set, `null` (decoded as
2522 /// `Option::None`) when no topic-level cap is in place.
2523 /// Java: `PersistentTopicsBase#getMaxConsumers`.
2524 pub async fn topic_get_max_consumers(&self, topic: &str) -> Result<Option<i32>, AdminError> {
2525 let (tenant, namespace, name) = split_topic(topic)?;
2526 let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2527 let resp = self.send(self.http.request(Method::GET, url)).await?;
2528 json_ok_optional(resp).await
2529 }
2530
2531 /// Set a topic's max-consumers cap.
2532 ///
2533 /// `POST /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers` with
2534 /// a bare integer body. `0` disables (broker treats as unlimited).
2535 /// Java: `PersistentTopicsBase#setMaxConsumers`.
2536 pub async fn topic_set_max_consumers(
2537 &self,
2538 topic: &str,
2539 max_consumers: i32,
2540 ) -> Result<(), AdminError> {
2541 let (tenant, namespace, name) = split_topic(topic)?;
2542 let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2543 let resp = self
2544 .send(self.http.request(Method::POST, url).json(&max_consumers))
2545 .await?;
2546 empty_ok(resp).await
2547 }
2548
2549 /// Remove a topic's max-consumers cap (fall back to namespace / broker default).
2550 ///
2551 /// `DELETE /admin/v2/persistent/{tenant}/{ns}/{topic}/maxConsumers`.
2552 /// Java: `PersistentTopicsBase#removeMaxConsumers`.
2553 pub async fn topic_remove_max_consumers(&self, topic: &str) -> Result<(), AdminError> {
2554 let (tenant, namespace, name) = split_topic(topic)?;
2555 let url = self.url(&["persistent", tenant, namespace, name, "maxConsumers"])?;
2556 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2557 empty_ok(resp).await
2558 }
2559
2560 // --- Subscriptions ---------------------------------------------------
2561
2562 /// List subscription names on a topic.
2563 ///
2564 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscriptions`.
2565 /// Java: `PersistentTopics#getSubscriptions`.
2566 pub async fn subscriptions_list(&self, topic: &str) -> Result<Vec<String>, AdminError> {
2567 let (tenant, namespace, name) = split_topic(topic)?;
2568 let url = self.url(&["persistent", tenant, namespace, name, "subscriptions"])?;
2569 let resp = self.send(self.http.request(Method::GET, url)).await?;
2570 json_ok(resp).await
2571 }
2572
2573 /// Reset a subscription's cursor to a specific message-id position.
2574 ///
2575 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor`
2576 /// with body `{ledgerId, entryId, partitionIndex, batchIndex, isExcluded}`.
2577 /// `is_excluded = true` skips the message at `message_id` itself; `false`
2578 /// leaves it eligible for redelivery. Java: `PersistentTopics#resetCursorOnPosition`.
2579 pub async fn subscription_reset_cursor_to_position(
2580 &self,
2581 topic: &str,
2582 subscription: &str,
2583 message_id: MessageId,
2584 is_excluded: bool,
2585 ) -> Result<(), AdminError> {
2586 let (tenant, namespace, name) = split_topic(topic)?;
2587 validate_segment(subscription)?;
2588 let url = self.url(&[
2589 "persistent",
2590 tenant,
2591 namespace,
2592 name,
2593 "subscription",
2594 subscription,
2595 "resetcursor",
2596 ])?;
2597 let body = ResetCursorData {
2598 ledger_id: message_id_field_for_wire(message_id.ledger_id),
2599 entry_id: message_id_field_for_wire(message_id.entry_id),
2600 partition_index: message_id.partition,
2601 batch_index: message_id.batch_index,
2602 is_excluded,
2603 };
2604 let resp = self
2605 .send(self.http.request(Method::POST, url).json(&body))
2606 .await?;
2607 empty_ok(resp).await
2608 }
2609
2610 /// Reset a subscription's cursor to a wall-clock timestamp (millis since epoch).
2611 ///
2612 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/resetcursor/
2613 /// {timestamp}`. Java: `PersistentTopics#resetCursor(topic, sub, timestamp)`.
2614 pub async fn subscription_reset_cursor_to_timestamp(
2615 &self,
2616 topic: &str,
2617 subscription: &str,
2618 timestamp_millis: u64,
2619 ) -> Result<(), AdminError> {
2620 let (tenant, namespace, name) = split_topic(topic)?;
2621 validate_segment(subscription)?;
2622 let timestamp = timestamp_millis.to_string();
2623 let url = self.url(&[
2624 "persistent",
2625 tenant,
2626 namespace,
2627 name,
2628 "subscription",
2629 subscription,
2630 "resetcursor",
2631 ×tamp,
2632 ])?;
2633 let resp = self.send(self.http.request(Method::POST, url)).await?;
2634 empty_ok(resp).await
2635 }
2636
2637 /// Advance a subscription's cursor past N undelivered messages.
2638 ///
2639 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip/
2640 /// {numMessages}`. Java: `PersistentTopics#skipMessages`.
2641 pub async fn subscription_skip_messages(
2642 &self,
2643 topic: &str,
2644 subscription: &str,
2645 num_messages: u64,
2646 ) -> Result<(), AdminError> {
2647 let (tenant, namespace, name) = split_topic(topic)?;
2648 validate_segment(subscription)?;
2649 let n = num_messages.to_string();
2650 let url = self.url(&[
2651 "persistent",
2652 tenant,
2653 namespace,
2654 name,
2655 "subscription",
2656 subscription,
2657 "skip",
2658 &n,
2659 ])?;
2660 let resp = self.send(self.http.request(Method::POST, url)).await?;
2661 empty_ok(resp).await
2662 }
2663
2664 /// Drain the entire backlog of a subscription (clear-backlog).
2665 ///
2666 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/skip_all`.
2667 /// Java: `PersistentTopics#skipAllMessages`.
2668 pub async fn subscription_skip_all_messages(
2669 &self,
2670 topic: &str,
2671 subscription: &str,
2672 ) -> Result<(), AdminError> {
2673 let (tenant, namespace, name) = split_topic(topic)?;
2674 validate_segment(subscription)?;
2675 let url = self.url(&[
2676 "persistent",
2677 tenant,
2678 namespace,
2679 name,
2680 "subscription",
2681 subscription,
2682 "skip_all",
2683 ])?;
2684 let resp = self.send(self.http.request(Method::POST, url)).await?;
2685 empty_ok(resp).await
2686 }
2687
2688 /// Expire all messages older than `expire_time_seconds` for a subscription.
2689 ///
2690 /// `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}/expireMessages/
2691 /// {seconds}`. Java: `PersistentTopics#expireMessagesForSubscription`.
2692 pub async fn subscription_expire_messages(
2693 &self,
2694 topic: &str,
2695 subscription: &str,
2696 expire_time_seconds: u64,
2697 ) -> Result<(), AdminError> {
2698 let (tenant, namespace, name) = split_topic(topic)?;
2699 validate_segment(subscription)?;
2700 let s = expire_time_seconds.to_string();
2701 let url = self.url(&[
2702 "persistent",
2703 tenant,
2704 namespace,
2705 name,
2706 "subscription",
2707 subscription,
2708 "expireMessages",
2709 &s,
2710 ])?;
2711 let resp = self.send(self.http.request(Method::POST, url)).await?;
2712 empty_ok(resp).await
2713 }
2714
2715 /// Delete (unsubscribe) a subscription.
2716 ///
2717 /// `DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{sub}?force={force}`.
2718 /// `force = true` disconnects active consumers before deletion. Java:
2719 /// `PersistentTopics#deleteSubscription`.
2720 pub async fn subscription_delete(
2721 &self,
2722 topic: &str,
2723 subscription: &str,
2724 force: bool,
2725 ) -> Result<(), AdminError> {
2726 let (tenant, namespace, name) = split_topic(topic)?;
2727 validate_segment(subscription)?;
2728 let mut url = self.url(&[
2729 "persistent",
2730 tenant,
2731 namespace,
2732 name,
2733 "subscription",
2734 subscription,
2735 ])?;
2736 if force {
2737 url.query_pairs_mut().append_pair("force", "true");
2738 }
2739 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2740 empty_ok(resp).await
2741 }
2742
2743 // --- Shadow topics (PIP-180 / ADR-0033) ------------------------------
2744
2745 /// Create a shadow topic ([PIP-180](https://github.com/apache/pulsar/blob/master/pip/pip-180.md)).
2746 ///
2747 /// Creates the shadow as a regular non-partitioned topic with the broker-reserved
2748 /// `PULSAR.SHADOW_SOURCE` topic property pointing at the source topic:
2749 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{shadow}` with body
2750 /// `{ "PULSAR.SHADOW_SOURCE": "persistent://tenant/ns/source" }`.
2751 /// Then links the created shadow in the source topic policy list via
2752 /// `PUT /admin/v2/persistent/{tenant}/{namespace}/{source}/shadowTopics`.
2753 ///
2754 /// Java:
2755 /// `pulsar-client-admin/.../TopicsImpl.java#createShadowTopicAsync` builds the same property
2756 /// map before calling `createNonPartitionedTopicAsync` for non-partitioned sources.
2757 ///
2758 /// Errors mirror the existing `AdminError` taxonomy: 404 → `Status { code:
2759 /// 404, .. }` (the source topic does not exist), 409 → `Status { code:
2760 /// 409, .. }` (the shadow topic already exists on this source),
2761 /// 401/403 → `Status { code: 401|403, .. }` (auth).
2762 pub async fn create_shadow_topic(&self, source: &str, shadow: &str) -> Result<(), AdminError> {
2763 let (source_tenant, source_namespace, source_name) = split_topic(source)?;
2764 // Validate the shadow name eagerly so a misformatted argument errors
2765 // out with `InvalidName` rather than as a broker 4xx after we've
2766 // already crossed the wire.
2767 let (shadow_tenant, shadow_namespace, shadow_name) = split_topic(shadow)?;
2768 let source = format!("persistent://{source_tenant}/{source_namespace}/{source_name}");
2769 let shadow = format!("persistent://{shadow_tenant}/{shadow_namespace}/{shadow_name}");
2770 let mut properties = HashMap::new();
2771 properties.insert("PULSAR.SHADOW_SOURCE".to_owned(), source.clone());
2772 self.topic_create_non_partitioned_with_properties(&shadow, &properties)
2773 .await?;
2774 self.set_shadow_topics(&source, &[shadow]).await
2775 }
2776
2777 async fn set_shadow_topics(&self, source: &str, shadows: &[String]) -> Result<(), AdminError> {
2778 let (tenant, namespace, name) = split_topic(source)?;
2779 let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
2780 let resp = self
2781 .send(self.http.request(Method::PUT, url).json(shadows))
2782 .await?;
2783 empty_ok(resp).await
2784 }
2785
2786 /// Delete a shadow topic (PIP-180).
2787 ///
2788 /// `DELETE /admin/v2/persistent/{tenant}/{namespace}/{topic}` where
2789 /// `{topic}` is the **shadow** topic name. PIP-180's deletion contract
2790 /// goes through the regular topic-delete path on the shadow itself —
2791 /// the broker recognises the topic as a shadow and detaches it from
2792 /// the source ledger atomically with the metadata delete.
2793 ///
2794 /// `force` controls whether active subscribers are kicked off before
2795 /// the delete (`?force=true`) or whether the broker rejects the
2796 /// request when subscribers exist (`?force=false`, the default).
2797 ///
2798 /// Java: `org.apache.pulsar.client.admin.Topics#deleteShadowTopic` calls
2799 /// the same `@DELETE @Path("/{tenant}/{namespace}/{topic}")` endpoint.
2800 pub async fn delete_shadow_topic(&self, shadow: &str, force: bool) -> Result<(), AdminError> {
2801 let (tenant, namespace, name) = split_topic(shadow)?;
2802 let mut url = self.url(&["persistent", tenant, namespace, name])?;
2803 url.query_pairs_mut()
2804 .append_pair("force", if force { "true" } else { "false" });
2805 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
2806 empty_ok(resp).await
2807 }
2808
2809 /// List the shadow topics created on a source topic (PIP-180).
2810 ///
2811 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/shadowTopics`
2812 /// where `{topic}` is the **source** topic name. The broker returns a
2813 /// JSON array of fully-qualified shadow topic names.
2814 ///
2815 /// Java:
2816 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java`
2817 /// (`@GET @Path("/{tenant}/{namespace}/{topic}/shadowTopics")`).
2818 ///
2819 /// Used by the runtime engine at consumer subscribe time: when the user
2820 /// subscribes to a topic the runtime cannot yet classify, a single
2821 /// `get_shadow_topics` lookup on every other topic in the namespace is
2822 /// expensive; instead the runtime calls `get_shadow_topics(subscribed)`
2823 /// on the topic itself — a non-shadow topic returns an empty array, a
2824 /// shadow topic surfaces nothing but the broker has already populated
2825 /// the consumer's `shadow_metadata` via the topic's policy.
2826 /// (See `crates/magnetar-runtime-tokio/src/client.rs::subscribe`.)
2827 pub async fn get_shadow_topics(&self, source: &str) -> Result<Vec<String>, AdminError> {
2828 let (tenant, namespace, name) = split_topic(source)?;
2829 let url = self.url(&["persistent", tenant, namespace, name, "shadowTopics"])?;
2830 let resp = self.send(self.http.request(Method::GET, url)).await?;
2831 json_ok_or_default(resp).await
2832 }
2833
2834 /// Resolve the **source** topic of a shadow topic (PIP-180).
2835 ///
2836 /// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/properties`.
2837 /// Returns the `PULSAR.SHADOW_SOURCE` property when the queried topic is a
2838 /// shadow; returns `None` when it is a regular topic. Used by the runtime
2839 /// at subscribe time to populate
2840 /// [`magnetar_proto::ShadowTopicMetadata::source_topic`] on the new
2841 /// consumer (so the receive path can emit
2842 /// [`magnetar_proto::ConnectionEvent::MessageReceivedFromShadow`]
2843 /// without an out-of-band lookup per message).
2844 ///
2845 /// Java: `org.apache.pulsar.client.admin.Topics#getShadowSource` —
2846 /// `TopicsImpl#getShadowSourceAsync` delegates to `getPropertiesAsync`.
2847 pub async fn get_shadow_source(&self, shadow: &str) -> Result<Option<String>, AdminError> {
2848 let (tenant, namespace, name) = split_topic(shadow)?;
2849 let url = self.url(&["persistent", tenant, namespace, name, "properties"])?;
2850 let resp = self.send(self.http.request(Method::GET, url)).await?;
2851 let mut properties: HashMap<String, String> = json_ok_or_default(resp).await?;
2852 Ok(properties
2853 .remove("PULSAR.SHADOW_SOURCE")
2854 .filter(|source| !source.is_empty()))
2855 }
2856
2857 // --- Pulsar IO Sources (/admin/v3/sources/...) ----------------------
2858
2859 /// List sources configured under a namespace.
2860 ///
2861 /// `GET /admin/v3/sources/{tenant}/{namespace}`. Returns the list of
2862 /// source names (the broker emits a JSON array of strings — one
2863 /// entry per declared source, regardless of running state).
2864 /// Java:
2865 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java`
2866 /// (`@Path("/sources")`) +
2867 /// `pulsar-broker/.../admin/impl/SourcesBase.java#listSources`.
2868 pub async fn sources_list_by_namespace(
2869 &self,
2870 tenant: &str,
2871 namespace: &str,
2872 ) -> Result<Vec<String>, AdminError> {
2873 validate_segment(tenant)?;
2874 validate_segment(namespace)?;
2875 let url = self.url_v3(&["sources", tenant, namespace])?;
2876 let resp = self.send(self.http.request(Method::GET, url)).await?;
2877 json_ok(resp).await
2878 }
2879
2880 /// Get one source's configuration.
2881 ///
2882 /// `GET /admin/v3/sources/{tenant}/{namespace}/{name}`. Returns the
2883 /// stored `SourceConfig` envelope as raw JSON — minor broker
2884 /// versions extend the shape (new connector knobs, secret refs)
2885 /// faster than a typed Rust DTO can keep up.
2886 /// Java: `SourcesBase#getSourceInfo`.
2887 pub async fn source_get(
2888 &self,
2889 tenant: &str,
2890 namespace: &str,
2891 name: &str,
2892 ) -> Result<serde_json::Value, AdminError> {
2893 validate_segment(tenant)?;
2894 validate_segment(namespace)?;
2895 validate_segment(name)?;
2896 let url = self.url_v3(&["sources", tenant, namespace, name])?;
2897 let resp = self.send(self.http.request(Method::GET, url)).await?;
2898 json_ok(resp).await
2899 }
2900
2901 /// Get a source's running status (per-instance worker telemetry).
2902 ///
2903 /// `GET /admin/v3/sources/{tenant}/{namespace}/{name}/status`.
2904 /// Returns the broker's `SourceStatus` envelope (`numInstances`,
2905 /// `numRunning`, per-instance `workerId` + `running` + last
2906 /// received timestamp). Exposed as raw JSON for forward-compat.
2907 /// Java: `SourcesBase#getSourceStatus`.
2908 pub async fn source_status(
2909 &self,
2910 tenant: &str,
2911 namespace: &str,
2912 name: &str,
2913 ) -> Result<serde_json::Value, AdminError> {
2914 validate_segment(tenant)?;
2915 validate_segment(namespace)?;
2916 validate_segment(name)?;
2917 let url = self.url_v3(&["sources", tenant, namespace, name, "status"])?;
2918 let resp = self.send(self.http.request(Method::GET, url)).await?;
2919 json_ok(resp).await
2920 }
2921
2922 /// Register a source from a remote package URL.
2923 ///
2924 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}` with
2925 /// `multipart/form-data` carrying two parts: a `url` text part (the
2926 /// package URL — `http(s)://`, `file://`, or `function://` per the
2927 /// broker's `WorkerUtils#downloadFileFromPackageUrl`) and a
2928 /// `sourceConfig` JSON part with the [`SourceConfig`] body.
2929 /// A sibling `source_create` (binary upload) is intentionally not
2930 /// yet exposed — this URL-based variant covers every
2931 /// CI/operator scenario that does not need to ship a JAR through
2932 /// the admin client itself.
2933 /// Java: `SourcesBase#registerSource`.
2934 pub async fn source_create_with_url(
2935 &self,
2936 tenant: &str,
2937 namespace: &str,
2938 name: &str,
2939 url: &str,
2940 config: SourceConfig,
2941 ) -> Result<(), AdminError> {
2942 validate_segment(tenant)?;
2943 validate_segment(namespace)?;
2944 validate_segment(name)?;
2945 let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
2946 let form = build_url_config_multipart(url, "sourceConfig", &config)?;
2947 let resp = self
2948 .send(
2949 self.http
2950 .request(Method::POST, endpoint)
2951 .multipart(form)
2952 .timeout(PACKAGE_REGISTER_TIMEOUT),
2953 )
2954 .await?;
2955 empty_ok(resp).await
2956 }
2957
2958 /// Update a source from a remote package URL.
2959 ///
2960 /// `PUT /admin/v3/sources/{tenant}/{namespace}/{name}` with the
2961 /// same multipart shape as [`Self::source_create_with_url`].
2962 /// Java: `SourcesBase#updateSource`.
2963 pub async fn source_update_with_url(
2964 &self,
2965 tenant: &str,
2966 namespace: &str,
2967 name: &str,
2968 url: &str,
2969 config: SourceConfig,
2970 ) -> Result<(), AdminError> {
2971 validate_segment(tenant)?;
2972 validate_segment(namespace)?;
2973 validate_segment(name)?;
2974 let endpoint = self.url_v3(&["sources", tenant, namespace, name])?;
2975 let form = build_url_config_multipart(url, "sourceConfig", &config)?;
2976 let resp = self
2977 .send(
2978 self.http
2979 .request(Method::PUT, endpoint)
2980 .multipart(form)
2981 .timeout(PACKAGE_REGISTER_TIMEOUT),
2982 )
2983 .await?;
2984 empty_ok(resp).await
2985 }
2986
2987 /// Delete a source.
2988 ///
2989 /// `DELETE /admin/v3/sources/{tenant}/{namespace}/{name}`. Removes
2990 /// the source declaration and tears the running instances down.
2991 /// Java: `SourcesBase#deregisterSource`.
2992 pub async fn source_delete(
2993 &self,
2994 tenant: &str,
2995 namespace: &str,
2996 name: &str,
2997 ) -> Result<(), AdminError> {
2998 validate_segment(tenant)?;
2999 validate_segment(namespace)?;
3000 validate_segment(name)?;
3001 let url = self.url_v3(&["sources", tenant, namespace, name])?;
3002 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3003 empty_ok(resp).await
3004 }
3005
3006 /// Start every instance of a source.
3007 ///
3008 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/start`.
3009 /// Java: `SourcesBase#startSource`.
3010 pub async fn source_start(
3011 &self,
3012 tenant: &str,
3013 namespace: &str,
3014 name: &str,
3015 ) -> Result<(), AdminError> {
3016 validate_segment(tenant)?;
3017 validate_segment(namespace)?;
3018 validate_segment(name)?;
3019 let url = self.url_v3(&["sources", tenant, namespace, name, "start"])?;
3020 let resp = self.send(self.http.request(Method::POST, url)).await?;
3021 empty_ok(resp).await
3022 }
3023
3024 /// Stop every instance of a source.
3025 ///
3026 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/stop`.
3027 /// Java: `SourcesBase#stopSource`.
3028 pub async fn source_stop(
3029 &self,
3030 tenant: &str,
3031 namespace: &str,
3032 name: &str,
3033 ) -> Result<(), AdminError> {
3034 validate_segment(tenant)?;
3035 validate_segment(namespace)?;
3036 validate_segment(name)?;
3037 let url = self.url_v3(&["sources", tenant, namespace, name, "stop"])?;
3038 let resp = self.send(self.http.request(Method::POST, url)).await?;
3039 empty_ok(resp).await
3040 }
3041
3042 /// Restart every instance of a source.
3043 ///
3044 /// `POST /admin/v3/sources/{tenant}/{namespace}/{name}/restart`.
3045 /// Java: `SourcesBase#restartSource`.
3046 pub async fn source_restart(
3047 &self,
3048 tenant: &str,
3049 namespace: &str,
3050 name: &str,
3051 ) -> Result<(), AdminError> {
3052 validate_segment(tenant)?;
3053 validate_segment(namespace)?;
3054 validate_segment(name)?;
3055 let url = self.url_v3(&["sources", tenant, namespace, name, "restart"])?;
3056 let resp = self.send(self.http.request(Method::POST, url)).await?;
3057 empty_ok(resp).await
3058 }
3059
3060 // --- Pulsar IO Sinks (/admin/v3/sinks/...) --------------------------
3061
3062 /// List sinks configured under a namespace.
3063 ///
3064 /// `GET /admin/v3/sinks/{tenant}/{namespace}`. Returns the list of
3065 /// sink names. Mirrors [`Self::sources_list_by_namespace`] —
3066 /// Pulsar's Sources / Sinks REST surfaces are intentionally
3067 /// symmetric.
3068 /// Java:
3069 /// `pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java`
3070 /// (`@Path("/sinks")`) +
3071 /// `pulsar-broker/.../admin/impl/SinksBase.java#listSinks`.
3072 pub async fn sinks_list_by_namespace(
3073 &self,
3074 tenant: &str,
3075 namespace: &str,
3076 ) -> Result<Vec<String>, AdminError> {
3077 validate_segment(tenant)?;
3078 validate_segment(namespace)?;
3079 let url = self.url_v3(&["sinks", tenant, namespace])?;
3080 let resp = self.send(self.http.request(Method::GET, url)).await?;
3081 json_ok(resp).await
3082 }
3083
3084 /// Get one sink's configuration.
3085 ///
3086 /// `GET /admin/v3/sinks/{tenant}/{namespace}/{name}`. Returns the
3087 /// stored `SinkConfig` as raw JSON for the same forward-compat
3088 /// reason as [`Self::source_get`]. Java: `SinksBase#getSinkInfo`.
3089 pub async fn sink_get(
3090 &self,
3091 tenant: &str,
3092 namespace: &str,
3093 name: &str,
3094 ) -> Result<serde_json::Value, AdminError> {
3095 validate_segment(tenant)?;
3096 validate_segment(namespace)?;
3097 validate_segment(name)?;
3098 let url = self.url_v3(&["sinks", tenant, namespace, name])?;
3099 let resp = self.send(self.http.request(Method::GET, url)).await?;
3100 json_ok(resp).await
3101 }
3102
3103 /// Get a sink's running status (per-instance worker telemetry).
3104 ///
3105 /// `GET /admin/v3/sinks/{tenant}/{namespace}/{name}/status`.
3106 /// Same envelope shape as the Sources status. Raw JSON.
3107 /// Java: `SinksBase#getSinkStatus`.
3108 pub async fn sink_status(
3109 &self,
3110 tenant: &str,
3111 namespace: &str,
3112 name: &str,
3113 ) -> Result<serde_json::Value, AdminError> {
3114 validate_segment(tenant)?;
3115 validate_segment(namespace)?;
3116 validate_segment(name)?;
3117 let url = self.url_v3(&["sinks", tenant, namespace, name, "status"])?;
3118 let resp = self.send(self.http.request(Method::GET, url)).await?;
3119 json_ok(resp).await
3120 }
3121
3122 /// Register a sink from a remote package URL.
3123 ///
3124 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}` with
3125 /// `multipart/form-data` carrying a `url` text part and a
3126 /// `sinkConfig` JSON part. Mirrors
3127 /// [`Self::source_create_with_url`]; the only wire-level
3128 /// difference is the JSON-part field name (`sinkConfig` vs
3129 /// `sourceConfig`). Java: `SinksBase#registerSink`.
3130 pub async fn sink_create_with_url(
3131 &self,
3132 tenant: &str,
3133 namespace: &str,
3134 name: &str,
3135 url: &str,
3136 config: SinkConfig,
3137 ) -> Result<(), AdminError> {
3138 validate_segment(tenant)?;
3139 validate_segment(namespace)?;
3140 validate_segment(name)?;
3141 let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
3142 let form = build_url_config_multipart(url, "sinkConfig", &config)?;
3143 let resp = self
3144 .send(
3145 self.http
3146 .request(Method::POST, endpoint)
3147 .multipart(form)
3148 .timeout(PACKAGE_REGISTER_TIMEOUT),
3149 )
3150 .await?;
3151 empty_ok(resp).await
3152 }
3153
3154 /// Update a sink from a remote package URL.
3155 ///
3156 /// `PUT /admin/v3/sinks/{tenant}/{namespace}/{name}` with the same
3157 /// multipart shape as [`Self::sink_create_with_url`].
3158 /// Java: `SinksBase#updateSink`.
3159 pub async fn sink_update_with_url(
3160 &self,
3161 tenant: &str,
3162 namespace: &str,
3163 name: &str,
3164 url: &str,
3165 config: SinkConfig,
3166 ) -> Result<(), AdminError> {
3167 validate_segment(tenant)?;
3168 validate_segment(namespace)?;
3169 validate_segment(name)?;
3170 let endpoint = self.url_v3(&["sinks", tenant, namespace, name])?;
3171 let form = build_url_config_multipart(url, "sinkConfig", &config)?;
3172 let resp = self
3173 .send(
3174 self.http
3175 .request(Method::PUT, endpoint)
3176 .multipart(form)
3177 .timeout(PACKAGE_REGISTER_TIMEOUT),
3178 )
3179 .await?;
3180 empty_ok(resp).await
3181 }
3182
3183 /// Delete a sink.
3184 ///
3185 /// `DELETE /admin/v3/sinks/{tenant}/{namespace}/{name}`.
3186 /// Java: `SinksBase#deregisterSink`.
3187 pub async fn sink_delete(
3188 &self,
3189 tenant: &str,
3190 namespace: &str,
3191 name: &str,
3192 ) -> Result<(), AdminError> {
3193 validate_segment(tenant)?;
3194 validate_segment(namespace)?;
3195 validate_segment(name)?;
3196 let url = self.url_v3(&["sinks", tenant, namespace, name])?;
3197 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3198 empty_ok(resp).await
3199 }
3200
3201 /// Start every instance of a sink.
3202 ///
3203 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/start`.
3204 /// Java: `SinksBase#startSink`.
3205 pub async fn sink_start(
3206 &self,
3207 tenant: &str,
3208 namespace: &str,
3209 name: &str,
3210 ) -> Result<(), AdminError> {
3211 validate_segment(tenant)?;
3212 validate_segment(namespace)?;
3213 validate_segment(name)?;
3214 let url = self.url_v3(&["sinks", tenant, namespace, name, "start"])?;
3215 let resp = self.send(self.http.request(Method::POST, url)).await?;
3216 empty_ok(resp).await
3217 }
3218
3219 /// Stop every instance of a sink.
3220 ///
3221 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/stop`.
3222 /// Java: `SinksBase#stopSink`.
3223 pub async fn sink_stop(
3224 &self,
3225 tenant: &str,
3226 namespace: &str,
3227 name: &str,
3228 ) -> Result<(), AdminError> {
3229 validate_segment(tenant)?;
3230 validate_segment(namespace)?;
3231 validate_segment(name)?;
3232 let url = self.url_v3(&["sinks", tenant, namespace, name, "stop"])?;
3233 let resp = self.send(self.http.request(Method::POST, url)).await?;
3234 empty_ok(resp).await
3235 }
3236
3237 /// Restart every instance of a sink.
3238 ///
3239 /// `POST /admin/v3/sinks/{tenant}/{namespace}/{name}/restart`.
3240 /// Java: `SinksBase#restartSink`.
3241 pub async fn sink_restart(
3242 &self,
3243 tenant: &str,
3244 namespace: &str,
3245 name: &str,
3246 ) -> Result<(), AdminError> {
3247 validate_segment(tenant)?;
3248 validate_segment(namespace)?;
3249 validate_segment(name)?;
3250 let url = self.url_v3(&["sinks", tenant, namespace, name, "restart"])?;
3251 let resp = self.send(self.http.request(Method::POST, url)).await?;
3252 empty_ok(resp).await
3253 }
3254
3255 // --- Pulsar Packages (/admin/v3/packages/...) -----------------------
3256
3257 /// List package names declared under (`type`, `tenant`, `namespace`).
3258 ///
3259 /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}`. Returns the
3260 /// list of package names — *not* versions; one entry per declared
3261 /// package. Use [`Self::package_versions_list`] to enumerate the
3262 /// versions of one package.
3263 /// Java: `pulsar-broker/src/main/java/org/apache/pulsar/broker/
3264 /// admin/v3/Packages.java#listPackages`.
3265 pub async fn packages_list(
3266 &self,
3267 pkg_type: PackageType,
3268 tenant: &str,
3269 namespace: &str,
3270 ) -> Result<Vec<String>, AdminError> {
3271 validate_segment(tenant)?;
3272 validate_segment(namespace)?;
3273 let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace])?;
3274 let resp = self.send(self.http.request(Method::GET, url)).await?;
3275 json_ok(resp).await
3276 }
3277
3278 /// List the versions declared for one package.
3279 ///
3280 /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}`.
3281 /// Returns the list of version strings (Pulsar treats versions as
3282 /// opaque strings — `1.0.0`, `latest`, build hashes — and only the
3283 /// metadata endpoints understand them).
3284 /// Java: `PackagesBase#listPackageVersions`.
3285 pub async fn package_versions_list(
3286 &self,
3287 pkg_type: PackageType,
3288 tenant: &str,
3289 namespace: &str,
3290 name: &str,
3291 ) -> Result<Vec<String>, AdminError> {
3292 validate_segment(tenant)?;
3293 validate_segment(namespace)?;
3294 validate_segment(name)?;
3295 let url = self.url_v3(&["packages", pkg_type.as_str(), tenant, namespace, name])?;
3296 let resp = self.send(self.http.request(Method::GET, url)).await?;
3297 json_ok(resp).await
3298 }
3299
3300 /// Get the metadata envelope for one package version.
3301 ///
3302 /// `GET /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3303 /// {version}/metadata`. Returns the `PackageMetadata` envelope as
3304 /// raw JSON for forward-compat — broker minor versions extend the
3305 /// shape with `tags`, `documentationUrl`, etc.
3306 /// Java: `PackagesBase#getPackageMetadata`.
3307 pub async fn package_metadata_get(
3308 &self,
3309 pkg_type: PackageType,
3310 tenant: &str,
3311 namespace: &str,
3312 name: &str,
3313 version: &str,
3314 ) -> Result<serde_json::Value, AdminError> {
3315 validate_segment(tenant)?;
3316 validate_segment(namespace)?;
3317 validate_segment(name)?;
3318 validate_segment(version)?;
3319 let url = self.url_v3(&[
3320 "packages",
3321 pkg_type.as_str(),
3322 tenant,
3323 namespace,
3324 name,
3325 version,
3326 "metadata",
3327 ])?;
3328 let resp = self.send(self.http.request(Method::GET, url)).await?;
3329 json_ok(resp).await
3330 }
3331
3332 /// Replace the metadata envelope for one package version.
3333 ///
3334 /// `PUT /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3335 /// {version}/metadata` with a [`PackageMetadata`] JSON body.
3336 /// The broker rejects this verb with 404 if the package version
3337 /// does not exist — `package_metadata_set` is *update*, never
3338 /// *create*. Java: `PackagesBase#updatePackageMetadata`.
3339 pub async fn package_metadata_set(
3340 &self,
3341 pkg_type: PackageType,
3342 tenant: &str,
3343 namespace: &str,
3344 name: &str,
3345 version: &str,
3346 metadata: PackageMetadata,
3347 ) -> Result<(), AdminError> {
3348 validate_segment(tenant)?;
3349 validate_segment(namespace)?;
3350 validate_segment(name)?;
3351 validate_segment(version)?;
3352 let url = self.url_v3(&[
3353 "packages",
3354 pkg_type.as_str(),
3355 tenant,
3356 namespace,
3357 name,
3358 version,
3359 "metadata",
3360 ])?;
3361 let resp = self
3362 .send(self.http.request(Method::PUT, url).json(&metadata))
3363 .await?;
3364 empty_ok(resp).await
3365 }
3366
3367 /// Delete one package version.
3368 ///
3369 /// `DELETE /admin/v3/packages/{type}/{tenant}/{namespace}/{name}/
3370 /// {version}`. The broker drops the version's metadata + storage
3371 /// atomically; other versions of the same package are untouched.
3372 /// Java: `PackagesBase#deletePackage`.
3373 pub async fn package_delete(
3374 &self,
3375 pkg_type: PackageType,
3376 tenant: &str,
3377 namespace: &str,
3378 name: &str,
3379 version: &str,
3380 ) -> Result<(), AdminError> {
3381 validate_segment(tenant)?;
3382 validate_segment(namespace)?;
3383 validate_segment(name)?;
3384 validate_segment(version)?;
3385 let url = self.url_v3(&[
3386 "packages",
3387 pkg_type.as_str(),
3388 tenant,
3389 namespace,
3390 name,
3391 version,
3392 ])?;
3393 let resp = self.send(self.http.request(Method::DELETE, url)).await?;
3394 empty_ok(resp).await
3395 }
3396
3397 // --- Internal --------------------------------------------------------
3398
3399 /// Build a request URL by joining `segments` onto `base_url`. Each segment
3400 /// is percent-encoded for the URL path.
3401 fn url(&self, segments: &[&str]) -> Result<Url, AdminError> {
3402 Self::url_for(&self.base_url, segments)
3403 }
3404
3405 /// Build a request URL by joining `segments` onto the `/admin/v3/`
3406 /// base. Used by Pulsar Functions / IO Sources / IO Sinks /
3407 /// Packages, which Pulsar moved off of `/admin/v2/` historically.
3408 fn url_v3(&self, segments: &[&str]) -> Result<Url, AdminError> {
3409 Self::url_for(&self.base_url_v3, segments)
3410 }
3411
3412 /// Shared URL-builder body for the v2 / v3 helpers.
3413 fn url_for(base: &Url, segments: &[&str]) -> Result<Url, AdminError> {
3414 let mut url = base.clone();
3415 {
3416 // `Url::path_segments_mut` only fails for cannot-be-a-base URLs;
3417 // builder already rejected those.
3418 let mut path = url
3419 .path_segments_mut()
3420 .map_err(|()| AdminError::Builder("base url is cannot-be-a-base".into()))?;
3421 // Both `base_url` (anchored at `/admin/v2/`) and `base_url_v3`
3422 // (`/admin/v3/`) carry a trailing slash, which produces a
3423 // sentinel empty trailing segment in `path_segments_mut`. Drop
3424 // it before appending API segments — otherwise pushes land
3425 // after the empty, producing `/admin/v2//persistent/...`. Real
3426 // brokers tolerate the double slash; strict mocks (wiremock)
3427 // do not, and Java's `PulsarAdmin` emits the single-slash
3428 // form.
3429 path.pop_if_empty();
3430 for segment in segments {
3431 path.push(segment);
3432 }
3433 }
3434 Ok(url)
3435 }
3436
3437 /// Apply auth headers and dispatch.
3438 ///
3439 /// Returns an [`ApiResponse`] carrying the response alongside the
3440 /// resolved method + URL, so the decode helpers can attribute a
3441 /// non-JSON body to the exact request that produced it
3442 /// (see [`AdminError::Decode`] / [`AdminError::Status`]).
3443 async fn send(&self, req: RequestBuilder) -> Result<ApiResponse, AdminError> {
3444 let req = match &self.auth {
3445 AdminAuth::None => req,
3446 AdminAuth::Token(tok) => bearer(req, tok)?,
3447 AdminAuth::OAuth2(flow) => {
3448 // Refresh the cached token if it is missing or near expiry,
3449 // then attach it. `ensure_fresh` is a no-op when the cached
3450 // token is still valid, so the steady-state path is a single
3451 // mutex read.
3452 flow.ensure_fresh()
3453 .await
3454 .map_err(|err| AdminError::Auth(format!("oauth2 token refresh: {err}")))?;
3455 // A cache miss (None) and an IDP that handed back an empty
3456 // `access_token` (Some(empty)) are both "no usable token" —
3457 // collapse them into one guard.
3458 let token = flow
3459 .cached_access_token()
3460 .filter(|t| !t.is_empty())
3461 .ok_or_else(|| {
3462 AdminError::Auth("oauth2 returned an empty access token".to_owned())
3463 })?;
3464 // The access token is base64url JWT text — valid UTF-8 — but
3465 // guard the conversion rather than assume it.
3466 let tok = std::str::from_utf8(&token).map_err(|err| {
3467 AdminError::Auth(format!("oauth2 access token is not valid utf-8: {err}"))
3468 })?;
3469 bearer(req, tok)?
3470 }
3471 };
3472 // Build the request so the resolved method + URL are captured for
3473 // diagnostics, then execute on the shared client. This is
3474 // behaviorally identical to `req.send()`.
3475 let request = req.build()?;
3476 let method = request.method().clone();
3477 let url = request.url().clone();
3478 let resp = self.http.execute(request).await?;
3479 Ok(ApiResponse { method, url, resp })
3480 }
3481}
3482
3483/// A dispatched response paired with the request's resolved method + URL.
3484///
3485/// Threaded from [`AdminClient::send`] into the decode helpers so a
3486/// decode / status failure can name the exact request that produced it
3487/// (see [`AdminError::Decode`] and [`AdminError::Status`]).
3488struct ApiResponse {
3489 method: Method,
3490 url: Url,
3491 resp: Response,
3492}
3493
3494/// Pulsar Functions configuration — the subset of Java's
3495/// `org.apache.pulsar.common.functions.FunctionConfig` that the URL-based
3496/// `register` / `update` calls actually require. The Java type carries
3497/// ~30 fields (process / k8s runtime tuning, secrets, deadletter
3498/// topics, …); we expose the load-bearing ones an operator running a
3499/// pre-compiled JAR needs. Unknown fields on the wire are tolerated by
3500/// the broker, so adding extra knobs is an additive change later.
3501///
3502/// `tenant` / `namespace` / `name` are duplicated in the body because
3503/// the broker re-validates them against the URL path (Java's
3504/// `WorkerUtils.validateFunctionName`). The CLI fills these in from the
3505/// fully qualified name the operator passes on the command line.
3506///
3507/// `runtime` is the string `"JAVA"`, `"PYTHON"`, or `"GO"` — matches
3508/// Java's `org.apache.pulsar.common.functions.FunctionConfig.Runtime`
3509/// enum serialised by name.
3510#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3511#[serde(default, rename_all = "camelCase")]
3512pub struct FunctionConfig {
3513 /// Owning tenant.
3514 pub tenant: String,
3515 /// Owning namespace (bare name, not `tenant/namespace`).
3516 pub namespace: String,
3517 /// Function name (unique within the namespace).
3518 pub name: String,
3519 /// Fully qualified entry-point class name (`com.acme.MyFunction`
3520 /// for Java; `module.fn` for Python; `main` for Go).
3521 #[serde(rename = "className")]
3522 pub class_name: String,
3523 /// Input topics the function subscribes to.
3524 pub inputs: Vec<String>,
3525 /// Output topic the function produces to. Empty when the function
3526 /// has no output sink.
3527 pub output: String,
3528 /// Runtime — `"JAVA"`, `"PYTHON"`, or `"GO"`.
3529 pub runtime: String,
3530 /// Number of parallel instances the worker schedules.
3531 pub parallelism: i32,
3532 /// Optional opaque user-config map passed to the function's
3533 /// `Context#getUserConfigValue`. JSON object on the wire.
3534 #[serde(rename = "userConfig", skip_serializing_if = "Option::is_none")]
3535 pub user_config: Option<serde_json::Value>,
3536}
3537
3538/// Tenant policy info — admin roles and allowed clusters.
3539///
3540/// Mirrors Java's `org.apache.pulsar.common.policies.data.TenantInfoImpl` —
3541/// the JSON keys (`adminRoles`, `allowedClusters`) match upstream verbatim.
3542#[derive(Debug, Clone, Default, Serialize, Deserialize)]
3543pub struct TenantInfo {
3544 /// Roles permitted to administrate the tenant.
3545 #[serde(rename = "adminRoles")]
3546 pub admin_roles: Vec<String>,
3547 /// Cluster names the tenant may use.
3548 #[serde(rename = "allowedClusters")]
3549 pub allowed_clusters: Vec<String>,
3550}
3551
3552/// Wire shape of the PIP-415 `getMessageIdByIndex` response.
3553///
3554/// Mirrors Java's `MessageIdImpl` JSON shape (Jackson default property-name
3555/// serialisation): `{ledgerId, entryId, partitionIndex}`. See
3556/// `pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java`.
3557///
3558/// Kept as a deserialise-only DTO and converted into
3559/// [`magnetar_proto::MessageId`] at the boundary so callers do not see this
3560/// wire detail. Not exposed publicly.
3561#[derive(Debug, Clone, Deserialize)]
3562#[serde(rename_all = "camelCase")]
3563struct MessageIdResponse {
3564 ledger_id: i64,
3565 entry_id: i64,
3566 #[serde(default = "default_partition_index")]
3567 partition_index: i32,
3568}
3569
3570fn default_partition_index() -> i32 {
3571 -1
3572}
3573
3574impl MessageIdResponse {
3575 /// Convert the REST response into the canonical [`MessageId`]. The broker
3576 /// resolves at entry granularity, so `batch_index` / `batch_size` are not
3577 /// part of the JSON — they default to `-1` (the same sentinel
3578 /// `MessageId::from_pb` uses for `MessageIdData` without batch fields).
3579 ///
3580 /// Returns `AdminError::Protocol` if the broker emits a negative
3581 /// `ledgerId` or `entryId` — both fields are `u64` in the canonical type
3582 /// (matching the proto wire format) and Java's `MessageIdImpl` cannot
3583 /// represent negative values either, so a negative wire value is a
3584 /// broker bug we must surface rather than silently wrap.
3585 fn try_into_message_id(self) -> Result<MessageId, AdminError> {
3586 let ledger_id = u64::try_from(self.ledger_id).map_err(|_| {
3587 AdminError::Protocol(format!("negative ledgerId from broker: {}", self.ledger_id))
3588 })?;
3589 let entry_id = u64::try_from(self.entry_id).map_err(|_| {
3590 AdminError::Protocol(format!("negative entryId from broker: {}", self.entry_id))
3591 })?;
3592 Ok(MessageId {
3593 ledger_id,
3594 entry_id,
3595 partition: self.partition_index,
3596 batch_index: -1,
3597 batch_size: -1,
3598 // PIP-460 (ADR-0031): admin REST never resolves a scalable
3599 // segment id; the field only exists under `scalable-topics`.
3600 #[cfg(feature = "scalable-topics")]
3601 segment_id: None,
3602 })
3603 }
3604}
3605
3606/// Topic stats. Intentionally permissive: the Java
3607/// `PersistentTopicStatsImpl` shape is large and shifts between releases;
3608/// we extract the high-signal counters and pass the rest through as raw JSON.
3609#[derive(Debug, Clone, Default, Deserialize)]
3610#[serde(default)]
3611pub struct TopicStats {
3612 /// Total messages received.
3613 #[serde(rename = "msgInCounter")]
3614 pub msg_in_counter: i64,
3615 /// Total bytes received.
3616 #[serde(rename = "bytesInCounter")]
3617 pub bytes_in_counter: i64,
3618 /// Publishers, raw JSON because the schema is large and version-dependent.
3619 pub publishers: Vec<serde_json::Value>,
3620 /// Subscriptions map (raw JSON).
3621 pub subscriptions: serde_json::Value,
3622}
3623
3624/// Partitioned-topic metadata, as returned by
3625/// `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions`.
3626/// Java: `org.apache.pulsar.common.partition.PartitionedTopicMetadata`.
3627/// Only the partition count is consumed; broker-side extensions are ignored.
3628#[derive(Debug, Clone, Default, Deserialize)]
3629#[serde(default)]
3630struct PartitionedTopicMetadata {
3631 partitions: u32,
3632}
3633
3634/// Java `RetentionPolicies` — namespace-level retention policy. `-1` for
3635/// either dimension means infinite. The broker applies whichever quota
3636/// becomes binding first (time OR size).
3637#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
3638#[serde(default, rename_all = "camelCase")]
3639pub struct RetentionPolicies {
3640 /// Maximum retention time in minutes. `-1` = infinite, `0` = none.
3641 pub retention_time_in_minutes: i32,
3642 /// Maximum retention size in megabytes. `-1` = infinite, `0` = none.
3643 #[serde(rename = "retentionSizeInMB")]
3644 pub retention_size_in_mb: i64,
3645}
3646
3647/// Java `PersistencePolicies` — namespace-level `BookKeeper` layout +
3648/// managed-ledger write-shaping knobs. Maps to the broker's
3649/// `org.apache.pulsar.common.policies.data.PersistencePolicies`.
3650///
3651/// `Default::default()` returns the broker's documented default for a
3652/// new namespace (`2/2/2/0.0`), NOT all zeros — the broker rejects
3653/// ensemble values < 1 on `set`, so an all-zero policy is unusable
3654/// for a round-trip. Missing fields on decode default the same way:
3655/// a partial body where the broker omits one field round-trips with
3656/// the legal default, never with the illegal `0`.
3657#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3658#[serde(rename_all = "camelCase")]
3659pub struct PersistencePolicies {
3660 /// `BookKeeper` ensemble size — the number of bookies the managed
3661 /// ledger striping is spread across. Default: 2.
3662 #[serde(default = "default_bookkeeper_quorum")]
3663 pub bookkeeper_ensemble: i32,
3664 /// `BookKeeper` write quorum — the number of bookies each entry is
3665 /// written to. Default: 2.
3666 #[serde(default = "default_bookkeeper_quorum")]
3667 pub bookkeeper_write_quorum: i32,
3668 /// `BookKeeper` ack quorum — the number of acks required before an
3669 /// add is considered durable. Default: 2.
3670 #[serde(default = "default_bookkeeper_quorum")]
3671 pub bookkeeper_ack_quorum: i32,
3672 /// Managed-ledger mark-delete-rate cap (ops/sec). `0.0` disables
3673 /// the throttle. Default: 0.0 (disabled).
3674 #[serde(default)]
3675 pub managed_ledger_max_mark_delete_rate: f64,
3676}
3677
3678impl Default for PersistencePolicies {
3679 fn default() -> Self {
3680 Self {
3681 bookkeeper_ensemble: 2,
3682 bookkeeper_write_quorum: 2,
3683 bookkeeper_ack_quorum: 2,
3684 managed_ledger_max_mark_delete_rate: 0.0,
3685 }
3686 }
3687}
3688
3689#[inline]
3690fn default_bookkeeper_quorum() -> i32 {
3691 2
3692}
3693
3694/// Java `DispatchRate` — a sliding-window throttle (msg/sec + byte/sec
3695/// over a `ratePeriodInSecond` window). Shared shape between the
3696/// per-namespace consumer dispatch rate, the per-subscription dispatch
3697/// rate, and the cross-cluster replicator dispatch rate.
3698///
3699/// `-1` on either rate dimension disables that axis of the throttle —
3700/// missing fields default to `-1` (not `0`) so a broker-omitted
3701/// dimension round-trips as "no throttle", never as "throttle to
3702/// zero" (which would block consumer dispatch on the namespace).
3703#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3704#[serde(rename_all = "camelCase")]
3705pub struct DispatchRate {
3706 /// Throttle in messages/sec. `-1` = unlimited.
3707 #[serde(default = "neg_one_i32")]
3708 pub dispatch_throttling_rate_in_msg: i32,
3709 /// Throttle in bytes/sec. `-1` = unlimited.
3710 #[serde(default = "neg_one_i64")]
3711 pub dispatch_throttling_rate_in_byte: i64,
3712 /// Window size in seconds the throttle averages over.
3713 #[serde(default = "default_rate_period_seconds")]
3714 pub rate_period_in_second: i32,
3715 /// If `true`, dispatch rate is interpreted as an addend on top of
3716 /// the namespace publish rate rather than an absolute cap.
3717 #[serde(default)]
3718 pub relative_to_publish_rate: bool,
3719}
3720
3721impl Default for DispatchRate {
3722 fn default() -> Self {
3723 Self {
3724 dispatch_throttling_rate_in_msg: -1,
3725 dispatch_throttling_rate_in_byte: -1,
3726 rate_period_in_second: 1,
3727 relative_to_publish_rate: false,
3728 }
3729 }
3730}
3731
3732/// Java `PublishRate` — producer-side throttle (msg/sec + byte/sec).
3733/// `-1` on either dimension disables that axis of the throttle.
3734/// Missing fields default to `-1` (not `0`) — same sentinel semantics
3735/// as [`DispatchRate`]. Unlike `DispatchRate`, there is no
3736/// rate-period field; the broker uses a fixed 1-second window.
3737#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
3738#[serde(rename_all = "camelCase")]
3739pub struct PublishRate {
3740 /// Throttle in messages/sec. `-1` = unlimited.
3741 #[serde(default = "neg_one_i32")]
3742 pub publish_throttling_rate_in_msg: i32,
3743 /// Throttle in bytes/sec. `-1` = unlimited.
3744 #[serde(default = "neg_one_i64")]
3745 pub publish_throttling_rate_in_byte: i64,
3746}
3747
3748impl Default for PublishRate {
3749 fn default() -> Self {
3750 Self {
3751 publish_throttling_rate_in_msg: -1,
3752 publish_throttling_rate_in_byte: -1,
3753 }
3754 }
3755}
3756
3757#[inline]
3758fn neg_one_i32() -> i32 {
3759 -1
3760}
3761#[inline]
3762fn neg_one_i64() -> i64 {
3763 -1
3764}
3765#[inline]
3766fn default_rate_period_seconds() -> i32 {
3767 1
3768}
3769
3770/// Java `DelayedDeliveryPolicies` — namespace-level switch + index-tick
3771/// granularity for the broker's delayed-message delivery tracker.
3772/// Maps to `org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies`.
3773/// `tick_time` controls how often the broker's delay-index buckets are
3774/// re-evaluated; smaller values give tighter delivery accuracy at a
3775/// higher tracker cost.
3776///
3777/// The Java field name is `tickTime` (carrying a `@JsonProperty("tickTime")`
3778/// annotation), **not** `tickTimeMillis`. The unit is documented as
3779/// milliseconds (see the upstream class doc), but the wire key drops
3780/// the unit suffix — Jackson on the broker only binds the literal
3781/// `tickTime`.
3782#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
3783#[serde(default, rename_all = "camelCase")]
3784pub struct DelayedDeliveryPolicies {
3785 /// Whether delayed delivery is enabled for the namespace.
3786 pub active: bool,
3787 /// Index-tick granularity in milliseconds. Wire key `tickTime`.
3788 #[serde(rename = "tickTime")]
3789 pub tick_time_millis: i64,
3790}
3791
3792/// Java `BacklogQuota` — one entry in the namespace-level backlog quota
3793/// map. `policy` is a string (`producer_request_hold`,
3794/// `producer_exception`, `consumer_backlog_eviction`) rather than a
3795/// closed Rust enum so new broker enum values forward-decode cleanly.
3796///
3797/// `-1` on either limit dimension disables that axis — missing fields
3798/// default to `-1` (not `0`) so a broker-omitted dimension round-trips
3799/// as "no quota", never as "expire-everything" or "block-everything".
3800#[derive(Debug, Clone, Deserialize, Serialize)]
3801#[serde(rename_all = "camelCase")]
3802pub struct BacklogQuota {
3803 /// Maximum allowed backlog in bytes (when type=`destination_storage`).
3804 /// `-1` = unlimited.
3805 #[serde(default = "neg_one_i64")]
3806 pub limit_size: i64,
3807 /// Maximum allowed backlog age in seconds (when type=`message_age`).
3808 /// `-1` = unlimited.
3809 #[serde(default = "neg_one_i32")]
3810 pub limit_time: i32,
3811 /// Action when the quota is exceeded.
3812 #[serde(default)]
3813 pub policy: String,
3814}
3815
3816impl Default for BacklogQuota {
3817 fn default() -> Self {
3818 Self {
3819 limit_size: -1,
3820 limit_time: -1,
3821 policy: String::new(),
3822 }
3823 }
3824}
3825
3826/// Java `BookieInfo` — a single bookie's rack assignment, as stored in
3827/// the `racks-info` metadata path and shipped on
3828/// [`AdminClient::bookies_set_rack`]. Field names are camelCase on the
3829/// wire (matching `org.apache.pulsar.common.policies.data.BookieInfo`,
3830/// which carries only `rack` and `hostname`).
3831///
3832/// The placement group is **not** part of this body — Pulsar's
3833/// `BookiesBase#updateBookieRackInfo` exposes `group` as a
3834/// `@QueryParam`, and the JSON body Jackson-binds only to
3835/// `{rack, hostname}`. Treating it as a body field silently drops the
3836/// operator's group choice on the wire.
3837#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3838#[serde(default, rename_all = "camelCase")]
3839pub struct BookieInfo {
3840 /// Rack identifier within the group — opaque to the broker, only
3841 /// the placement policy cares about it.
3842 pub rack: String,
3843 /// Resolved hostname for the bookie. The broker uses it for
3844 /// log lines; it does not have to match DNS.
3845 pub hostname: String,
3846}
3847
3848/// Java `PostSchemaPayload` — the request body for
3849/// [`AdminClient::schema_post`] and
3850/// [`AdminClient::schema_compatibility_check`]. The Java DTO has
3851/// (`type`, `schema`, `properties`); both keys travel as-is on the wire.
3852/// `schema` is the canonical-form blob for AVRO / JSON / PROTOBUF and
3853/// the protobuf descriptor for `PROTOBUF_NATIVE`.
3854#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3855#[serde(default)]
3856pub struct PostSchemaPayload {
3857 /// Schema type (`AVRO` / `JSON` / `PROTOBUF` / `PROTOBUF_NATIVE` /
3858 /// `KEY_VALUE` / `STRING` / `BYTES` / ...).
3859 #[serde(rename = "type")]
3860 pub schema_type: String,
3861 /// Schema definition, encoded per the type axis.
3862 pub schema: String,
3863 /// User-defined per-schema properties.
3864 pub properties: std::collections::HashMap<String, String>,
3865}
3866
3867/// Java `SourceConfig` — declarative description of a Pulsar IO
3868/// Source. Mirrors `org.apache.pulsar.common.io.SourceConfig` (Jackson
3869/// camelCase on the wire). Only the fields the JAX-RS `create` /
3870/// `update` paths require are typed; per-connector knobs ride along in
3871/// the open-ended `configs` map so a forward broker can add fields
3872/// without a magnetar release.
3873#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3874#[serde(default, rename_all = "camelCase")]
3875pub struct SourceConfig {
3876 /// Tenant owning the source. Must match the URL path tenant.
3877 pub tenant: String,
3878 /// Namespace owning the source. Must match the URL path namespace.
3879 pub namespace: String,
3880 /// Source name. Must match the URL path name.
3881 pub name: String,
3882 /// Fully-qualified connector class (e.g.
3883 /// `org.apache.pulsar.io.kafka.KafkaSource`).
3884 pub class_name: String,
3885 /// Destination topic the source writes to.
3886 pub topic_name: String,
3887 /// Number of source instances to schedule.
3888 pub parallelism: i32,
3889 /// Connector-specific configuration map. Skipped from JSON when
3890 /// `None` so a `null` does not override the broker default.
3891 #[serde(skip_serializing_if = "Option::is_none")]
3892 pub configs: Option<serde_json::Value>,
3893}
3894
3895/// Java `SinkConfig` — declarative description of a Pulsar IO Sink.
3896/// Mirrors `org.apache.pulsar.common.io.SinkConfig` (Jackson camelCase
3897/// on the wire). The `inputs` slot is the list of source topics the
3898/// sink reads from — the broker accepts either fully-qualified topic
3899/// names or `tenant/namespace/topic` shorthand. Per-connector knobs
3900/// ride in `configs` for the same forward-compat reason as
3901/// [`SourceConfig`].
3902#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3903#[serde(default, rename_all = "camelCase")]
3904pub struct SinkConfig {
3905 /// Tenant owning the sink. Must match the URL path tenant.
3906 pub tenant: String,
3907 /// Namespace owning the sink. Must match the URL path namespace.
3908 pub namespace: String,
3909 /// Sink name. Must match the URL path name.
3910 pub name: String,
3911 /// Fully-qualified connector class (e.g.
3912 /// `org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink`).
3913 pub class_name: String,
3914 /// Source topics the sink subscribes to.
3915 pub inputs: Vec<String>,
3916 /// Number of sink instances to schedule.
3917 pub parallelism: i32,
3918 /// Connector-specific configuration map. Skipped from JSON when
3919 /// `None` so a `null` does not override the broker default.
3920 #[serde(skip_serializing_if = "Option::is_none")]
3921 pub configs: Option<serde_json::Value>,
3922}
3923
3924/// Pulsar Packages namespace dimension — the `{type}` segment of the
3925/// `/admin/v3/packages/{type}/...` URL. Maps to upstream's
3926/// `PackageType` enum
3927/// (`pulsar-packages-management/pulsar-packages-management-core/.../
3928/// PackageType.java`): the broker only accepts the three lowercase
3929/// tokens `function`, `source`, `sink` and rejects everything else
3930/// with 400. Modelled as a closed Rust enum so the URL builder
3931/// cannot emit a value the broker will refuse.
3932#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3933pub enum PackageType {
3934 /// `function` — Pulsar Functions JAR.
3935 Function,
3936 /// `source` — Pulsar IO Source NAR.
3937 Source,
3938 /// `sink` — Pulsar IO Sink NAR.
3939 Sink,
3940}
3941
3942impl PackageType {
3943 /// Render as the lowercase token the broker URL surface expects.
3944 #[must_use]
3945 pub fn as_str(self) -> &'static str {
3946 match self {
3947 Self::Function => "function",
3948 Self::Source => "source",
3949 Self::Sink => "sink",
3950 }
3951 }
3952}
3953
3954impl std::str::FromStr for PackageType {
3955 type Err = AdminError;
3956
3957 /// Parse from the lowercase tokens the broker emits (`function` /
3958 /// `source` / `sink`). Hyphenated aliases are accepted to make the
3959 /// CLI feel idiomatic — `package-type=source` vs the broker's
3960 /// `source` are equivalent.
3961 fn from_str(s: &str) -> Result<Self, AdminError> {
3962 match s.to_ascii_lowercase().as_str() {
3963 "function" | "functions" => Ok(Self::Function),
3964 "source" | "sources" => Ok(Self::Source),
3965 "sink" | "sinks" => Ok(Self::Sink),
3966 other => Err(AdminError::InvalidName(format!(
3967 "unknown package type {other:?} (expected: function | source | sink)"
3968 ))),
3969 }
3970 }
3971}
3972
3973/// Java `PackageMetadata` — the metadata envelope Pulsar Packages
3974/// attaches to each `(type, tenant, namespace, name, version)` tuple.
3975/// Mirrors `org.apache.pulsar.packages.management.core.common.PackageMetadata`
3976/// (Jackson camelCase on the wire). `modification_time` is a
3977/// broker-side timestamp in milliseconds-since-epoch — the broker
3978/// emits it on `GET` and ignores caller-supplied values on `PUT`
3979/// (overwriting them with the receive timestamp).
3980#[derive(Debug, Clone, Default, Deserialize, Serialize)]
3981#[serde(default, rename_all = "camelCase")]
3982pub struct PackageMetadata {
3983 /// Free-form package description.
3984 pub description: String,
3985 /// Maintainer contact (typically an email or team handle).
3986 pub contact: String,
3987 /// Last-modification timestamp in ms-since-epoch. Read-only for
3988 /// callers — the broker overwrites the value on `PUT`.
3989 pub modification_time: i64,
3990 /// Arbitrary key/value labels (release notes, CI ids, etc.).
3991 pub properties: std::collections::HashMap<String, String>,
3992}
3993
3994/// Java `BacklogQuotaType` — selects which dimension a `BacklogQuota`
3995/// entry limits.
3996#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3997pub enum BacklogQuotaType {
3998 /// Bytes-on-disk dimension. Uses `BacklogQuota::limit_size`.
3999 DestinationStorage,
4000 /// Message-age dimension. Uses `BacklogQuota::limit_time`.
4001 MessageAge,
4002}
4003
4004impl BacklogQuotaType {
4005 /// Render as the lowercase `snake_case` value the broker REST surface
4006 /// expects in the `backlogQuotaType` query parameter.
4007 #[must_use]
4008 pub fn as_query_value(self) -> &'static str {
4009 match self {
4010 Self::DestinationStorage => "destination_storage",
4011 Self::MessageAge => "message_age",
4012 }
4013 }
4014}
4015
4016/// Java `LongRunningProcessStatus` — the polling shape for triggered
4017/// background jobs (compaction, offload). The broker returns one of four
4018/// `status` values: `NOT_RUN` (never triggered), `RUNNING`, `SUCCESS`,
4019/// `ERROR`. `last_error` is populated only on `ERROR`.
4020#[derive(Debug, Clone, Default, Deserialize, Serialize)]
4021#[serde(default, rename_all = "camelCase")]
4022pub struct LongRunningProcessStatus {
4023 /// Job state — `NOT_RUN`, `RUNNING`, `SUCCESS`, or `ERROR`.
4024 pub status: String,
4025 /// Human-readable error message, present on `ERROR`.
4026 pub last_error: String,
4027}
4028
4029/// Request body for `POST .../subscription/{sub}/resetcursor` (Java
4030/// `ResetCursorData`). The CLI exposes `message_id` and `is_excluded`;
4031/// Pulsar's `batchIndexes` / `properties` fields are not currently set
4032/// — they exist for transactional dedup metadata and would require
4033/// txn-aware callers anyway.
4034///
4035/// `ledger_id` / `entry_id` are `i64` on the wire because Pulsar
4036/// Jackson-binds them to Java `long`. `MessageId` on the Rust side uses
4037/// `u64` (matching the wire-protocol envelope), with `u64::MAX` as the
4038/// `EARLIEST` / `LATEST` sentinel — the conversion below maps those
4039/// sentinels to `-1` (the Java sentinel) so a reset-to-earliest /
4040/// reset-to-latest doesn't overflow Java's `long` parser.
4041#[derive(Debug, Serialize)]
4042#[serde(rename_all = "camelCase")]
4043struct ResetCursorData {
4044 ledger_id: i64,
4045 entry_id: i64,
4046 partition_index: i32,
4047 batch_index: i32,
4048 #[serde(rename = "isExcluded")]
4049 is_excluded: bool,
4050}
4051
4052/// Map a `MessageId` u64 ledger/entry id onto Pulsar's Java-`long`
4053/// wire field, translating the Rust-side `u64::MAX` sentinel (used by
4054/// `MessageId::EARLIEST` / `LATEST`) to Java's `-1` sentinel.
4055/// Non-sentinel values are passed through verbatim — Pulsar's
4056/// `LedgerHandle` / `EntryId` indices fit in `i64::MAX` long before
4057/// overflowing.
4058#[inline]
4059fn message_id_field_for_wire(value: u64) -> i64 {
4060 if value == u64::MAX {
4061 -1
4062 } else {
4063 // Pulsar entry indices fit in i63 — `as i64` cannot overflow
4064 // for any legitimate broker-emitted value.
4065 value as i64
4066 }
4067}
4068
4069/// Builder for [`AdminClient`].
4070#[derive(Debug, Default)]
4071pub struct AdminClientBuilder {
4072 base_url: Option<Url>,
4073 auth: AdminAuth,
4074 timeout: Option<Duration>,
4075 /// Extra CA root, PEM-encoded, added to reqwest's trust store. Mirrors
4076 /// pulsarctl's `tls_trust_certs_file_path`.
4077 tls_trust_cert_pem: Option<Vec<u8>>,
4078 /// Disable certificate verification. Mirrors pulsarctl's
4079 /// `tls_allow_insecure_connection`. **Insecure** — defeats MITM
4080 /// protection; only for self-signed dev brokers.
4081 tls_allow_insecure: bool,
4082}
4083
4084impl AdminClientBuilder {
4085 /// Set the service URL — the base for `/admin/v2/...`. Required.
4086 #[must_use]
4087 pub fn service_url(mut self, url: Url) -> Self {
4088 self.base_url = Some(url);
4089 self
4090 }
4091
4092 /// Configure bearer-token auth (`Authorization: Bearer <token>`).
4093 #[must_use]
4094 pub fn token(mut self, token: String) -> Self {
4095 self.auth = AdminAuth::Token(token);
4096 self
4097 }
4098
4099 /// Configure `OAuth2` `client_credentials` auth. The shared flow's token
4100 /// cache is refreshed on demand at request time (see [`AdminAuth::OAuth2`]).
4101 #[must_use]
4102 pub fn oauth2(mut self, flow: Arc<ClientCredentialsFlow>) -> Self {
4103 self.auth = AdminAuth::OAuth2(flow);
4104 self
4105 }
4106
4107 /// Add a custom CA root (PEM bytes) to the HTTPS trust store. Mirrors
4108 /// pulsarctl's `tls_trust_certs_file_path`. The CLI reads the file and
4109 /// passes its bytes here.
4110 #[must_use]
4111 pub fn tls_trust_cert_pem(mut self, pem: Vec<u8>) -> Self {
4112 self.tls_trust_cert_pem = Some(pem);
4113 self
4114 }
4115
4116 /// Disable TLS certificate verification (`danger_accept_invalid_certs`).
4117 /// Mirrors pulsarctl's `tls_allow_insecure_connection`. **Insecure** —
4118 /// only for self-signed dev brokers; never in production.
4119 #[must_use]
4120 pub fn tls_allow_insecure(mut self, allow: bool) -> Self {
4121 self.tls_allow_insecure = allow;
4122 self
4123 }
4124
4125 /// Override the request timeout. Defaults to [`DEFAULT_TIMEOUT`].
4126 #[must_use]
4127 pub fn timeout(mut self, dur: Duration) -> Self {
4128 self.timeout = Some(dur);
4129 self
4130 }
4131
4132 /// Build the client.
4133 pub fn build(self) -> Result<AdminClient, AdminError> {
4134 let base_url = self
4135 .base_url
4136 .ok_or_else(|| AdminError::Builder("service_url is required".into()))?;
4137 if base_url.cannot_be_a_base() {
4138 return Err(AdminError::Builder(format!(
4139 "service_url cannot be a base url: {base_url}"
4140 )));
4141 }
4142 // Anchor every V2 API call below `/admin/v2/` and every V3 call
4143 // below `/admin/v3/`. We append the suffix here so callers pass
4144 // plain `http://broker:8080` rather than baking either prefix in.
4145 //
4146 // `Url::join` follows WHATWG semantics: if `base_url` has no
4147 // trailing slash, its last path segment is replaceable. So
4148 // `http://broker/pulsar` + `admin/v2/` would yield
4149 // `http://broker/admin/v2/` — the `pulsar` prefix silently
4150 // dropped (common for path-prefixed K8s ingresses). Normalise
4151 // the base to end with `/` first so the join always appends.
4152 let base_url = {
4153 let mut b = base_url.clone();
4154 if !b.path().ends_with('/') {
4155 b.set_path(&format!("{}/", b.path()));
4156 }
4157 b
4158 };
4159 let base_url_v3 = base_url.join("admin/v3/")?;
4160 let base_url = base_url.join("admin/v2/")?;
4161
4162 // reqwest 0.13 panics in `Client::builder().build()` when the active
4163 // `rustls` flavor is `rustls-no-provider` and no global
4164 // `CryptoProvider` is installed. That happens whenever more than one
4165 // `crypto-*` feature is unified (e.g. default `crypto-aws-lc-rs`
4166 // plus an explicit `crypto-ring`), so install the default here —
4167 // the shim is idempotent and a no-op once a provider is set, which
4168 // covers parallel callers and processes that also boot the tokio
4169 // engine.
4170 tls_crypto::install_default_provider();
4171
4172 let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT);
4173 let http_builder = reqwest::Client::builder().timeout(timeout);
4174 // The custom-CA / allow-insecure reqwest knobs only exist when a
4175 // rustls TLS feature (`__tls`) is compiled in, which every `crypto-*`
4176 // feature enables. The binary crate always selects one provider, so
4177 // this branch is live in production; a no-crypto library build that
4178 // asks for TLS options gets a clear builder error rather than a
4179 // silently-ignored option.
4180 let http = apply_tls_options(
4181 http_builder,
4182 self.tls_trust_cert_pem,
4183 self.tls_allow_insecure,
4184 )?
4185 .build()
4186 .map_err(AdminError::Http)?;
4187
4188 Ok(AdminClient {
4189 base_url,
4190 base_url_v3,
4191 http,
4192 auth: self.auth,
4193 })
4194 }
4195}
4196
4197/// Errors returned by the admin client.
4198#[derive(Debug, thiserror::Error)]
4199pub enum AdminError {
4200 /// Transport-layer error from `reqwest`.
4201 #[error("http error: {0}")]
4202 Http(#[from] reqwest::Error),
4203 /// API returned a non-success HTTP status.
4204 ///
4205 /// `method` and `url` identify the exact request that failed, so an
4206 /// operator can tell at a glance whether they hit the wrong endpoint,
4207 /// cluster, or proxy without re-deriving the URL from the call site.
4208 #[error("api error {code} from {method} {url}: {body}")]
4209 Status {
4210 /// HTTP method of the failed request (`GET`, `POST`, …).
4211 method: String,
4212 /// Full request URL the client dispatched.
4213 url: String,
4214 /// HTTP status code.
4215 code: u16,
4216 /// Response body (or a placeholder if reading the body failed).
4217 body: String,
4218 },
4219 /// A 2xx response body could not be decoded as the expected JSON.
4220 ///
4221 /// Carries the request `method` + `url`, the HTTP `status`, the
4222 /// response `content_type`, and a truncated body `snippet` so the
4223 /// failure is self-diagnosing: a `text/html` body or an empty
4224 /// `<none>` content-type on a 200 almost always means the request
4225 /// was answered by the wrong endpoint, a reverse proxy, or an auth
4226 /// redirect rather than the broker's admin API. Replaces the bare
4227 /// `serde_json` "expected value at line 1 column 1" message.
4228 #[error(
4229 "unexpected response from {method} {url}: HTTP {status}, content-type {content_type}, body: {snippet}"
4230 )]
4231 Decode {
4232 /// HTTP method of the request whose response failed to decode.
4233 method: String,
4234 /// Full request URL the client dispatched.
4235 url: String,
4236 /// HTTP status code of the response (always 2xx here).
4237 status: u16,
4238 /// Response `Content-Type` header, or `<none>` when absent.
4239 content_type: String,
4240 /// First ~256 bytes of the body (UTF-8 lossy), truncated with an
4241 /// ellipsis marker when longer.
4242 snippet: String,
4243 /// The underlying serde decode error.
4244 #[source]
4245 source: serde_json::Error,
4246 },
4247 /// JSON encode error — building a request body from a Rust value.
4248 ///
4249 /// Response-decode failures surface as [`AdminError::Decode`]; this
4250 /// variant covers the rare serialization-side failure (e.g.
4251 /// `build_url_config_multipart`).
4252 #[error("json encode: {0}")]
4253 Json(#[from] serde_json::Error),
4254 /// URL parse / construction error.
4255 #[error("invalid url: {0}")]
4256 Url(#[from] url::ParseError),
4257 /// Builder configuration error (missing service URL, invalid argument...).
4258 #[error("invalid builder: {0}")]
4259 Builder(String),
4260 /// Authentication failure — e.g. the `OAuth2` `client_credentials` exchange
4261 /// failed or returned an empty access token.
4262 #[error("auth error: {0}")]
4263 Auth(String),
4264 /// Caller passed a namespace or topic name that the client could not parse.
4265 #[error("invalid name: {0}")]
4266 InvalidName(String),
4267 /// Broker returned a response that violates the documented wire contract
4268 /// (e.g. negative `ledgerId` from `getMessageIdByIndex`, which Java
4269 /// `MessageIdImpl` cannot represent either).
4270 #[error("broker protocol violation: {0}")]
4271 Protocol(String),
4272}
4273
4274/// Apply the pulsarctl-derived TLS options (custom CA root + allow-insecure)
4275/// to a reqwest [`ClientBuilder`].
4276///
4277/// `add_root_certificate` / `danger_accept_invalid_certs` / `Certificate`
4278/// live behind reqwest's `__tls` cfg, enabled by every `crypto-*` feature.
4279/// The two-variant cfg split keeps a no-crypto library build compiling: with
4280/// no TLS feature, asking for either option is a hard builder error rather
4281/// than a silently-dropped knob.
4282#[cfg(any(
4283 feature = "crypto-aws-lc-rs",
4284 feature = "crypto-ring",
4285 feature = "crypto-openssl",
4286 feature = "crypto-fips",
4287))]
4288fn apply_tls_options(
4289 mut builder: reqwest::ClientBuilder,
4290 trust_cert_pem: Option<Vec<u8>>,
4291 allow_insecure: bool,
4292) -> Result<reqwest::ClientBuilder, AdminError> {
4293 // Custom CA trust (pulsarctl `tls_trust_certs_file_path`). reqwest's
4294 // `add_root_certificate` adds the cert *alongside* the platform roots,
4295 // matching Pulsar's "extra trust anchor" semantics.
4296 if let Some(pem) = trust_cert_pem {
4297 let cert = reqwest::Certificate::from_pem(&pem)
4298 .map_err(|err| AdminError::Builder(format!("invalid tls trust cert PEM: {err}")))?;
4299 builder = builder.add_root_certificate(cert);
4300 }
4301 // Allow-insecure (pulsarctl `tls_allow_insecure_connection`). Defeats
4302 // certificate verification — only meaningful against self-signed dev
4303 // brokers, hence gated behind the explicit opt-in flag.
4304 if allow_insecure {
4305 builder = builder.danger_accept_invalid_certs(true);
4306 }
4307 Ok(builder)
4308}
4309
4310/// No-TLS fallback: a library build with no `crypto-*` feature cannot honour
4311/// TLS options. Accept the builder unchanged when no option is requested;
4312/// error clearly when one is.
4313#[cfg(not(any(
4314 feature = "crypto-aws-lc-rs",
4315 feature = "crypto-ring",
4316 feature = "crypto-openssl",
4317 feature = "crypto-fips",
4318)))]
4319fn apply_tls_options(
4320 builder: reqwest::ClientBuilder,
4321 trust_cert_pem: Option<Vec<u8>>,
4322 allow_insecure: bool,
4323) -> Result<reqwest::ClientBuilder, AdminError> {
4324 if trust_cert_pem.is_some() || allow_insecure {
4325 return Err(AdminError::Builder(
4326 "TLS options (trust cert / allow-insecure) require a crypto-* feature".to_owned(),
4327 ));
4328 }
4329 Ok(builder)
4330}
4331
4332/// Attach `Authorization: Bearer <tok>` to a request builder.
4333///
4334/// Shared by the `Token` and `OAuth2` auth arms — both ultimately set a
4335/// bearer header; only the source of the token bytes differs.
4336fn bearer(req: RequestBuilder, tok: &str) -> Result<RequestBuilder, AdminError> {
4337 let value = format!("Bearer {tok}");
4338 let mut headers = HeaderMap::new();
4339 let header_value = HeaderValue::from_str(&value)
4340 .map_err(|err| AdminError::Builder(format!("invalid bearer token: {err}")))?;
4341 headers.insert(AUTHORIZATION, header_value);
4342 Ok(req.headers(headers))
4343}
4344
4345/// Maximum number of body bytes echoed back in [`AdminError::Decode`].
4346/// Enough to recognise an HTML error page or a plain-text proxy banner
4347/// without dumping a whole payload into the error message.
4348const DECODE_SNIPPET_LIMIT: usize = 256;
4349
4350/// Build an [`AdminError::Decode`] from a failed-to-decode response's
4351/// parts. Centralises content-type extraction and body-snippet
4352/// truncation so every JSON decoder reports the same enriched context.
4353fn decode_error(
4354 method: &Method,
4355 url: &Url,
4356 status: u16,
4357 headers: &HeaderMap,
4358 body: &[u8],
4359 source: serde_json::Error,
4360) -> AdminError {
4361 let content_type = headers
4362 .get(reqwest::header::CONTENT_TYPE)
4363 .and_then(|v| v.to_str().ok())
4364 .map_or_else(|| "<none>".to_owned(), ToOwned::to_owned);
4365 let snippet = if body.len() > DECODE_SNIPPET_LIMIT {
4366 format!(
4367 "{}… (truncated)",
4368 String::from_utf8_lossy(&body[..DECODE_SNIPPET_LIMIT])
4369 )
4370 } else {
4371 String::from_utf8_lossy(body).into_owned()
4372 };
4373 AdminError::Decode {
4374 method: method.to_string(),
4375 url: url.to_string(),
4376 status,
4377 content_type,
4378 snippet,
4379 source,
4380 }
4381}
4382
4383/// Decode a non-error JSON response body.
4384async fn json_ok<T>(api: ApiResponse) -> Result<T, AdminError>
4385where
4386 T: for<'de> Deserialize<'de>,
4387{
4388 let api = ensure_status(api).await?;
4389 let ApiResponse { method, url, resp } = api;
4390 let status = resp.status().as_u16();
4391 let headers = resp.headers().clone();
4392 let bytes = resp.bytes().await?;
4393 serde_json::from_slice(&bytes)
4394 .map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
4395}
4396
4397/// Decode a JSON response body, defaulting to `T::default()` when the
4398/// broker returns 204 / empty body / literal `null`. Pulsar's
4399/// "policy-unset" pattern collapses to one of those three encodings:
4400///
4401/// - `getRetention` / `getPersistence` / `getDispatchRate` / `getPublishRate` on a fresh namespace
4402/// surface `null` through Jersey's `resume(null)`, which travels as 204 No Content;
4403/// - `getTopicPolicies` on a topic with no explicit policy returns either 204 or the literal JSON
4404/// `null`.
4405///
4406/// `json_ok::<RetentionPolicies>` errors with `EOF while parsing a
4407/// value` for either case; this helper maps them to
4408/// `RetentionPolicies::default()` (broker semantic: missing fields ==
4409/// broker default). Callers asserting "post-remove returns the
4410/// default" stay correct without changing the public signature.
4411async fn json_ok_or_default<T>(api: ApiResponse) -> Result<T, AdminError>
4412where
4413 T: for<'de> Deserialize<'de> + Default,
4414{
4415 let api = ensure_status(api).await?;
4416 let ApiResponse { method, url, resp } = api;
4417 // Tolerant short-circuits MUST run before any serde attempt.
4418 if resp.status() == StatusCode::NO_CONTENT {
4419 return Ok(T::default());
4420 }
4421 let status = resp.status().as_u16();
4422 let headers = resp.headers().clone();
4423 let bytes = resp.bytes().await?;
4424 if bytes.is_empty() {
4425 return Ok(T::default());
4426 }
4427 // `null` body — broker says "policy unset" — also folds to default.
4428 if bytes.as_ref().trim_ascii() == b"null" {
4429 return Ok(T::default());
4430 }
4431 serde_json::from_slice::<T>(&bytes)
4432 .map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
4433}
4434
4435/// Decode a JSON response body that the broker may emit as an empty
4436/// payload to mean "no value here". Pulsar's pattern for "no override
4437/// set" on the policy GET endpoints is to return `204 No Content` (or
4438/// a 200 with an empty body) rather than the literal JSON `null`;
4439/// `serde_json::from_slice::<Option<T>>(b"")` errors with `EOF while
4440/// parsing a value`, so every `namespace_get_*` / `topic_get_*` that
4441/// returns `Option<T>` needs this tolerant decoder.
4442///
4443/// Decoding rules:
4444/// - 204 No Content → `Ok(None)`
4445/// - 2xx with empty body bytes → `Ok(None)`
4446/// - 2xx with the literal `null` → `Ok(None)`
4447/// - 2xx with a JSON value → `Ok(Some(value))` via serde
4448async fn json_ok_optional<T>(api: ApiResponse) -> Result<Option<T>, AdminError>
4449where
4450 T: for<'de> Deserialize<'de>,
4451{
4452 let api = ensure_status(api).await?;
4453 let ApiResponse { method, url, resp } = api;
4454 // Tolerant short-circuits MUST run before any serde attempt.
4455 if resp.status() == StatusCode::NO_CONTENT {
4456 return Ok(None);
4457 }
4458 let status = resp.status().as_u16();
4459 let headers = resp.headers().clone();
4460 let bytes = resp.bytes().await?;
4461 if bytes.is_empty() {
4462 return Ok(None);
4463 }
4464 serde_json::from_slice::<Option<T>>(&bytes)
4465 .map_err(|err| decode_error(&method, &url, status, &headers, &bytes, err))
4466}
4467
4468/// Discard a successful no-content response body.
4469async fn empty_ok(api: ApiResponse) -> Result<(), AdminError> {
4470 let _ = ensure_status(api).await?;
4471 Ok(())
4472}
4473
4474/// Convert a non-success response into [`AdminError::Status`]. Returns the
4475/// original [`ApiResponse`] on 2xx so the caller can decode the body.
4476///
4477/// The `Status` error carries the request method + URL so a failure
4478/// names the exact endpoint hit, not just the status code and body.
4479async fn ensure_status(api: ApiResponse) -> Result<ApiResponse, AdminError> {
4480 let status = api.resp.status();
4481 if status.is_success() || status == StatusCode::NO_CONTENT {
4482 return Ok(api);
4483 }
4484 let code = status.as_u16();
4485 let ApiResponse { method, url, resp } = api;
4486 let body = resp
4487 .text()
4488 .await
4489 .unwrap_or_else(|err| format!("<failed to read body: {err}>"));
4490 Err(AdminError::Status {
4491 method: method.to_string(),
4492 url: url.to_string(),
4493 code,
4494 body,
4495 })
4496}
4497
4498/// Split a `tenant/namespace` string into its two segments.
4499/// Reject path segments the `url` crate would silently rewrite. `.` and `..`
4500/// disappear under RFC 3986 dot-segment normalisation; percent-encoded slash
4501/// (`%2F` / `%2f`) lets a hostile name escape its segment; NUL / ASCII
4502/// control bytes have no place in an admin path. Refusing all of these at
4503/// the input boundary keeps the URL the client builds in lock-step with the
4504/// path the broker eventually parses.
4505fn validate_segment(segment: &str) -> Result<(), AdminError> {
4506 if segment.is_empty() {
4507 return Err(AdminError::InvalidName("empty path segment".into()));
4508 }
4509 if segment == "." || segment == ".." {
4510 return Err(AdminError::InvalidName(format!(
4511 "dot segment is not a valid name: {segment:?}",
4512 )));
4513 }
4514 if segment.contains("%2F") || segment.contains("%2f") {
4515 return Err(AdminError::InvalidName(format!(
4516 "percent-encoded slash in segment: {segment:?}",
4517 )));
4518 }
4519 if segment.bytes().any(|b| b < 0x20 || b == 0x7f) {
4520 return Err(AdminError::InvalidName(format!(
4521 "control byte in segment: {segment:?}",
4522 )));
4523 }
4524 Ok(())
4525}
4526
4527/// Split a `tenant/namespace` string into its two segments.
4528///
4529/// Exposed for the CLI (and any other admin-client wrapper) so the
4530/// `tenant/namespace` shape used by the namespace-scoped list verbs
4531/// (`functions list`, `sources list`, `sinks list`, `packages list`)
4532/// validates against the same `validate_segment` rules every admin
4533/// method enforces internally — no parallel parsers, no divergent
4534/// error categories.
4535pub fn split_namespace(ns: &str) -> Result<(&str, &str), AdminError> {
4536 let (tenant, namespace) = ns.split_once('/').ok_or_else(|| {
4537 AdminError::InvalidName(format!("expected tenant/namespace, got {ns:?} (no '/')"))
4538 })?;
4539 if tenant.is_empty() || namespace.is_empty() || namespace.contains('/') {
4540 return Err(AdminError::InvalidName(format!(
4541 "expected tenant/namespace, got {ns:?}"
4542 )));
4543 }
4544 validate_segment(tenant)?;
4545 validate_segment(namespace)?;
4546 Ok((tenant, namespace))
4547}
4548
4549/// Split a `persistent://tenant/namespace/topic` (or `tenant/namespace/topic`)
4550/// into its three path segments. The scheme is optional; if present it must
4551/// be `persistent://`.
4552fn split_topic(topic: &str) -> Result<(&str, &str, &str), AdminError> {
4553 let rest = topic.strip_prefix("persistent://").unwrap_or(topic);
4554 let mut parts = rest.splitn(3, '/');
4555 let tenant = parts.next().unwrap_or("");
4556 let namespace = parts.next().unwrap_or("");
4557 let name = parts.next().unwrap_or("");
4558 if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
4559 return Err(AdminError::InvalidName(format!(
4560 "expected [persistent://]tenant/namespace/topic, got {topic:?}"
4561 )));
4562 }
4563 validate_segment(tenant)?;
4564 validate_segment(namespace)?;
4565 validate_segment(name)?;
4566 Ok((tenant, namespace, name))
4567}
4568
4569/// Build the two-part `multipart/form-data` envelope the broker expects
4570/// on URL-based function register / update calls. Order is fixed (`url`
4571/// then `functionConfig`) so wire-level tests can pin the body shape.
4572/// The `functionConfig` part carries an explicit `application/json`
4573/// content type; without it the broker's Jersey
4574/// `FormDataMultiPartFeature` falls back to `text/plain` and refuses
4575/// the JSON.
4576fn function_pkg_form(
4577 pkg_url: &str,
4578 config: &FunctionConfig,
4579) -> Result<reqwest::multipart::Form, AdminError> {
4580 build_url_config_multipart(pkg_url, "functionConfig", config)
4581}
4582
4583/// Split a `tenant/namespace/name` Functions / IO identifier into its
4584/// three segments. Pulsar Functions never carry a `persistent://`
4585/// scheme prefix (functions are not topics), so the parser is stricter
4586/// than the internal `split_topic` (rustdoc cannot resolve the bare
4587/// identifier because `split_topic` is module-private).
4588///
4589/// Exposed for the CLI, which parses the fully qualified name out of a
4590/// single positional argument before calling the admin methods (which
4591/// take `tenant`, `namespace`, `name` separately so the broker's
4592/// per-segment validation maps 1:1 to the URL path).
4593pub fn split_function_id(id: &str) -> Result<(&str, &str, &str), AdminError> {
4594 let mut parts = id.splitn(3, '/');
4595 let tenant = parts.next().unwrap_or("");
4596 let namespace = parts.next().unwrap_or("");
4597 let name = parts.next().unwrap_or("");
4598 if tenant.is_empty() || namespace.is_empty() || name.is_empty() || name.contains('/') {
4599 return Err(AdminError::InvalidName(format!(
4600 "expected tenant/namespace/name, got {id:?}"
4601 )));
4602 }
4603 validate_segment(tenant)?;
4604 validate_segment(namespace)?;
4605 validate_segment(name)?;
4606 Ok((tenant, namespace, name))
4607}
4608
4609/// Build the two-part `multipart/form-data` body Pulsar IO's
4610/// `register*` / `update*` endpoints expect when the package is
4611/// referenced by URL (`http(s)://`, `file://`, `function://`): a `url`
4612/// text part and a `<config_field>` JSON part. The broker enforces
4613/// both parts at the dispatcher boundary
4614/// (`SourcesBase#registerSource`, `SinksBase#registerSink`) — missing
4615/// either yields 400. Generic over the config type so Functions,
4616/// Sources, and Sinks share one helper (`functionConfig` /
4617/// `sourceConfig` / `sinkConfig` via the `config_field` argument).
4618fn build_url_config_multipart<T: Serialize>(
4619 pkg_url: &str,
4620 config_field: &str,
4621 config: &T,
4622) -> Result<reqwest::multipart::Form, AdminError> {
4623 let body = serde_json::to_string(config)?;
4624 // `mime_str` only fails on a malformed string; the literal we pass
4625 // is well-formed so the `expect` is on a never-taken branch.
4626 let config_part = reqwest::multipart::Part::text(body)
4627 .mime_str("application/json")
4628 .expect("application/json is a well-formed media type");
4629 let form = reqwest::multipart::Form::new()
4630 .text("url", pkg_url.to_owned())
4631 .part(config_field.to_owned(), config_part);
4632 Ok(form)
4633}
4634
4635#[cfg(test)]
4636mod tests {
4637 use super::*;
4638
4639 #[test]
4640 fn builder_requires_service_url() {
4641 let err = AdminClient::builder().build().unwrap_err();
4642 assert!(matches!(err, AdminError::Builder(_)));
4643 }
4644
4645 #[test]
4646 fn builder_appends_admin_v2_prefix() {
4647 let client = AdminClient::builder()
4648 .service_url("http://localhost:8080".parse().unwrap())
4649 .build()
4650 .unwrap();
4651 assert_eq!(
4652 client.base_url().as_str(),
4653 "http://localhost:8080/admin/v2/"
4654 );
4655 }
4656
4657 #[test]
4658 fn builder_carries_token() {
4659 let client = AdminClient::builder()
4660 .service_url("http://localhost:8080".parse().unwrap())
4661 .token("abc".into())
4662 .build()
4663 .unwrap();
4664 assert!(matches!(client.auth(), AdminAuth::Token(t) if t == "abc"));
4665 }
4666
4667 #[test]
4668 fn admin_auth_token_debug_redacts_secret() {
4669 let auth = AdminAuth::Token("super-secret-jwt".to_owned());
4670 let rendered = format!("{auth:?}");
4671 assert!(
4672 !rendered.contains("super-secret-jwt"),
4673 "raw token leaked through Debug: {rendered}",
4674 );
4675 assert!(
4676 rendered.contains("<redacted>"),
4677 "expected redaction sentinel in {rendered}"
4678 );
4679 assert!(
4680 rendered.contains("Token"),
4681 "expected variant name in {rendered}"
4682 );
4683
4684 let none_rendered = format!("{:?}", AdminAuth::None);
4685 assert_eq!(none_rendered, "None");
4686 }
4687
4688 #[test]
4689 fn admin_client_debug_does_not_leak_token() {
4690 let client = AdminClient::builder()
4691 .service_url("http://localhost:8080".parse().unwrap())
4692 .token("leaky-token".into())
4693 .build()
4694 .unwrap();
4695 let rendered = format!("{client:?}");
4696 assert!(
4697 !rendered.contains("leaky-token"),
4698 "raw token leaked through AdminClient Debug: {rendered}",
4699 );
4700 }
4701
4702 #[test]
4703 fn split_namespace_ok() {
4704 assert_eq!(
4705 split_namespace("public/default").unwrap(),
4706 ("public", "default")
4707 );
4708 }
4709
4710 #[test]
4711 fn split_namespace_rejects_missing_slash() {
4712 assert!(matches!(
4713 split_namespace("public"),
4714 Err(AdminError::InvalidName(_))
4715 ));
4716 }
4717
4718 #[test]
4719 fn split_namespace_rejects_extra_segment() {
4720 assert!(matches!(
4721 split_namespace("public/default/extra"),
4722 Err(AdminError::InvalidName(_))
4723 ));
4724 }
4725
4726 #[test]
4727 fn split_topic_with_scheme() {
4728 let (t, n, name) = split_topic("persistent://acme/svc/orders").unwrap();
4729 assert_eq!((t, n, name), ("acme", "svc", "orders"));
4730 }
4731
4732 #[test]
4733 fn split_topic_without_scheme() {
4734 let (t, n, name) = split_topic("acme/svc/orders").unwrap();
4735 assert_eq!((t, n, name), ("acme", "svc", "orders"));
4736 }
4737
4738 #[test]
4739 fn split_topic_rejects_short_name() {
4740 assert!(matches!(
4741 split_topic("acme/svc"),
4742 Err(AdminError::InvalidName(_))
4743 ));
4744 }
4745
4746 #[test]
4747 fn split_function_id_ok() {
4748 let (t, n, name) = split_function_id("public/default/my-fn").unwrap();
4749 assert_eq!((t, n, name), ("public", "default", "my-fn"));
4750 }
4751
4752 #[test]
4753 fn split_function_id_rejects_short_name() {
4754 assert!(matches!(
4755 split_function_id("public/default"),
4756 Err(AdminError::InvalidName(_))
4757 ));
4758 }
4759
4760 #[test]
4761 fn split_function_id_rejects_persistent_scheme() {
4762 // Functions never carry a `persistent://` prefix — the parser
4763 // refuses one rather than silently treating the scheme as the
4764 // tenant name (which `validate_segment` would later catch via
4765 // the percent-encoded slash rule, but we want a clearer error
4766 // up front).
4767 assert!(matches!(
4768 split_function_id("persistent://acme/svc/fn"),
4769 Err(AdminError::InvalidName(_))
4770 ));
4771 }
4772
4773 #[test]
4774 fn message_id_response_deserialises_java_camelcase() {
4775 // The exact body shape upstream PIP-415 §"Success Response" advertises.
4776 let json = r#"{"ledgerId":12345,"entryId":67890,"partitionIndex":0}"#;
4777 let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
4778 let msg = dto.try_into_message_id().unwrap();
4779 assert_eq!(msg.ledger_id, 12345);
4780 assert_eq!(msg.entry_id, 67890);
4781 assert_eq!(msg.partition, 0);
4782 // The broker resolves at entry granularity — batch fields are absent
4783 // from the JSON and must default to -1 to match the canonical sentinel.
4784 assert_eq!(msg.batch_index, -1);
4785 assert_eq!(msg.batch_size, -1);
4786 }
4787
4788 #[test]
4789 fn message_id_response_defaults_partition_for_non_partitioned_topic() {
4790 // PIP-415 §"Success Response": `partitionIndex: -1` for non-partitioned
4791 // topics. Some broker versions omit the field entirely on
4792 // non-partitioned topics; serde default keeps us correct in either case.
4793 let json = r#"{"ledgerId":1,"entryId":2}"#;
4794 let dto: MessageIdResponse = serde_json::from_str(json).unwrap();
4795 assert_eq!(dto.try_into_message_id().unwrap().partition, -1);
4796 }
4797
4798 #[test]
4799 fn url_helper_emits_single_slash_after_admin_v2() {
4800 // Regression guard: the previous url() helper appended segments after
4801 // the trailing-slash sentinel of /admin/v2/, producing
4802 // /admin/v2//persistent/... — real brokers tolerated it but strict
4803 // mocks (and Java's PulsarAdmin) emit the single-slash form. Pin the
4804 // current behaviour so we notice any future regression.
4805 let client = AdminClient::builder()
4806 .service_url("http://broker.example:8080".parse().unwrap())
4807 .build()
4808 .unwrap();
4809 let url = client.url(&["clusters"]).unwrap();
4810 assert_eq!(url.as_str(), "http://broker.example:8080/admin/v2/clusters");
4811 let url2 = client
4812 .url(&["persistent", "public", "default", "topic", "stats"])
4813 .unwrap();
4814 assert_eq!(
4815 url2.as_str(),
4816 "http://broker.example:8080/admin/v2/persistent/public/default/topic/stats"
4817 );
4818 }
4819
4820 #[test]
4821 fn url_helper_preserves_path_prefix_without_trailing_slash() {
4822 // Regression guard: per WHATWG URL semantics, `Url::join("admin/v2/")`
4823 // on a base whose path has no trailing slash REPLACES the last segment
4824 // — `http://broker/pulsar` + `admin/v2/` becomes
4825 // `http://broker/admin/v2/`, silently dropping `/pulsar`. The builder
4826 // normalises the base to end in `/` before joining so any operator
4827 // running behind a path-prefixed K8s ingress (`--admin-url
4828 // http://gw/pulsar`) gets the right URLs on both V2 and V3 endpoints.
4829 let client = AdminClient::builder()
4830 .service_url("http://broker.example:8080/pulsar".parse().unwrap())
4831 .build()
4832 .unwrap();
4833 let url = client.url(&["clusters"]).unwrap();
4834 assert_eq!(
4835 url.as_str(),
4836 "http://broker.example:8080/pulsar/admin/v2/clusters"
4837 );
4838 let url_v3 = client.url_v3(&["functions", "a", "b"]).unwrap();
4839 assert_eq!(
4840 url_v3.as_str(),
4841 "http://broker.example:8080/pulsar/admin/v3/functions/a/b"
4842 );
4843 }
4844
4845 #[test]
4846 fn split_topic_rejects_dot_segments() {
4847 // LISA-001: `..` / `.` in any segment would silently normalise out via
4848 // url::Url::path_segments_mut, producing a client/server URL parser
4849 // differential. Refuse them at the input boundary.
4850 assert!(matches!(
4851 split_topic("persistent://../foo/bar"),
4852 Err(AdminError::InvalidName(_))
4853 ));
4854 assert!(matches!(
4855 split_topic("./foo/bar"),
4856 Err(AdminError::InvalidName(_))
4857 ));
4858 assert!(matches!(
4859 split_topic("tenant/./topic"),
4860 Err(AdminError::InvalidName(_))
4861 ));
4862 }
4863
4864 #[test]
4865 fn split_topic_rejects_control_bytes_and_percent_encoded_slash() {
4866 assert!(matches!(
4867 split_topic("tenant/ns/topic%2Fevil"),
4868 Err(AdminError::InvalidName(_))
4869 ));
4870 assert!(matches!(
4871 split_topic("tenant/ns/top\0ic"),
4872 Err(AdminError::InvalidName(_))
4873 ));
4874 }
4875}