Skip to main content

kuberic_core/
events.rs

1use bytes::Bytes;
2use tokio::sync::oneshot;
3
4use crate::error::Result;
5use crate::replicator::{OpenContext, ReplicatorHandle};
6use crate::types::{
7    DataLossAction, Epoch, Lsn, OpenMode, OperationStream, ReplicaId, ReplicaInfo,
8    ReplicaSetConfig, ReplicaSetQuorumMode, Role,
9};
10
11// ---------------------------------------------------------------------------
12// Replicator control events (system-internal, operator → replicator actor)
13// ---------------------------------------------------------------------------
14
15/// Control events delivered to the replicator actor by the runtime.
16/// Processed sequentially on the control channel.
17pub enum ReplicatorControlEvent {
18    Open {
19        mode: OpenMode,
20        reply: oneshot::Sender<Result<()>>,
21    },
22    Close {
23        reply: oneshot::Sender<Result<()>>,
24    },
25    Abort,
26
27    ChangeRole {
28        epoch: Epoch,
29        role: Role,
30        reply: oneshot::Sender<Result<()>>,
31    },
32    UpdateEpoch {
33        epoch: Epoch,
34        reply: oneshot::Sender<Result<()>>,
35    },
36
37    // Primary-only reconfiguration commands
38    OnDataLoss {
39        reply: oneshot::Sender<Result<DataLossAction>>,
40    },
41    UpdateCatchUpConfiguration {
42        current: ReplicaSetConfig,
43        previous: ReplicaSetConfig,
44        reply: oneshot::Sender<Result<()>>,
45    },
46    UpdateCurrentConfiguration {
47        current: ReplicaSetConfig,
48        reply: oneshot::Sender<Result<()>>,
49    },
50    WaitForCatchUpQuorum {
51        mode: ReplicaSetQuorumMode,
52        reply: oneshot::Sender<Result<()>>,
53    },
54    BuildReplica {
55        replica: ReplicaInfo,
56        reply: oneshot::Sender<Result<()>>,
57    },
58    RemoveReplica {
59        replica_id: ReplicaId,
60        reply: oneshot::Sender<Result<()>>,
61    },
62}
63
64// ---------------------------------------------------------------------------
65// Replicator data events (user → replicator actor, high-throughput path)
66// ---------------------------------------------------------------------------
67
68/// Request to replicate data to quorum. Sent by StateReplicatorHandle.
69pub struct ReplicateRequest {
70    pub data: Bytes,
71    pub reply: oneshot::Sender<Result<Lsn>>,
72}
73
74// ---------------------------------------------------------------------------
75// Lifecycle events (runtime → user, SF's IStatefulServiceReplica)
76// ---------------------------------------------------------------------------
77
78/// Lifecycle events delivered on the lifecycle channel.
79/// Rare but high-priority — handle immediately (blocks reconfiguration).
80pub enum LifecycleEvent {
81    /// Replica opened. Create replicator, return handle to runtime.
82    ///
83    /// The user creates the replicator (e.g., `WalReplicator::create()`),
84    /// keeps the user-facing handles, and returns the `ReplicatorHandle`
85    /// to the runtime.
86    Open {
87        ctx: OpenContext,
88        reply: oneshot::Sender<Result<ReplicatorHandle>>,
89    },
90
91    /// Role changed. Start/stop background work accordingly.
92    ///
93    /// Reply with the new listening address (e.g., "http://0.0.0.0:8080")
94    /// or empty string if not listening in this role.
95    ChangeRole {
96        new_role: Role,
97        reply: oneshot::Sender<Result<String>>,
98    },
99
100    /// Graceful shutdown. Drain in-flight work, flush state, release
101    /// resources. The runtime calls the replicator's Close after this returns.
102    Close { reply: oneshot::Sender<Result<()>> },
103
104    /// Ungraceful termination. Release resources best-effort and return
105    /// immediately. No reply channel — the runtime doesn't wait.
106    Abort,
107}
108
109// ---------------------------------------------------------------------------
110// State provider events (runtime/replicator → user, SF's IStateProvider)
111// ---------------------------------------------------------------------------
112
113/// State provider callbacks delivered on the state_provider channel.
114/// Role-specific.
115pub enum StateProviderEvent {
116    /// Epoch changed (secondaries only).
117    /// Primary gets epoch via ChangeRole, not UpdateEpoch.
118    /// Operations with LSN above `previous_epoch_last_lsn` from the old
119    /// epoch may be stale (uncommitted zombie primary writes).
120    UpdateEpoch {
121        epoch: Epoch,
122        previous_epoch_last_lsn: Lsn,
123        reply: oneshot::Sender<Result<()>>,
124    },
125
126    /// "What's your last applied LSN?" (secondary, during build/catchup)
127    GetLastCommittedLsn { reply: oneshot::Sender<Result<Lsn>> },
128
129    /// "What state do you already have?" (new idle secondary, during build)
130    GetCopyContext {
131        reply: oneshot::Sender<Result<OperationStream>>,
132    },
133
134    /// "Produce state for this secondary" (primary, during build_replica)
135    GetCopyState {
136        up_to_lsn: Lsn,
137        copy_context: OperationStream,
138        reply: oneshot::Sender<Result<OperationStream>>,
139    },
140
141    /// "Quorum was lost, data loss possible" (new primary after quorum loss)
142    OnDataLoss {
143        reply: oneshot::Sender<Result<bool>>,
144    },
145}
146
147// ---------------------------------------------------------------------------
148// ServiceContext — user-facing handles, defined in replicator::ServiceContext
149// ---------------------------------------------------------------------------
150
151// Users receive OpenContext in the Open event, create a replicator,
152// and keep ServiceContext (partition, replicator, streams, state_provider_rx).
153// The token comes from OpenContext.token.
154
155// ---------------------------------------------------------------------------
156// Channel bundle — REMOVED (internal to WalReplicator)
157// ---------------------------------------------------------------------------
158
159// ReplicatorChannels is no longer needed in the public API.
160// Channel creation is internal to WalReplicator::create().