1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
//! Public `Broker<B>` hub. See DESIGN_V2.md §6.1.
use std::time::Duration;
use crate::backend::Backend;
use crate::backend::capability::HasCoordinatedGroups;
use crate::consumer_group::ConsumerGroup;
use crate::consumer_supervisor::ConsumerSupervisor;
use crate::error::Result;
use crate::publisher::Publisher;
use crate::topology_declarer::TopologyDeclarer;
/// Default deadline for `Broker::ping`. Matches the rest of shove's 5 s
/// timeout constants (kafka `PRODUCE_TIMEOUT`, autoscaler metadata fetch,
/// `MESSAGE_TIMEOUT_MS`). Override via [`Broker::ping_with_timeout`].
pub const DEFAULT_PING_TIMEOUT: Duration = Duration::from_secs(5);
pub struct Broker<B: Backend> {
client: B::Client,
}
impl<B: Backend> Broker<B> {
pub async fn new(config: B::Config) -> Result<Self> {
Ok(Self {
client: B::connect(config).await?,
})
}
pub fn from_client(client: B::Client) -> Self {
Self { client }
}
pub async fn publisher(&self) -> Result<Publisher<B>> {
Ok(Publisher::new(B::make_publisher(&self.client).await?))
}
/// Return a [`ConsumerSupervisor`] for spawning fixed-concurrency consumers.
///
/// # SQS autoscaling note
///
/// For the SQS backend, consumers started through this supervisor are **not**
/// registered in `SqsConsumerGroupRegistry`. Pairing this path with
/// [`autoscaler`](Self::autoscaler) will produce an autoscaler that always
/// observes zero groups. Use [`SqsConsumerGroupRegistry`] directly when
/// autoscaling is required.
///
/// [`SqsConsumerGroupRegistry`]: crate::backends::sns::registry::SqsConsumerGroupRegistry
pub fn consumer_supervisor(&self) -> ConsumerSupervisor<B> {
ConsumerSupervisor::new(&self.client)
}
pub fn topology(&self) -> TopologyDeclarer<B> {
TopologyDeclarer::new(B::make_declarer(&self.client))
}
pub async fn close(&self) {
B::close(&self.client).await
}
/// Verify the broker is reachable. Issues a single bounded RPC against
/// the cluster and returns `Ok(())` iff it completes within
/// [`DEFAULT_PING_TIMEOUT`].
///
/// Designed for liveness / readiness probes:
///
/// - **No retries** — a failed probe is returned to the caller as-is.
/// Probe policy (retry counts, failure thresholds) belongs to the caller
/// (k8s `failureThreshold`, an HTTP middleware, etc.).
/// - **No metrics emitted** — probes are called frequently; recording a
/// metric per call would drown out failure signal.
/// - **Post-`close()` semantics are backend-specific.** Backends with a
/// meaningful close (Kafka, RabbitMQ, NATS, SQS, InMemory) check a
/// shutdown token and return `Err(ShoveError::Connection)` before any
/// I/O. The Redis backend's `close` is a no-op — its connections drop
/// on last `Arc` release — so ping continues to function until the
/// broker itself becomes unreachable.
/// - **Backends may transparently recover stale internal state.**
/// For example, the RabbitMQ backend dials a fresh AMQP connection if
/// the cached one died, librdkafka maintains its own broker connection
/// pool, and async-nats heartbeats keep the underlying connection
/// healthy. A probe that succeeds after such recovery is reported as
/// `Ok(())` — the broker is reachable now, which is what liveness asks.
pub async fn ping(&self) -> Result<()> {
self.ping_with_timeout(DEFAULT_PING_TIMEOUT).await
}
/// Same as [`ping`](Self::ping), with a caller-supplied deadline. Exceeding
/// `timeout` returns `Err(ShoveError::Connection)`.
pub async fn ping_with_timeout(&self, timeout: Duration) -> Result<()> {
B::ping(&self.client, timeout).await
}
/// Return a [`QueueStatsImpl`](crate::backend::Backend::QueueStatsImpl) for
/// reading queue depth from the underlying broker.
pub fn queue_stats_provider(&self) -> B::QueueStatsImpl {
B::make_stats_provider(&self.client)
}
/// Return a [`Backend::AutoscalerImpl`](crate::backend::Backend::AutoscalerImpl)
/// for driving generic autoscaling through the
/// [`AutoscalerBackend`](crate::autoscaler::AutoscalerBackend) interface.
///
/// The returned value implements [`AutoscalerBackend`](crate::autoscaler::AutoscalerBackend)
/// and can be passed directly to
/// [`Autoscaler::new`](crate::autoscaler::Autoscaler::new).
///
/// # SQS autoscaling note
///
/// For the SQS backend, the returned autoscaler queries a
/// `SqsConsumerGroupRegistry`. Groups must be registered through
/// [`SqsConsumerGroupRegistry::register`] — consumers spawned via
/// [`consumer_supervisor`](Self::consumer_supervisor) are **not** visible
/// to the autoscaler and it will always observe zero groups.
///
/// [`SqsConsumerGroupRegistry::register`]: crate::backends::sns::registry::SqsConsumerGroupRegistry::register
pub fn autoscaler(&self) -> B::AutoscalerImpl {
B::make_autoscaler(&self.client)
}
}
impl<B: HasCoordinatedGroups> Broker<B> {
pub fn consumer_group(&self) -> ConsumerGroup<B> {
ConsumerGroup::new(B::make_registry(&self.client))
}
}