tsoracle-openraft-toolkit 0.1.4

Reusable openraft glue: TypeConfig macro, RocksDB log store, lifecycle helpers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
//
//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
//
//  tsoracle — Distributed Timestamp Oracle
//
//  Copyright (c) 2026 Prisma Risk
//  Licensed under the Apache License, Version 2.0
//  https://github.com/prisma-risk/tsoracle
//

//! Tests for the lifecycle helpers.
//!
//! Real cluster behavior is exercised by downstream consumers' integration
//! tests. The tests here are compile-time signature checks plus pure-function
//! assertions where possible.

use std::collections::BTreeMap;

use tsoracle_openraft_toolkit::BootstrapMode;

mod common;
use common::{TestPeer, TestTypeConfig};

// Verifies the public types compile in the shapes downstream consumers expect.
#[test]
fn bootstrap_mode_constructs_in_each_shape() {
    let mut members: BTreeMap<u64, TestPeer> = BTreeMap::new();
    members.insert(
        1,
        TestPeer {
            addr: "host-1:9000".into(),
        },
    );

    let _fresh: BootstrapMode<TestTypeConfig> = BootstrapMode::Fresh {
        initial_members: members,
    };
    let _reopen: BootstrapMode<TestTypeConfig> = BootstrapMode::Reopen;
    let _join: BootstrapMode<TestTypeConfig> = BootstrapMode::Join;
}

// Verifies the `bootstrap` function has the expected signature.
// Doesn't execute it — we have no Raft<...> instance in a unit test.
#[allow(dead_code)]
fn _bootstrap_signature_compiles<C, SM>(raft: &openraft::Raft<C, SM>, mode: BootstrapMode<C>)
where
    C: openraft::RaftTypeConfig,
    SM: openraft::storage::RaftStateMachine<C>,
{
    let fut = async move { tsoracle_openraft_toolkit::bootstrap(raft, mode).await };
    drop(fut);
}

// Same idea for `change_membership` / `add_learner`: keep a compile-time
// signature check so an openraft bump that shifts the argument shape breaks
// here rather than at the first downstream call site.
#[allow(dead_code)]
fn _change_membership_signature_compiles<C, SM>(
    raft: &openraft::Raft<C, SM>,
    voters: std::collections::BTreeSet<C::NodeId>,
) where
    C: openraft::RaftTypeConfig,
    SM: openraft::storage::RaftStateMachine<C>,
{
    let fut =
        async move { tsoracle_openraft_toolkit::change_membership(raft, voters, false).await };
    drop(fut);
}

#[allow(dead_code)]
fn _add_learner_signature_compiles<C, SM>(
    raft: &openraft::Raft<C, SM>,
    id: C::NodeId,
    node: C::Node,
) where
    C: openraft::RaftTypeConfig,
    SM: openraft::storage::RaftStateMachine<C>,
{
    let fut = async move { tsoracle_openraft_toolkit::add_learner(raft, id, node, false).await };
    drop(fut);
}

// Compile-time signature check for `leadership_events`. Like the other shims,
// this never executes (we have no real `Raft<C, SM>` in a unit test) — its job
// is to break the build if alpha.20 ever shifts the metrics accessor's return
// type or the `Raft<C, SM>` shape.
#[allow(dead_code)]
fn _leadership_events_signature_compiles<C, SM>(
    raft: &openraft::Raft<C, SM>,
) -> impl futures::Stream<Item = tsoracle_openraft_toolkit::LeadershipState<C>>
where
    C: openraft::RaftTypeConfig,
    SM: openraft::storage::RaftStateMachine<C>,
{
    tsoracle_openraft_toolkit::leadership_events(raft)
}

