pub mod actor;
pub(crate) mod copy;
pub mod primary;
pub mod queue;
pub mod quorum;
pub mod secondary;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use crate::error::{KubericError, Result};
use crate::events::{ReplicatorControlEvent, StateProviderEvent};
use crate::handles::{PartitionHandle, PartitionState, StateReplicatorHandle};
use crate::types::{CancellationToken, FaultType, OpenMode, OperationStream, ReplicaId};
/// Handle to a replicator, bundling a control-event sender, the shared
/// partition state, the data address, and a cancellation token.
pub struct ReplicatorHandle {
/// Channel on which control events are submitted by `send_control`.
control_tx: mpsc::Sender<ReplicatorControlEvent>,
/// Shared partition state, exposed through `state()`.
state: Arc<PartitionState>,
/// Address string exposed through `data_address()`.
data_address: String,
/// Token cancelled by `abort()`.
shutdown: CancellationToken,
}
impl ReplicatorHandle {
    /// Assembles a handle from its already-constructed parts.
    pub fn new(
        control_tx: mpsc::Sender<ReplicatorControlEvent>,
        state: Arc<PartitionState>,
        data_address: String,
        shutdown: CancellationToken,
    ) -> Self {
        Self {
            control_tx,
            state,
            data_address,
            shutdown,
        }
    }

    /// Submits a control event and waits for its reply, with the whole
    /// round trip bounded by `timeout`.
    ///
    /// `make` is handed the reply sender and returns the event to submit;
    /// the reply is whatever gets sent back through that sender.
    ///
    /// # Errors
    ///
    /// * `KubericError::Closed` if the control channel is closed, or the
    ///   reply sender is dropped without a reply being sent.
    /// * `KubericError::Internal("replicator timeout")` if `timeout` elapses
    ///   while enqueueing the event or while waiting for the reply.
    /// * Any error carried in the reply itself.
    pub async fn send_control<T>(
        &self,
        make: impl FnOnce(oneshot::Sender<Result<T>>) -> ReplicatorControlEvent,
        timeout: Duration,
    ) -> Result<T> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let started = tokio::time::Instant::now();

        // The control channel is bounded, so enqueueing can itself wait for
        // capacity. Bound that wait too; otherwise a full channel would block
        // the caller indefinitely despite the timeout argument.
        let enqueue = self.control_tx.send(make(reply_tx));
        match tokio::time::timeout(timeout, enqueue).await {
            Ok(Ok(())) => {}
            Ok(Err(_)) => return Err(KubericError::Closed),
            Err(_) => return Err(KubericError::Internal("replicator timeout".into())),
        }

