1use std::sync::Arc;
2
3use tokio::sync::mpsc;
4
5use crate::events::{ReplicateRequest, ReplicatorControlEvent};
6use crate::handles::PartitionState;
7use crate::types::{Lsn, Role};
8
9pub struct NoopReplicator;
16
17impl NoopReplicator {
18 pub async fn run(
21 mut control_rx: mpsc::Receiver<ReplicatorControlEvent>,
22 mut data_rx: mpsc::Receiver<ReplicateRequest>,
23 state: Arc<PartitionState>,
24 ) {
25 let mut role = Role::None;
26 let mut next_lsn: Lsn = 1;
27
28 loop {
29 tokio::select! {
30 biased;
31
32 event = control_rx.recv() => {
33 let Some(event) = event else { break };
34 match event {
35 ReplicatorControlEvent::Open { reply, .. } => {
36 let _ = reply.send(Ok(()));
37 }
38 ReplicatorControlEvent::Close { reply } => {
39 let _ = reply.send(Ok(()));
40 break;
41 }
42 ReplicatorControlEvent::Abort => {
43 break;
44 }
45 ReplicatorControlEvent::ChangeRole {
46 role: new_role,
47 reply,
48 ..
49 } => {
50 role = new_role;
51 let _ = reply.send(Ok(()));
52 }
53 ReplicatorControlEvent::UpdateEpoch { reply, .. } => {
54 let _ = reply.send(Ok(()));
55 }
56 ReplicatorControlEvent::OnDataLoss { reply } => {
57 let _ = reply.send(Ok(crate::types::DataLossAction::None));
58 }
59 ReplicatorControlEvent::UpdateCatchUpConfiguration { reply, .. } => {
60 let _ = reply.send(Ok(()));
61 }
62 ReplicatorControlEvent::UpdateCurrentConfiguration { reply, .. } => {
63 let _ = reply.send(Ok(()));
64 }
65 ReplicatorControlEvent::WaitForCatchUpQuorum { reply, .. } => {
66 let _ = reply.send(Ok(()));
67 }
68 ReplicatorControlEvent::BuildReplica { reply, .. } => {
69 let _ = reply.send(Ok(()));
70 }
71 ReplicatorControlEvent::RemoveReplica { reply, .. } => {
72 let _ = reply.send(Ok(()));
73 }
74 }
75 }
76
77 req = data_rx.recv(), if role == Role::Primary => {
78 let Some(req) = req else { break };
79 let lsn = next_lsn;
80 next_lsn += 1;
81 state.set_current_progress(lsn);
82 state.set_committed_lsn(lsn);
83 let _ = req.reply.send(Ok(lsn));
84 }
85
86 else => break,
87 }
88 }
89 }
90}
91
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::ReplicateRequest;
    use crate::types::{AccessStatus, CancellationToken, Epoch, OpenMode};
    use tokio::sync::{mpsc, oneshot};

    /// Everything a test needs to talk to a spawned [`NoopReplicator`].
    struct Harness {
        control_tx: mpsc::Sender<ReplicatorControlEvent>,
        data_tx: mpsc::Sender<ReplicateRequest>,
        state: Arc<PartitionState>,
        task: tokio::task::JoinHandle<()>,
    }

    /// Creates the control/data channels (capacity 16) and a fresh partition
    /// state, then spawns the replicator loop on the current runtime.
    fn spawn_replicator() -> Harness {
        let (control_tx, control_rx) = mpsc::channel(16);
        let (data_tx, data_rx) = mpsc::channel::<ReplicateRequest>(16);
        let state = Arc::new(PartitionState::new());
        let task = tokio::spawn(NoopReplicator::run(
            control_rx,
            data_rx,
            Arc::clone(&state),
        ));

        Harness {
            control_tx,
            data_tx,
            state,
            task,
        }
    }

    /// Sets the read/write access statuses the tests expect for `role`:
    /// granted for the primary, `NotPrimary` for every other role.
    fn runtime_set_status_for_role(state: &PartitionState, role: Role) {
        let (read, write) = match role {
            Role::Primary => (AccessStatus::Granted, AccessStatus::Granted),
            _ => (AccessStatus::NotPrimary, AccessStatus::NotPrimary),
        };
        state.set_read_status(read);
        state.set_write_status(write);
    }

    /// Sends `Open` (mode `New`) and waits for a successful acknowledgement.
    async fn open(control_tx: &mpsc::Sender<ReplicatorControlEvent>) {
        let (reply, done) = oneshot::channel();
        control_tx
            .send(ReplicatorControlEvent::Open {
                mode: OpenMode::New,
                reply,
            })
            .await
            .unwrap();
        done.await.unwrap().unwrap();
    }

    /// Sends `ChangeRole` and waits for a successful acknowledgement.
    async fn change_role(
        control_tx: &mpsc::Sender<ReplicatorControlEvent>,
        epoch: Epoch,
        role: Role,
    ) {
        let (reply, done) = oneshot::channel();
        control_tx
            .send(ReplicatorControlEvent::ChangeRole { epoch, role, reply })
            .await
            .unwrap();
        done.await.unwrap().unwrap();
    }

    /// Sends `Close` and waits for a successful acknowledgement.
    async fn close(control_tx: &mpsc::Sender<ReplicatorControlEvent>) {
        let (reply, done) = oneshot::channel();
        control_tx
            .send(ReplicatorControlEvent::Close { reply })
            .await
            .unwrap();
        done.await.unwrap().unwrap();
    }

    /// Pushes one payload straight onto the data channel and returns the LSN
    /// the replicator assigned to it.
    async fn replicate(data_tx: &mpsc::Sender<ReplicateRequest>, payload: &'static str) -> Lsn {
        let (reply, done) = oneshot::channel();
        data_tx
            .send(ReplicateRequest {
                data: bytes::Bytes::from(payload),
                reply,
            })
            .await
            .unwrap();
        done.await.unwrap().unwrap()
    }

    #[tokio::test]
    async fn test_noop_lifecycle() {
        let Harness {
            control_tx,
            data_tx,
            state,
            task,
        } = spawn_replicator();

        open(&control_tx).await;

        // Promote to primary; writes become possible.
        change_role(&control_tx, Epoch::new(0, 1), Role::Primary).await;
        runtime_set_status_for_role(&state, Role::Primary);
        assert_eq!(state.read_status(), AccessStatus::Granted);
        assert_eq!(state.write_status(), AccessStatus::Granted);

        // LSNs are handed out sequentially starting at 1.
        let lsn = replicate(&data_tx, "hello").await;
        assert_eq!(lsn, 1);
        assert_eq!(state.current_progress(), 1);

        let lsn = replicate(&data_tx, "world").await;
        assert_eq!(lsn, 2);

        // Demote to an active secondary.
        change_role(&control_tx, Epoch::new(0, 2), Role::ActiveSecondary).await;
        runtime_set_status_for_role(&state, Role::ActiveSecondary);
        assert_eq!(state.read_status(), AccessStatus::NotPrimary);
        assert_eq!(state.write_status(), AccessStatus::NotPrimary);

        // Closing must terminate the replicator task.
        close(&control_tx).await;
        task.await.unwrap();
    }

    #[tokio::test]
    async fn test_noop_replicate_handle() {
        use crate::handles::StateReplicatorHandle;

        let Harness {
            control_tx,
            data_tx,
            state,
            task,
        } = spawn_replicator();

        let replicator_handle = StateReplicatorHandle::new(data_tx.clone(), state.clone());

        open(&control_tx).await;
        change_role(&control_tx, Epoch::new(0, 1), Role::Primary).await;
        runtime_set_status_for_role(&state, Role::Primary);

        // Going through the public handle yields the first LSN.
        let token = CancellationToken::new();
        let lsn = replicator_handle
            .replicate(bytes::Bytes::from("test"), token)
            .await
            .unwrap();
        assert_eq!(lsn, 1);

        close(&control_tx).await;
        task.await.unwrap();
    }

    #[tokio::test]
    async fn test_noop_replicate_not_primary() {
        use crate::handles::StateReplicatorHandle;

        // The control sender and the task handle are kept alive (but unused)
        // for the duration of the test.
        let Harness {
            control_tx: _control_tx,
            data_tx,
            state,
            task: _task,
        } = spawn_replicator();

        let replicator_handle = StateReplicatorHandle::new(data_tx.clone(), state.clone());

        // No role change was issued, so the replicate call must be rejected.
        let token = CancellationToken::new();
        let result = replicator_handle
            .replicate(bytes::Bytes::from("test"), token)
            .await;

        assert!(matches!(result, Err(crate::KubericError::NotPrimary)));
    }
}