Skip to main content

kuberic_core/replicator/
mod.rs

1pub mod actor;
2pub(crate) mod copy;
3pub mod primary;
4pub mod queue;
5pub mod quorum;
6pub mod secondary;
7
8use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::sync::{mpsc, oneshot};
12
13use crate::error::{KubericError, Result};
14use crate::events::{ReplicatorControlEvent, StateProviderEvent};
15use crate::handles::{PartitionHandle, PartitionState, StateReplicatorHandle};
16use crate::types::{CancellationToken, FaultType, OpenMode, OperationStream, ReplicaId};
17
18// ---------------------------------------------------------------------------
19// ReplicatorHandle — returned by user to runtime at Open
20// ---------------------------------------------------------------------------
21
/// Handle returned by the user to the runtime at Open.
///
/// The runtime uses this to drive the replicator's lifecycle by sending
/// `ReplicatorControlEvent`s to the replicator's event loop (see
/// `send_control`). It also holds the shared `PartitionState`, so access-status
/// fencing can be done synchronously, without a channel hop.
pub struct ReplicatorHandle {
    /// Send lifecycle/config commands to the replicator's event loop.
    /// This is a bounded tokio `mpsc::Sender`, so sends may wait for capacity.
    control_tx: mpsc::Sender<ReplicatorControlEvent>,
    /// Shared partition state (atomics). Runtime uses this for:
    /// - set_status_for_role() — access status fencing (synchronous)
    /// - GetStatus — read current_progress, committed_lsn
    state: Arc<PartitionState>,
    /// Data plane address (for operator registration). When built by
    /// `WalReplicator::create` this has the form `http://<local_addr>`.
    data_address: String,
    /// Shutdown token for immediate abort; cancelled by `abort()`.
    /// `WalReplicator::create` runs the data-plane server on a child of this
    /// token.
    shutdown: CancellationToken,
}
37
38impl ReplicatorHandle {
39    pub fn new(
40        control_tx: mpsc::Sender<ReplicatorControlEvent>,
41        state: Arc<PartitionState>,
42        data_address: String,
43        shutdown: CancellationToken,
44    ) -> Self {
45        Self {
46            control_tx,
47            state,
48            data_address,
49            shutdown,
50        }
51    }
52
53    /// Send a control event and wait for reply.
54    pub async fn send_control<T>(
55        &self,
56        make: impl FnOnce(oneshot::Sender<Result<T>>) -> ReplicatorControlEvent,
57        timeout: Duration,
58    ) -> Result<T> {
59        let (tx, rx) = oneshot::channel();
60        self.control_tx
61            .send(make(tx))
62            .await
63            .map_err(|_| KubericError::Closed)?;
64        match tokio::time::timeout(timeout, rx).await {
65            Ok(Ok(result)) => result,
66            Ok(Err(_)) => Err(KubericError::Closed),
67            Err(_) => Err(KubericError::Internal("replicator timeout".into())),
68        }
69    }
70
71    /// Synchronous access to partition state (atomics, no channel hop).
72    pub fn state(&self) -> &Arc<PartitionState> {
73        &self.state
74    }
75
76    /// Data plane address for operator registration.
77    pub fn data_address(&self) -> &str {
78        &self.data_address
79    }
80
81    /// Cancel the replicator's shutdown token for immediate abort.
82    pub fn abort(&self) {
83        self.shutdown.cancel();
84    }
85}
86
87// ---------------------------------------------------------------------------
88// ServiceContext — user-facing handles kept by the user service
89// ---------------------------------------------------------------------------
90
/// User-facing handles produced by the replicator factory.
///
/// The user service keeps these; the matching `ReplicatorHandle` goes to the
/// runtime.
pub struct ServiceContext {
    /// Write handle (primary path).
    pub replicator: StateReplicatorHandle,
    /// Read/write access status + fault reporting.
    pub partition: Arc<PartitionHandle>,
    /// Copy stream (secondary, during build). Documented as `None` on primary.
    ///
    /// NOTE(review): `WalReplicator::create` always fills this with `Some(..)`,
    /// so `None` cannot currently be used to detect the primary role. Confirm
    /// whether this doc or the factory reflects the intended behavior.
    pub copy_stream: Option<OperationStream>,
    /// Replication stream (secondary, during catchup). Documented as `None` on
    /// primary.
    ///
    /// NOTE(review): same situation as `copy_stream`: `WalReplicator::create`
    /// always sets `Some(..)`.
    pub replication_stream: Option<OperationStream>,
}
103
104// ---------------------------------------------------------------------------
105// OpenContext — provided to user at Open to create a replicator
106// ---------------------------------------------------------------------------
107
/// Context provided to the user at Open time.
///
/// Contains what the user needs to create a replicator: replica ID, open mode,
/// data-plane bind address, lifetime token and fault channel.
pub struct OpenContext {
    /// ID of the replica being opened.
    pub replica_id: ReplicaId,
    /// New vs Existing — the replicator needs this for initialization.
    pub open_mode: OpenMode,
    /// Address for the data plane gRPC server to bind.
    ///
    /// NOTE(review): presumably passed as `data_bind` to
    /// `WalReplicator::create`, which hands it to `TcpListener::bind`. Confirm
    /// at the call site.
    pub data_bind: String,
    /// Cancellation token for the replica's lifetime.
    pub token: CancellationToken,
    /// Fault reporting channel (runtime holds the receiver).
    pub fault_tx: mpsc::Sender<FaultType>,
}
121
122// ---------------------------------------------------------------------------
123// WalReplicator — factory for the WAL-based quorum replicator
124// ---------------------------------------------------------------------------
125
126use tonic::transport::Server;
127
128use crate::events::ReplicateRequest;
129use crate::proto::replicator_data_server::ReplicatorDataServer;
130use crate::replicator::actor::WalReplicatorActor;
131use crate::replicator::secondary::{SecondaryReceiver, SecondaryState};
132
/// WAL-based quorum replicator factory.
///
/// A stateless unit type: `WalReplicator::create` builds the actor, data plane
/// and streams, and returns the runtime and user handles.
#[derive(Debug, Clone, Copy, Default)]
pub struct WalReplicator;
136
137impl WalReplicator {
138    /// Create a new WalReplicator. Starts:
139    /// - WalReplicatorActor (event loop processing control_rx + data_rx)
140    /// - Data plane gRPC server (SecondaryReceiver)
141    /// - Replication + copy streams
142    ///
143    /// Returns (runtime_handle, user_handles):
144    /// - runtime_handle: for runtime to send lifecycle events
145    /// - user_handles: for user to read/write replicated data
146    pub async fn create(
147        replica_id: ReplicaId,
148        data_bind: &str,
149        fault_tx: mpsc::Sender<FaultType>,
150        state_provider_tx: mpsc::UnboundedSender<StateProviderEvent>,
151    ) -> Result<(ReplicatorHandle, ServiceContext)> {
152        let (control_tx, control_rx) = mpsc::channel(16);
153        let (data_tx, data_rx) = mpsc::channel::<ReplicateRequest>(256);
154        let state = Arc::new(PartitionState::new());
155        let secondary_state = Arc::new(SecondaryState::new());
156        let shutdown = CancellationToken::new();
157
158        // Replication + copy streams
159        let (repl_op_tx, repl_op_rx) = mpsc::channel(256);
160        let replication_stream = OperationStream::new(repl_op_rx);
161        let (copy_op_tx, copy_op_rx) = mpsc::channel(256);
162        let copy_stream = OperationStream::new(copy_op_rx);
163
164        // Data plane gRPC server
165        let data_receiver = SecondaryReceiver::with_streams(
166            secondary_state,
167            state.clone(),
168            repl_op_tx,
169            copy_op_tx,
170            state_provider_tx.clone(),
171        );
172        let data_listener = tokio::net::TcpListener::bind(data_bind)
173            .await
174            .map_err(|e| KubericError::Internal(Box::new(e)))?;
175        let data_addr = data_listener.local_addr().unwrap();
176        let data_address = format!("http://{}", data_addr);
177
178        let data_shutdown = shutdown.child_token();
179        tokio::spawn(async move {
180            let _ = Server::builder()
181                .add_service(ReplicatorDataServer::new(data_receiver))
182                .serve_with_incoming_shutdown(
183                    tokio_stream::wrappers::TcpListenerStream::new(data_listener),
184                    data_shutdown.cancelled(),
185                )
186                .await;
187        });
188
189        // Replicator actor — receives state_provider_tx for forwarding
190        let actor = WalReplicatorActor::new(replica_id);
191        let state_cp = state.clone();
192        let sp_tx = state_provider_tx.clone();
193        tokio::spawn(async move {
194            actor.run(control_rx, data_rx, state_cp, sp_tx).await;
195        });
196
197        // Build handles
198        let partition = Arc::new(PartitionHandle::new(state.clone(), fault_tx));
199        let replicator_write = StateReplicatorHandle::new(data_tx, state.clone());
200
201        let runtime_handle =
202            ReplicatorHandle::new(control_tx, state.clone(), data_address, shutdown);
203
204        let user_handles = ServiceContext {
205            replicator: replicator_write,
206            partition,
207            copy_stream: Some(copy_stream),
208            replication_stream: Some(replication_stream),
209        };
210
211        Ok((runtime_handle, user_handles))
212    }
213}