#[tokio::test]
async fn leadership_events_emits_initial_state_and_terminates_on_drop() {
    use futures::StreamExt;
    use openraft::RaftMetrics;
    use openraft::type_config::TypeConfigExt;
    use tsoracle_openraft_toolkit::LeadershipState;

    // Construct a `RaftMetrics<TestTypeConfig>` via the public `new_initial`
    // constructor — `Default` isn't implemented on alpha.20's `RaftMetrics`.
    // `new_initial` produces a Follower-state snapshot with `current_term = 0`
    // and no current leader.
    let metrics: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);

    // Use the type config's own watch channel so the receiver matches the
    // runtime-abstracted alias `WatchReceiverOf<C, RaftMetrics<C>>` exactly.
    let (tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(metrics);

    let mut stream = std::pin::pin!(
        tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
    );

    // Initial state emits unconditionally.
    let first = stream.next().await.expect("initial state emitted");
    assert!(
        matches!(
            first,
            LeadershipState::Follower {
                term: 0,
                leader: None
            }
        ),
        "expected initial Follower {{ term: 0, leader: None }}; got {first:?}",
    );

    // Dropping the sender terminates the stream after any in-flight wait.
    drop(tx);
    assert!(
        stream.next().await.is_none(),
        "stream should terminate when sender drops"
    );
}

