//
// ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
// ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
// ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
//
// tsoracle — Distributed Timestamp Oracle
//
// Copyright (c) 2026 Prisma Risk
// Licensed under the Apache License, Version 2.0
// https://github.com/prisma-risk/tsoracle
//
//! Leader-state stream derived from `Raft::metrics()`.
//!
//! [`leadership_events`] converts openraft's metrics watch into a stream of
//! [`LeadershipState`] transitions. Every change in role, term, or
//! follower-leader identity is emitted; consecutive *identical* projections
//! are suppressed so steady-state metric ticks don't produce a stream of
//! duplicate events.
//!
//! Dedup is full-value (`LeadershipState<C>: PartialEq`), not role-class only.
//! Class-only dedup is unsafe here: the underlying watch coalesces between
//! polls, so a `Leader(N) → Follower(M) → Leader(K)` sequence that lands
//! during a slow poll would be silently swallowed (both endpoints are
//! "Leader"), and the downstream failover fence — which only runs on
//! `Leader` events — would miss the new term. See issue #77.
//!
//! The metrics watch in alpha.20 is `WatchReceiverOf<C, RaftMetrics<C>>` — a
//! runtime-abstracted receiver, not a plain `tokio::sync::watch::Receiver`. The
//! stream is therefore built by hand on top of `WatchReceiver::changed` /
//! `borrow_watched` rather than via `tokio_stream::wrappers::WatchStream`.
//!
//! The very first state observed on the channel is emitted unconditionally; the
//! dedup logic only kicks in for subsequent values.
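// A minimal, std-only sketch of why full-value dedup matters. `State` is a
// hypothetical, simplified stand-in for `LeadershipState<C>` (plain `u64` ids,
// no `Node` payload), and both helper functions are illustrative only; they
// are not part of this crate. The input slice models what a slow poller sees
// after the watch has coalesced an intermediate `Follower` value away.

```rust
/// Hypothetical stand-in for the crate's leadership projection.
#[derive(Debug, Clone, PartialEq)]
pub enum State {
    Leader { term: u64 },
    Follower { leader: Option<u64> },
}

/// Full-value dedup: keep a value only when it differs from the last kept one.
pub fn dedup_full(seen: &[State]) -> Vec<State> {
    let mut out: Vec<State> = Vec::new();
    for s in seen {
        if out.last() != Some(s) {
            out.push(s.clone());
        }
    }
    out
}

/// Class-only dedup: keep a value only when its role (enum variant) differs
/// from the last kept one. Field changes such as a new term are ignored.
pub fn dedup_class(seen: &[State]) -> Vec<State> {
    let mut out: Vec<State> = Vec::new();
    for s in seen {
        let same_role = out
            .last()
            .map_or(false, |p| std::mem::discriminant(p) == std::mem::discriminant(s));
        if !same_role {
            out.push(s.clone());
        }
    }
    out
}
```

// With the observed sequence `[Leader { term: 1 }, Leader { term: 3 }]`,
// `dedup_full` keeps both values while `dedup_class` drops the second, which
// is the swallowed-term hazard described above.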
use Stream;
use WatchReceiver;
use RaftStateMachine;
use WatchReceiverOf;
use ;
/// A coarse projection of openraft's `RaftMetrics::state` (plus the few fields
/// downstream consumers need to act on each role transition).
///
/// `Leader::term` carries the term in which this node became leader.
/// `Follower::leader` is the resolved `(NodeId, Node)` pair for the leader this
/// node is currently following, or `None` if no leader is yet known.
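// A rough sketch of what such a projection could look like. Everything here
// is an assumption made for illustration: `MetricsSnapshot` is a hypothetical
// stand-in for the few `RaftMetrics` fields involved, node ids are `u64`,
// the node payload is a `String` address, and every non-leader role is
// folded into `Follower`. The real type is generic over the raft type config.

```rust
use std::collections::BTreeMap;

/// Hypothetical, non-generic mirror of the leadership projection.
#[derive(Debug, Clone, PartialEq)]
pub enum LeadershipSketch {
    Leader { term: u64 },
    Follower { leader: Option<(u64, String)> },
}

/// Hypothetical subset of the metrics needed to compute the projection.
pub struct MetricsSnapshot {
    pub id: u64,
    pub term: u64,
    pub current_leader: Option<u64>,
    pub members: BTreeMap<u64, String>,
}

/// Project a snapshot down to the coarse leadership view.
pub fn project(m: &MetricsSnapshot) -> LeadershipSketch {
    if m.current_leader == Some(m.id) {
        LeadershipSketch::Leader { term: m.term }
    } else {
        // Resolve the leader id to an (id, node) pair; an unknown leader, or
        // one missing from the membership map, yields `None` in this sketch.
        let leader = m
            .current_leader
            .and_then(|id| m.members.get(&id).map(|node| (id, node.clone())));
        LeadershipSketch::Follower { leader }
    }
}
```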
/// Build a stream of `LeadershipState` transitions from a raft instance.
///
/// The returned stream:
///
/// 1. Emits one initial `LeadershipState` derived from the current metrics
///    value (always, even on the first poll).
/// 2. Emits subsequent values whenever the projection differs from the last
///    emitted one (any change in role, term, or follower-leader identity).
///    Identical projections are suppressed.
/// 3. Terminates when openraft drops its sender (i.e., the raft has shut
///    down).
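// The three-point contract above can be sketched with std alone. This is an
// illustration under stated assumptions, not the crate's implementation: a
// blocking `std::sync::mpsc::Receiver` stands in for openraft's async watch
// receiver, and `Transitions` / `transitions` are hypothetical names. Unlike
// a watch channel, mpsc queues every value instead of coalescing, so only
// the emit/suppress/terminate logic carries over.

```rust
use std::sync::mpsc::Receiver;

/// Iterator that yields a value only when it differs from the last one
/// yielded, and ends once every sender has been dropped.
pub struct Transitions<T> {
    rx: Receiver<T>,
    last: Option<T>,
}

impl<T: Clone + PartialEq> Iterator for Transitions<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        loop {
            // `recv` returns `Err` when all senders are gone: end of stream.
            let value = self.rx.recv().ok()?;
            // `last` starts as `None`, so the first value always passes.
            if self.last.as_ref() != Some(&value) {
                self.last = Some(value.clone());
                return Some(value);
            }
            // Identical to the last emitted value: suppress and keep waiting.
        }
    }
}

/// Wrap a receiver in the dedup iterator.
pub fn transitions<T: Clone + PartialEq>(rx: Receiver<T>) -> Transitions<T> {
    Transitions { rx, last: None }
}
```

// Feeding `1, 1, 2, 2, 1` and then dropping the sender yields `1, 2, 1`
// followed by the end of the iterator.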
/// The by-value counterpart to [`leadership_events`]: build the same dedup
/// stream directly from a metrics receiver instead of borrowing a `Raft<C, SM>`.
///
/// Reach for this when you hold a `WatchReceiverOf<C, RaftMetrics<C>>` (e.g.
/// from `Raft::metrics()` cloned out of a host) and need a `'static` stream.
/// [`leadership_events`] borrows the `raft` argument, so the stream it returns
/// is tied to that borrow; when you instead want a stream that outlives the
/// raft handle — to wrap it with endpoint resolution and keep the cluster
/// alive alongside it — take the receiver by value here. `tsoracle-driver-openraft`
/// uses exactly this to build a `'static` leadership stream that carries its
/// host along for the ride.
///
/// The emission and dedup contract is identical to [`leadership_events`]: the
/// first observed state emits unconditionally, subsequent states emit only when
/// the full projection differs (full-value dedup, not role-class only — see
/// issue #77), and the stream terminates when openraft drops its sender.
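// A std-only sketch of the by-value idea, under the same caveats as any
// stand-in: `Host` and `owned_transitions` are hypothetical names, and a
// blocking mpsc receiver replaces the async watch receiver. Taking the
// receiver and an `Arc<Host>` by value lets the returned iterator be
// `'static`, so it can move to another thread while keeping the host alive.

```rust
use std::sync::mpsc::Receiver;
use std::sync::Arc;

/// Hypothetical owner of the cluster resources that must outlive the stream.
pub struct Host {
    pub name: String,
}

/// Build an owned dedup iterator that carries its host along.
pub fn owned_transitions<T>(
    rx: Receiver<T>,
    host: Arc<Host>,
) -> impl Iterator<Item = T> + Send + 'static
where
    T: Clone + PartialEq + Send + 'static,
{
    let mut last: Option<T> = None;
    std::iter::from_fn(move || {
        // Referencing `host` moves it into the closure; it is released only
        // when the iterator itself is dropped.
        let _keep_alive = &host;
        loop {
            let value = rx.recv().ok()?;
            if last.as_ref() != Some(&value) {
                last = Some(value.clone());
                return Some(value);
            }
        }
    })
}
```

// Because nothing in the return type borrows from the caller, the iterator
// can be handed to `std::thread::spawn`, which requires `'static`.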