kuberic_core/replicator/
mod.rs1pub mod actor;
2pub(crate) mod copy;
3pub mod primary;
4pub mod queue;
5pub mod quorum;
6pub mod secondary;
7
8use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::sync::{mpsc, oneshot};
12
13use crate::error::{KubericError, Result};
14use crate::events::{ReplicatorControlEvent, StateProviderEvent};
15use crate::handles::{PartitionHandle, PartitionState, StateReplicatorHandle};
16use crate::types::{CancellationToken, FaultType, OpenMode, OperationStream, ReplicaId};
17
18pub struct ReplicatorHandle {
26 control_tx: mpsc::Sender<ReplicatorControlEvent>,
28 state: Arc<PartitionState>,
32 data_address: String,
34 shutdown: CancellationToken,
36}
37
38impl ReplicatorHandle {
39 pub fn new(
40 control_tx: mpsc::Sender<ReplicatorControlEvent>,
41 state: Arc<PartitionState>,
42 data_address: String,
43 shutdown: CancellationToken,
44 ) -> Self {
45 Self {
46 control_tx,
47 state,
48 data_address,
49 shutdown,
50 }
51 }
52
53 pub async fn send_control<T>(
55 &self,
56 make: impl FnOnce(oneshot::Sender<Result<T>>) -> ReplicatorControlEvent,
57 timeout: Duration,
58 ) -> Result<T> {
59 let (tx, rx) = oneshot::channel();
60 self.control_tx
61 .send(make(tx))
62 .await
63 .map_err(|_| KubericError::Closed)?;
64 match tokio::time::timeout(timeout, rx).await {
65 Ok(Ok(result)) => result,
66 Ok(Err(_)) => Err(KubericError::Closed),
67 Err(_) => Err(KubericError::Internal("replicator timeout".into())),
68 }
69 }
70
71 pub fn state(&self) -> &Arc<PartitionState> {
73 &self.state
74 }
75
76 pub fn data_address(&self) -> &str {
78 &self.data_address
79 }
80
81 pub fn abort(&self) {
83 self.shutdown.cancel();
84 }
85}
86
87pub struct ServiceContext {
94 pub replicator: StateReplicatorHandle,
96 pub partition: Arc<PartitionHandle>,
98 pub copy_stream: Option<OperationStream>,
100 pub replication_stream: Option<OperationStream>,
102}
103
104pub struct OpenContext {
111 pub replica_id: ReplicaId,
112 pub open_mode: OpenMode,
114 pub data_bind: String,
116 pub token: CancellationToken,
118 pub fault_tx: mpsc::Sender<FaultType>,
120}
121
122use tonic::transport::Server;
127
128use crate::events::ReplicateRequest;
129use crate::proto::replicator_data_server::ReplicatorDataServer;
130use crate::replicator::actor::WalReplicatorActor;
131use crate::replicator::secondary::{SecondaryReceiver, SecondaryState};
132
133pub struct WalReplicator;
136
137impl WalReplicator {
138 pub async fn create(
147 replica_id: ReplicaId,
148 data_bind: &str,
149 fault_tx: mpsc::Sender<FaultType>,
150 state_provider_tx: mpsc::UnboundedSender<StateProviderEvent>,
151 ) -> Result<(ReplicatorHandle, ServiceContext)> {
152 let (control_tx, control_rx) = mpsc::channel(16);
153 let (data_tx, data_rx) = mpsc::channel::<ReplicateRequest>(256);
154 let state = Arc::new(PartitionState::new());
155 let secondary_state = Arc::new(SecondaryState::new());
156 let shutdown = CancellationToken::new();
157
158 let (repl_op_tx, repl_op_rx) = mpsc::channel(256);
160 let replication_stream = OperationStream::new(repl_op_rx);
161 let (copy_op_tx, copy_op_rx) = mpsc::channel(256);
162 let copy_stream = OperationStream::new(copy_op_rx);
163
164 let data_receiver = SecondaryReceiver::with_streams(
166 secondary_state,
167 state.clone(),
168 repl_op_tx,
169 copy_op_tx,
170 state_provider_tx.clone(),
171 );
172 let data_listener = tokio::net::TcpListener::bind(data_bind)
173 .await
174 .map_err(|e| KubericError::Internal(Box::new(e)))?;
175 let data_addr = data_listener.local_addr().unwrap();
176 let data_address = format!("http://{}", data_addr);
177
178 let data_shutdown = shutdown.child_token();
179 tokio::spawn(async move {
180 let _ = Server::builder()
181 .add_service(ReplicatorDataServer::new(data_receiver))
182 .serve_with_incoming_shutdown(
183 tokio_stream::wrappers::TcpListenerStream::new(data_listener),
184 data_shutdown.cancelled(),
185 )
186 .await;
187 });
188
189 let actor = WalReplicatorActor::new(replica_id);
191 let state_cp = state.clone();
192 let sp_tx = state_provider_tx.clone();
193 tokio::spawn(async move {
194 actor.run(control_rx, data_rx, state_cp, sp_tx).await;
195 });
196
197 let partition = Arc::new(PartitionHandle::new(state.clone(), fault_tx));
199 let replicator_write = StateReplicatorHandle::new(data_tx, state.clone());
200
201 let runtime_handle =
202 ReplicatorHandle::new(control_tx, state.clone(), data_address, shutdown);
203
204 let user_handles = ServiceContext {
205 replicator: replicator_write,
206 partition,
207 copy_stream: Some(copy_stream),
208 replication_stream: Some(replication_stream),
209 };
210
211 Ok((runtime_handle, user_handles))
212 }
213}