#[tokio::test]
async fn leadership_events_dedups_repeated_class_until_transition() {
    use futures::StreamExt;
    use openraft::RaftMetrics;
    use openraft::ServerState;
    use openraft::WatchSender;
    use openraft::type_config::TypeConfigExt;
    use tsoracle_openraft_toolkit::LeadershipState;

    // Start in Follower.
    let initial: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
    let (tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(initial);

    let mut stream = std::pin::pin!(
        tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
    );

    // First poll yields the initial Follower.
    let first = stream.next().await.expect("initial");
    assert!(
        matches!(first, LeadershipState::Follower { .. }),
        "got {first:?}"
    );

    // Send another Follower-class metrics value — must be swallowed by dedup.
    // We schedule a Leader transition right after so the next stream poll has
    // something to surface, and confirm the in-between Follower update was not
    // emitted as its own item.
    let mut next_follower: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
    next_follower.current_term = 1;
    tx.send(next_follower).unwrap();

    let mut leader_metrics: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
    leader_metrics.state = ServerState::Leader;
    leader_metrics.current_term = 1;
    tx.send(leader_metrics).unwrap();

    let next = stream.next().await.expect("transition");
    assert!(
        matches!(next, LeadershipState::Leader { term: 1 }),
        "expected Leader {{ term: 1 }}; got {next:?}",
    );

    drop(tx);
    assert!(stream.next().await.is_none());
}

// Walks the role-class projections that the Follower→Leader test doesn't touch:
// Candidate, Learner, Shutdown. Each is sent as the *initial* watch value so the
// dedup logic emits it unconditionally on first poll; this keeps the test
// straight-line rather than threading a multi-step transition through one
// sender.
#[tokio::test]
async fn leadership_events_projects_candidate_learner_and_shutdown() {
    use futures::StreamExt;
    use openraft::RaftMetrics;
    use openraft::ServerState;
    use openraft::type_config::TypeConfigExt;
    use tsoracle_openraft_toolkit::LeadershipState;

    async fn first_emission(state: ServerState, term: u64) -> LeadershipState<TestTypeConfig> {
        let mut metrics: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
        metrics.state = state;
        metrics.current_term = term;
        let (_tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(metrics);
        let mut stream = std::pin::pin!(
            tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(
                rx
            )
        );
        stream.next().await.expect("initial state emitted")
    }

    assert!(matches!(
        first_emission(ServerState::Candidate, 5).await,
        LeadershipState::Candidate { term: 5 }
    ));
    assert!(matches!(
        first_emission(ServerState::Learner, 9).await,
        LeadershipState::Learner
    ));
    assert!(matches!(
        first_emission(ServerState::Shutdown, 2).await,
        LeadershipState::Shutdown
    ));
}

// Drives `resolve_leader`'s populated-membership branch: when the metrics report
// a `current_leader` that exists in `membership_config.nodes()`, the Follower
// projection carries the resolved `(id, node)` pair. Without this, only the
// `current_leader = None` path (covered by the initial-state test above) gets
// exercised.
#[tokio::test]
async fn leadership_events_resolves_follower_leader_when_in_membership() {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use futures::StreamExt;
    use openraft::Membership;
    use openraft::RaftMetrics;
    use openraft::StoredMembership;
    use openraft::type_config::TypeConfigExt;
    use openraft::type_config::alias::StoredMembershipOf;
    use tsoracle_openraft_toolkit::LeadershipState;

    let peer_2 = TestPeer {
        addr: "host-2:9000".into(),
    };
    let nodes: BTreeMap<u64, TestPeer> = BTreeMap::from([
        (
            1,
            TestPeer {
                addr: "host-1:9000".into(),
            },
        ),
        (2, peer_2.clone()),
    ]);
    let membership: Membership<u64, TestPeer> =
        Membership::new(vec![BTreeSet::from([1u64, 2])], nodes).unwrap();
    let stored: StoredMembershipOf<TestTypeConfig> = StoredMembership::new(None, membership);

    let mut metrics: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
    metrics.current_leader = Some(2);
    metrics.membership_config = Arc::new(stored);

    let (_tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(metrics);
    let mut stream = std::pin::pin!(
        tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
    );

    let first = stream.next().await.expect("initial state emitted");
    match first {
        LeadershipState::Follower {
            term: 0,
            leader: Some((id, node)),
        } => {
            assert_eq!(id, 2);
            assert_eq!(node, peer_2);
        }
        other => panic!("expected Follower with resolved leader; got {other:?}"),
    }
}

// Regression test for the term-coalesce bug fixed in #77.
//
// Scenario: this node is Leader(term=N). Quorum churn causes a fast
// Leader → Follower(term=M) → Leader(term=K) sequence on a higher term.
// All three updates land on the watch channel before the stream's next
// poll. The watch only stores the latest value, so the receiver sees
// Leader(term=K) directly — the intermediate Follower(term=M) is gone.
//
// Under the previous `RoleClass`-keyed dedup the new `Leader(term=K)`
// compared equal to the prior `Leader(term=N)` ("both are Leader") and
// was silently suppressed. Downstream of `tsoracle-driver-openraft`'s
// `leadership_events`, that means `tsoracle-server`'s failover fence
// never runs for term K, and this node continues issuing timestamps
// from the term-N allocator floor — a global ordering regression.
//
// Under full-value dedup, `Leader(term=K)` is not equal to
// `Leader(term=N)`, so the next poll emits the new Leader and the
// fence runs.
#[tokio::test]
async fn leadership_events_emits_leader_after_coalesced_term_change() {
    use futures::StreamExt;
    use openraft::RaftMetrics;
    use openraft::ServerState;
    use openraft::WatchSender;
    use openraft::type_config::TypeConfigExt;
    use tsoracle_openraft_toolkit::LeadershipState;

    // Initial value is Leader(term=1) so the first poll establishes the
    // last-emitted projection without any pre-roll transitions.
    let mut initial: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
    initial.state = ServerState::Leader;
    initial.current_term = 1;
    let (tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(initial);

    let mut stream = std::pin::pin!(
        tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
    );

    let first = stream.next().await.expect("initial Leader");
    assert!(
        matches!(first, LeadershipState::Leader { term: 1 }),
        "expected initial Leader {{ term: 1 }}; got {first:?}",
    );

    // Push Follower(term=2) and Leader(term=3) before the next poll.
    // The watch coalesces: receiver's next borrow yields Leader(term=3).
    let mut as_follower: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
    as_follower.state = ServerState::Follower;
    as_follower.current_term = 2;
    tx.send(as_follower).unwrap();

    let mut as_leader_again: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
    as_leader_again.state = ServerState::Leader;
    as_leader_again.current_term = 3;
    tx.send(as_leader_again).unwrap();

    // Drop the sender so a broken dedup that suppresses the term change
    // falls through to `changed().await` → Err → `None`, instead of
    // hanging the test forever.
    drop(tx);

    let next = stream.next().await;
    assert!(
        matches!(next, Some(LeadershipState::Leader { term: 3 })),
        "expected Leader {{ term: 3 }} after coalesced Follower(2)/Leader(3); \
         got {next:?} — a None here means the dedup suppressed the new term, \
         which is the bug this test exists to prevent",
    );

    // Once the new leader is emitted, the stream terminates (sender dropped).
    assert!(stream.next().await.is_none());
}

// Acceptance-criteria guard: same-shape projections (identical role, term,
// and leader identity) must continue to be suppressed. Under the previous
// `RoleClass`-keyed dedup this was trivially true; under full-value dedup
// it still holds because `LeadershipState<C>` derives `PartialEq`.
//
// The send goes through the channel rather than being a no-op so the test
// would also catch a (hypothetical) future change that emits on every
// sender-side notification regardless of value equality.
#[tokio::test]
async fn leadership_events_suppresses_identical_projection() {
    use futures::StreamExt;
    use openraft::RaftMetrics;
    use openraft::WatchSender;
    use openraft::type_config::TypeConfigExt;
    use tsoracle_openraft_toolkit::LeadershipState;

    let initial: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
    let (tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(initial);

    let mut stream = std::pin::pin!(
        tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
    );

    let first = stream.next().await.expect("initial Follower");
    assert!(
        matches!(
            first,
            LeadershipState::Follower {
                term: 0,
                leader: None,
            },
        ),
        "expected initial Follower {{ term: 0, leader: None }}; got {first:?}",
    );

    // Send an identical projection. Channel coalescing isn't the suppressor
    // here — only one send happens — so any emission would come from the
    // dedup logic letting an equal value through.
    let identical: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
    tx.send(identical).unwrap();
    drop(tx);

    // With identical-projection suppression the stream sees no new emit and
    // terminates when the sender drops.
    assert!(
        stream.next().await.is_none(),
        "identical projection must be suppressed; stream should terminate \
         on sender drop without re-emitting",
    );
}

// Counterpart to the test above: when `current_leader` is set but the node id
// isn't present in `membership_config.nodes()` (e.g. the leader was just
// removed from the config), `resolve_leader` returns `None` and the Follower
// projection carries `leader: None`. This exercises `find(...).map(...)`'s
// no-match arm.
#[tokio::test]
async fn leadership_events_drops_follower_leader_when_not_in_membership() {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use futures::StreamExt;
    use openraft::Membership;
    use openraft::RaftMetrics;
    use openraft::StoredMembership;
    use openraft::type_config::TypeConfigExt;
    use openraft::type_config::alias::StoredMembershipOf;
    use tsoracle_openraft_toolkit::LeadershipState;

    let nodes: BTreeMap<u64, TestPeer> = BTreeMap::from([(
        1,
        TestPeer {
            addr: "host-1:9000".into(),
        },
    )]);
    let membership: Membership<u64, TestPeer> =
        Membership::new(vec![BTreeSet::from([1u64])], nodes).unwrap();
    let stored: StoredMembershipOf<TestTypeConfig> = StoredMembership::new(None, membership);

    let mut metrics: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
    // 99 is intentionally absent from the membership above.
    metrics.current_leader = Some(99);
    metrics.membership_config = Arc::new(stored);

    let (_tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(metrics);
    let mut stream = std::pin::pin!(
        tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
    );

    let first = stream.next().await.expect("initial state emitted");
    assert!(
        matches!(
            first,
            LeadershipState::Follower {
                term: 0,
                leader: None
            }
        ),
        "expected Follower with no resolved leader; got {first:?}",
    );
}