        // Only the budget left after enqueueing is available for the reply.
        // `saturating_sub` clamps to zero instead of panicking on underflow.
        let remaining = timeout.saturating_sub(started.elapsed());
        match tokio::time::timeout(remaining, reply_rx).await {
            Ok(Ok(reply)) => reply,
            Ok(Err(_)) => Err(KubericError::Closed),
            Err(_) => Err(KubericError::Internal("replicator timeout".into())),
        }
    }

    /// Returns the shared partition state.
    pub fn state(&self) -> &Arc<PartitionState> {
        &self.state
    }

    /// Returns the data address held by this handle.
    pub fn data_address(&self) -> &str {
        &self.data_address
    }

    /// Cancels the shutdown token held by this handle.
    pub fn abort(&self) {
        self.shutdown.cancel();
    }
}
/// Set of handles and streams returned alongside the `ReplicatorHandle`
/// by `WalReplicator::create`.
pub struct ServiceContext {
/// Replicator handle built over the data-request sender and the shared partition state.
pub replicator: StateReplicatorHandle,
/// Partition handle built over the shared partition state and the fault sender.
pub partition: Arc<PartitionHandle>,
/// Stream of copy operations; `WalReplicator::create` always sets this to `Some`.
/// NOTE(review): presumably an `Option` so a consumer can take ownership of the stream — confirm.
pub copy_stream: Option<OperationStream>,
/// Stream of replication operations; `WalReplicator::create` always sets this to `Some`.
pub replication_stream: Option<OperationStream>,
}
/// Bundle of values describing a replica open request.
///
/// NOTE(review): this type is neither constructed nor consumed in the visible
/// part of the file; the field notes below are inferred from names and types
/// and should be confirmed against the callers.
pub struct OpenContext {
/// Identifier of the replica (presumably the one being opened).
pub replica_id: ReplicaId,
/// Requested open mode (see `OpenMode`).
pub open_mode: OpenMode,
/// Presumably the bind address for the data listener, mirroring the
/// `data_bind` argument of `WalReplicator::create` — confirm.
pub data_bind: String,
/// Cancellation token; presumably signals that the open should be abandoned — confirm.
pub token: CancellationToken,
/// Sender of `FaultType` values; presumably used to report faults — confirm.
pub fault_tx: mpsc::Sender<FaultType>,
}
use tonic::transport::Server;
use crate::events::ReplicateRequest;
use crate::proto::replicator_data_server::ReplicatorDataServer;
use crate::replicator::actor::WalReplicatorActor;
use crate::replicator::secondary::{SecondaryReceiver, SecondaryState};
/// Unit struct that carries no data; it serves as the namespace for the
/// `create` constructor function.
pub struct WalReplicator;
impl WalReplicator {
    /// Builds a replicator, starts its background tasks, and returns the
    /// handles used to talk to it.
    ///
    /// The function:
    /// 1. creates the control, data, replication-operation and copy-operation
    ///    channels plus the shared partition state;
    /// 2. binds a TCP listener on `data_bind` and spawns a tonic server on it
    ///    that serves `ReplicatorDataServer` backed by a `SecondaryReceiver`;
    /// 3. spawns the `WalReplicatorActor` loop fed by the control and data
    ///    receivers;
    /// 4. returns a `ReplicatorHandle` (control side) and a `ServiceContext`
    ///    (write handle, partition handle and the two operation streams).
    ///
    /// Must be called from within a Tokio runtime, since it uses `tokio::spawn`.
    ///
    /// # Errors
    ///
    /// Returns `KubericError::Internal` if binding `data_bind` fails or the
    /// bound listener's local address cannot be read.
    pub async fn create(
        replica_id: ReplicaId,
        data_bind: &str,
        fault_tx: mpsc::Sender<FaultType>,
        state_provider_tx: mpsc::UnboundedSender<StateProviderEvent>,
    ) -> Result<(ReplicatorHandle, ServiceContext)> {
        let (control_tx, control_rx) = mpsc::channel(16);
        let (data_tx, data_rx) = mpsc::channel::<ReplicateRequest>(256);
        let state = Arc::new(PartitionState::new());
        let secondary_state = Arc::new(SecondaryState::new());
        let shutdown = CancellationToken::new();

        // Operation streams: the receiving halves go to the caller, the
        // sending halves go to the data receiver.
        let (repl_op_tx, repl_op_rx) = mpsc::channel(256);
        let replication_stream = OperationStream::new(repl_op_rx);
        let (copy_op_tx, copy_op_rx) = mpsc::channel(256);
        let copy_stream = OperationStream::new(copy_op_rx);

        let data_receiver = SecondaryReceiver::with_streams(
            secondary_state,
            Arc::clone(&state),
            repl_op_tx,
            copy_op_tx,
            state_provider_tx.clone(),
        );

        let data_listener = tokio::net::TcpListener::bind(data_bind)
            .await
            .map_err(|e| KubericError::Internal(Box::new(e)))?;
        // Advertise the address actually bound, which may differ from
        // `data_bind` (e.g. when an ephemeral port was requested). Reading it
        // is fallible I/O, so propagate the error instead of panicking.
        let data_addr = data_listener
            .local_addr()
            .map_err(|e| KubericError::Internal(Box::new(e)))?;
        let data_address = format!("http://{}", data_addr);

        // The server runs until the child token is cancelled, which happens
        // when the parent `shutdown` token is cancelled.
        let data_shutdown = shutdown.child_token();
        tokio::spawn(async move {
            // NOTE(review): a server error is discarded here, so a failed data
            // server is not reported to anyone.
            let _ = Server::builder()
                .add_service(ReplicatorDataServer::new(data_receiver))
                .serve_with_incoming_shutdown(
                    tokio_stream::wrappers::TcpListenerStream::new(data_listener),
                    data_shutdown.cancelled(),
                )
                .await;
        });

        let actor = WalReplicatorActor::new(replica_id);
        let actor_state = Arc::clone(&state);
        // Last use of `state_provider_tx`: move it into the task rather than
        // cloning it again.
        tokio::spawn(async move {
            actor
                .run(control_rx, data_rx, actor_state, state_provider_tx)
                .await;
        });

        let partition = Arc::new(PartitionHandle::new(Arc::clone(&state), fault_tx));
        let replicator_write = StateReplicatorHandle::new(data_tx, Arc::clone(&state));
        // Last use of `state`: move it into the handle.
        let runtime_handle = ReplicatorHandle::new(control_tx, state, data_address, shutdown);
        let user_handles = ServiceContext {
            replicator: replicator_write,
            partition,
            copy_stream: Some(copy_stream),
            replication_stream: Some(replication_stream),
        };
        Ok((runtime_handle, user_handles))
    }
}