1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use super::*;
/// Effect that applies a single log entry to the application; see `exec` for the steps performed.
pub struct Effect {
/// State machine handle whose pointers, log entries, response cache and app are used by `exec`.
pub state_machine: StateMachine,
}
impl Effect {
    /// Applies the log entry that directly follows the current application pointer.
    ///
    /// Steps:
    /// 1. Check that the application pointer is strictly behind the kernel pointer.
    /// 2. Load the entry at `application_pointer + 1` and decode its command.
    /// 3. For `Snapshot`, `ExecuteRequest` and `CompleteRequest` commands, run the
    ///    corresponding application / response-cache action while holding the
    ///    response-cache lock. Any other command is skipped.
    /// 4. Raise the application pointer to at least the processed index.
    ///
    /// # Errors
    ///
    /// Returns an error if the pointer check fails, if the entry cannot be read,
    /// or if the application fails to apply the snapshot or process the write.
    /// A failure to append the follow-up `CompleteRequest` entry is ignored.
    pub async fn exec(self) -> Result<()> {
        let cur_app_index = self.state_machine.application_pointer.load(Ordering::SeqCst);
        ensure!(cur_app_index < self.state_machine.kernel_pointer.load(Ordering::SeqCst));

        let process_index = cur_app_index + 1;
        let entry = self.state_machine.get_entry(process_index).await?;
        let command = Command::deserialize(&entry.command);

        // Only these three command kinds have an effect on the application side.
        let touches_app = matches!(
            command,
            Command::ExecuteRequest { .. }
                | Command::CompleteRequest { .. }
                | Command::Snapshot { .. }
        );

        if touches_app {
            // The cache guard is taken before any processing and kept until this block ends.
            let mut response_cache = self.state_machine.response_cache.lock();
            debug!("process user@{process_index}");

            match command {
                Command::Snapshot { .. } => {
                    self.state_machine.app.apply_snapshot(process_index).await?;
                }
                Command::CompleteRequest { request_id } => {
                    response_cache.complete_response(&request_id);
                }
                Command::ExecuteRequest {
                    message,
                    request_id,
                } => {
                    // Run the write only when the cache says this request still has to execute.
                    if response_cache.should_execute(&request_id) {
                        let produced = self
                            .state_machine
                            .app
                            .process_write(message, process_index)
                            .await?;
                        response_cache.insert_response(request_id.clone(), produced);
                    }

                    // The leader may be holding a completion registered for this index.
                    // The lock on the completion table is released right after the removal.
                    let pending = self
                        .state_machine
                        .application_completions
                        .lock()
                        .remove(&process_index);

                    if let Some(completion) = pending {
                        if let Some(cached) = response_cache.get_response(&request_id) {
                            match app_completion_result(completion.complete_with(cached)) {
                                // If the client aborted the request before retrying, the
                                // completion channel is gone because the gRPC context was
                                // cancelled. Put the response back into the cache so that a
                                // later request can still pick it up.
                                Err(undelivered) => {
                                    response_cache
                                        .insert_response(request_id.clone(), undelivered);
                                }
                                // The response was delivered: queue a `CompleteRequest`
                                // command so that the completion is replicated to the
                                // followers. Without it the followers would never learn
                                // that the request finished and its context would never
                                // be terminated.
                                Ok(_) => {
                                    let completion_cmd = Command::CompleteRequest { request_id };
                                    let appender = state_machine::effect::append_entry::Effect {
                                        state_machine: self.state_machine.clone(),
                                    };
                                    appender
                                        .exec(Command::serialize(completion_cmd), None)
                                        .await
                                        .ok();
                                }
                            }
                        }
                    }
                }
                _ => {}
            }
        }

        // `fetch_max` never moves the pointer backwards.
        self.state_machine
            .application_pointer
            .fetch_max(process_index, Ordering::SeqCst);

        Ok(())
    }
}

/// Identity helper that pins the outcome of `complete_with` to a `Result`,
/// so the `match` in `Effect::exec` reads as a plain two-way branch.
#[inline]
fn app_completion_result<T, E>(outcome: std::result::Result<T, E>) -> std::result::Result<T, E> {
    outcome
}