use std::sync::Arc;
use tokio::sync::mpsc;
use crate::events::{ReplicateRequest, ReplicatorControlEvent};
use crate::handles::PartitionState;
use crate::types::{Lsn, Role};
pub struct NoopReplicator;
impl NoopReplicator {
pub async fn run(
mut control_rx: mpsc::Receiver<ReplicatorControlEvent>,
mut data_rx: mpsc::Receiver<ReplicateRequest>,
state: Arc<PartitionState>,
) {
let mut role = Role::None;
let mut next_lsn: Lsn = 1;
loop {
tokio::select! {
biased;
event = control_rx.recv() => {
let Some(event) = event else { break };
match event {
ReplicatorControlEvent::Open { reply, .. } => {
let _ = reply.send(Ok(()));
}
ReplicatorControlEvent::Close { reply } => {
let _ = reply.send(Ok(()));
break;
}
ReplicatorControlEvent::Abort => {
break;
}
ReplicatorControlEvent::ChangeRole {
role: new_role,
reply,
..
} => {
role = new_role;
let _ = reply.send(Ok(()));
}
ReplicatorControlEvent::UpdateEpoch { reply, .. } => {
let _ = reply.send(Ok(()));
}
ReplicatorControlEvent::OnDataLoss { reply } => {
let _ = reply.send(Ok(crate::types::DataLossAction::None));
}
ReplicatorControlEvent::UpdateCatchUpConfiguration { reply, .. } => {
let _ = reply.send(Ok(()));
}
ReplicatorControlEvent::UpdateCurrentConfiguration { reply, .. } => {
let _ = reply.send(Ok(()));
}
ReplicatorControlEvent::WaitForCatchUpQuorum { reply, .. } => {
let _ = reply.send(Ok(()));
}
ReplicatorControlEvent::BuildReplica { reply, .. } => {
let _ = reply.send(Ok(()));
}
ReplicatorControlEvent::RemoveReplica { reply, .. } => {
let _ = reply.send(Ok(()));
}
}
}
req = data_rx.recv(), if role == Role::Primary => {
let Some(req) = req else { break };
let lsn = next_lsn;
next_lsn += 1;
state.set_current_progress(lsn);
state.set_committed_lsn(lsn);
let _ = req.reply.send(Ok(lsn));
}
else => break,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::events::ReplicateRequest;
use crate::types::{AccessStatus, CancellationToken, Epoch, OpenMode};
fn runtime_set_status_for_role(state: &PartitionState, role: Role) {
match role {
Role::Primary => {
state.set_read_status(AccessStatus::Granted);
state.set_write_status(AccessStatus::Granted);
}
_ => {
state.set_read_status(AccessStatus::NotPrimary);
state.set_write_status(AccessStatus::NotPrimary);
}
}
}
#[tokio::test]
async fn test_noop_lifecycle() {
let (control_tx, control_rx) = tokio::sync::mpsc::channel(16);
let (data_tx, data_rx) = tokio::sync::mpsc::channel::<ReplicateRequest>(16);
let state = Arc::new(PartitionState::new());
let state_cp = state.clone();
let handle = tokio::spawn(async move {
NoopReplicator::run(control_rx, data_rx, state_cp).await;
});
let (tx, rx) = tokio::sync::oneshot::channel();
control_tx
.send(ReplicatorControlEvent::Open {
mode: OpenMode::New,
reply: tx,
})
.await
.unwrap();
rx.await.unwrap().unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
control_tx
.send(ReplicatorControlEvent::ChangeRole {
epoch: Epoch::new(0, 1),
role: Role::Primary,
reply: tx,
})
.await
.unwrap();
rx.await.unwrap().unwrap();
runtime_set_status_for_role(&state, Role::Primary);
assert_eq!(state.read_status(), AccessStatus::Granted);
assert_eq!(state.write_status(), AccessStatus::Granted);
let (tx, rx) = tokio::sync::oneshot::channel();
data_tx
.send(ReplicateRequest {
data: bytes::Bytes::from("hello"),
reply: tx,
})
.await
.unwrap();
let lsn = rx.await.unwrap().unwrap();
assert_eq!(lsn, 1);
assert_eq!(state.current_progress(), 1);
let (tx, rx) = tokio::sync::oneshot::channel();
data_tx
.send(ReplicateRequest {
data: bytes::Bytes::from("world"),
reply: tx,
})
.await
.unwrap();
let lsn = rx.await.unwrap().unwrap();
assert_eq!(lsn, 2);
let (tx, rx) = tokio::sync::oneshot::channel();
control_tx
.send(ReplicatorControlEvent::ChangeRole {
epoch: Epoch::new(0, 2),
role: Role::ActiveSecondary,
reply: tx,
})
.await
.unwrap();
rx.await.unwrap().unwrap();
runtime_set_status_for_role(&state, Role::ActiveSecondary);
assert_eq!(state.read_status(), AccessStatus::NotPrimary);
assert_eq!(state.write_status(), AccessStatus::NotPrimary);
let (tx, rx) = tokio::sync::oneshot::channel();
control_tx
.send(ReplicatorControlEvent::Close { reply: tx })
.await
.unwrap();
rx.await.unwrap().unwrap();
handle.await.unwrap();
}
#[tokio::test]
async fn test_noop_replicate_handle() {
use crate::handles::StateReplicatorHandle;
let (control_tx, control_rx) = tokio::sync::mpsc::channel(16);
let (data_tx, data_rx) = tokio::sync::mpsc::channel::<ReplicateRequest>(16);
let state = Arc::new(PartitionState::new());
let replicator_handle = StateReplicatorHandle::new(data_tx.clone(), state.clone());
let state_cp = state.clone();
let handle = tokio::spawn(async move {
NoopReplicator::run(control_rx, data_rx, state_cp).await;
});
let (tx, rx) = tokio::sync::oneshot::channel();
control_tx
.send(ReplicatorControlEvent::Open {
mode: OpenMode::New,
reply: tx,
})
.await
.unwrap();
rx.await.unwrap().unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
control_tx
.send(ReplicatorControlEvent::ChangeRole {
epoch: Epoch::new(0, 1),
role: Role::Primary,
reply: tx,
})
.await
.unwrap();
rx.await.unwrap().unwrap();
runtime_set_status_for_role(&state, Role::Primary);
let token = CancellationToken::new();
let lsn = replicator_handle
.replicate(bytes::Bytes::from("test"), token)
.await
.unwrap();
assert_eq!(lsn, 1);
let (tx, rx) = tokio::sync::oneshot::channel();
control_tx
.send(ReplicatorControlEvent::Close { reply: tx })
.await
.unwrap();
rx.await.unwrap().unwrap();
handle.await.unwrap();
}
#[tokio::test]
async fn test_noop_replicate_not_primary() {
use crate::handles::StateReplicatorHandle;
let (_control_tx, control_rx) = tokio::sync::mpsc::channel(16);
let (data_tx, data_rx) = tokio::sync::mpsc::channel::<ReplicateRequest>(16);
let state = Arc::new(PartitionState::new());
let replicator_handle = StateReplicatorHandle::new(data_tx.clone(), state.clone());
let state_cp = state.clone();
let _handle = tokio::spawn(async move {
NoopReplicator::run(control_rx, data_rx, state_cp).await;
});
let token = CancellationToken::new();
let result = replicator_handle
.replicate(bytes::Bytes::from("test"), token)
.await;
assert!(matches!(result, Err(crate::KubericError::NotPrimary)));
}
}