kuberic_core/events.rs
1use bytes::Bytes;
2use tokio::sync::oneshot;
3
4use crate::error::Result;
5use crate::replicator::{OpenContext, ReplicatorHandle};
6use crate::types::{
7 DataLossAction, Epoch, Lsn, OpenMode, OperationStream, ReplicaId, ReplicaInfo,
8 ReplicaSetConfig, ReplicaSetQuorumMode, Role,
9};
10
11// ---------------------------------------------------------------------------
12// Replicator control events (system-internal, operator → replicator actor)
13// ---------------------------------------------------------------------------
14
15/// Control events delivered to the replicator actor by the runtime.
16/// Processed sequentially on the control channel.
17pub enum ReplicatorControlEvent {
18 Open {
19 mode: OpenMode,
20 reply: oneshot::Sender<Result<()>>,
21 },
22 Close {
23 reply: oneshot::Sender<Result<()>>,
24 },
25 Abort,
26
27 ChangeRole {
28 epoch: Epoch,
29 role: Role,
30 reply: oneshot::Sender<Result<()>>,
31 },
32 UpdateEpoch {
33 epoch: Epoch,
34 reply: oneshot::Sender<Result<()>>,
35 },
36
37 // Primary-only reconfiguration commands
38 OnDataLoss {
39 reply: oneshot::Sender<Result<DataLossAction>>,
40 },
41 UpdateCatchUpConfiguration {
42 current: ReplicaSetConfig,
43 previous: ReplicaSetConfig,
44 reply: oneshot::Sender<Result<()>>,
45 },
46 UpdateCurrentConfiguration {
47 current: ReplicaSetConfig,
48 reply: oneshot::Sender<Result<()>>,
49 },
50 WaitForCatchUpQuorum {
51 mode: ReplicaSetQuorumMode,
52 reply: oneshot::Sender<Result<()>>,
53 },
54 BuildReplica {
55 replica: ReplicaInfo,
56 reply: oneshot::Sender<Result<()>>,
57 },
58 RemoveReplica {
59 replica_id: ReplicaId,
60 reply: oneshot::Sender<Result<()>>,
61 },
62}
63
64// ---------------------------------------------------------------------------
65// Replicator data events (user → replicator actor, high-throughput path)
66// ---------------------------------------------------------------------------
67
68/// Request to replicate data to quorum. Sent by StateReplicatorHandle.
69pub struct ReplicateRequest {
70 pub data: Bytes,
71 pub reply: oneshot::Sender<Result<Lsn>>,
72}
73
74// ---------------------------------------------------------------------------
75// Lifecycle events (runtime → user, SF's IStatefulServiceReplica)
76// ---------------------------------------------------------------------------
77
78/// Lifecycle events delivered on the lifecycle channel.
79/// Rare but high-priority — handle immediately (blocks reconfiguration).
80pub enum LifecycleEvent {
81 /// Replica opened. Create replicator, return handle to runtime.
82 ///
83 /// The user creates the replicator (e.g., `WalReplicator::create()`),
84 /// keeps the user-facing handles, and returns the `ReplicatorHandle`
85 /// to the runtime.
86 Open {
87 ctx: OpenContext,
88 reply: oneshot::Sender<Result<ReplicatorHandle>>,
89 },
90
91 /// Role changed. Start/stop background work accordingly.
92 ///
93 /// Reply with the new listening address (e.g., "http://0.0.0.0:8080")
94 /// or empty string if not listening in this role.
95 ChangeRole {
96 new_role: Role,
97 reply: oneshot::Sender<Result<String>>,
98 },
99
100 /// Graceful shutdown. Drain in-flight work, flush state, release
101 /// resources. The runtime calls the replicator's Close after this returns.
102 Close { reply: oneshot::Sender<Result<()>> },
103
104 /// Ungraceful termination. Release resources best-effort and return
105 /// immediately. No reply channel — the runtime doesn't wait.
106 Abort,
107}
108
109// ---------------------------------------------------------------------------
110// State provider events (runtime/replicator → user, SF's IStateProvider)
111// ---------------------------------------------------------------------------
112
113/// State provider callbacks delivered on the state_provider channel.
114/// Role-specific.
115pub enum StateProviderEvent {
116 /// Epoch changed (secondaries only).
117 /// Primary gets epoch via ChangeRole, not UpdateEpoch.
118 /// Operations with LSN above `previous_epoch_last_lsn` from the old
119 /// epoch may be stale (uncommitted zombie primary writes).
120 UpdateEpoch {
121 epoch: Epoch,
122 previous_epoch_last_lsn: Lsn,
123 reply: oneshot::Sender<Result<()>>,
124 },
125
126 /// "What's your last applied LSN?" (secondary, during build/catchup)
127 GetLastCommittedLsn { reply: oneshot::Sender<Result<Lsn>> },
128
129 /// "What state do you already have?" (new idle secondary, during build)
130 GetCopyContext {
131 reply: oneshot::Sender<Result<OperationStream>>,
132 },
133
134 /// "Produce state for this secondary" (primary, during build_replica)
135 GetCopyState {
136 up_to_lsn: Lsn,
137 copy_context: OperationStream,
138 reply: oneshot::Sender<Result<OperationStream>>,
139 },
140
141 /// "Quorum was lost, data loss possible" (new primary after quorum loss)
142 OnDataLoss {
143 reply: oneshot::Sender<Result<bool>>,
144 },
145}
146
147// ---------------------------------------------------------------------------
148// ServiceContext — user-facing handles, defined in replicator::ServiceContext
149// ---------------------------------------------------------------------------
150
151// Users receive OpenContext in the Open event, create a replicator,
152// and keep ServiceContext (partition, replicator, streams, state_provider_rx).
153// The token comes from OpenContext.token.
154
155// ---------------------------------------------------------------------------
156// Channel bundle — REMOVED (internal to WalReplicator)
157// ---------------------------------------------------------------------------
158
159// ReplicatorChannels is no longer needed in the public API.
160// Channel creation is internal to WalReplicator::create().