use super::*;
impl CommandLog {
/// Registers `completion` under `index`.
///
/// The completion is stored in the map that corresponds to its variant:
/// `Completion::User` goes into `user_completions`, `Completion::Kern`
/// goes into `kern_completions`. The respective map is locked only for the
/// duration of the insertion.
pub fn register_completion(&self, index: Index, completion: Completion) {
    match completion {
        Completion::User(user) => {
            let mut pending = self.user_completions.lock();
            pending.insert(index, user);
        }
        Completion::Kern(kern) => {
            let mut pending = self.kern_completions.lock();
            pending.insert(index, kern);
        }
    }
}
/// Moves the snapshot forward when the application reports a newer one.
///
/// The current snapshot pointer is compared with the index returned by
/// `app.get_latest_snapshot()`. If the reported index is not strictly
/// greater, nothing happens. Otherwise the entry stored at that index is
/// rebuilt with a serialized `Command::Snapshot` that carries the membership
/// located through `find_last_membership_index` and `try_read_membership`,
/// and the rebuilt entry is handed to `insert_snapshot`.
///
/// # Errors
///
/// Propagates any error from the app, from reading the log, or from
/// `insert_snapshot`. Returns `Error::BadLogState` (as context) when no
/// membership index or no membership can be found for the proposed index.
pub async fn advance_snapshot_index(&self) -> Result<()> {
    let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst);
    let proposed_snapshot_index = self.app.get_latest_snapshot().await?;

    // Nothing to do unless the app is strictly ahead of our pointer.
    if proposed_snapshot_index <= cur_snapshot_index {
        return Ok(());
    }
    info!("found a newer proposed snapshot@{proposed_snapshot_index}. will move the snapshot index.");

    // Resolve the membership that the new snapshot entry has to carry.
    let membership_index = self
        .find_last_membership_index(proposed_snapshot_index)
        .await?
        .context(Error::BadLogState)?;
    let membership = self
        .try_read_membership(membership_index)
        .await?
        .context(Error::BadLogState)?;

    // Keep every field of the existing entry except the command.
    let base_entry = self.get_entry(proposed_snapshot_index).await?;
    let snapshot_command = Command::serialize(Command::Snapshot { membership });
    let snapshot_entry = Entry {
        command: snapshot_command,
        ..base_entry
    };

    self.insert_snapshot(snapshot_entry).await?;
    Ok(())
}
/// Processes the log entry right after the user pointer and then advances
/// the user pointer by one.
///
/// Only `ExecuteRequest`, `CompleteRequest` and `Snapshot` commands are acted
/// upon; every other command just moves the pointer.
///
/// * `Snapshot` — asks `app` to install the snapshot at this index.
/// * `ExecuteRequest` — runs `app.process_write` when the response cache says
///   the request should execute, caches the response, and, if a user
///   completion is registered for this index, completes it with the cached
///   response. On successful completion a `CompleteRequest` entry is
///   appended (best effort: an append failure is ignored).
/// * `CompleteRequest` — tells the response cache the request is complete.
///
/// # Errors
///
/// Fails when the user pointer is not behind the kern pointer, or when
/// reading the entry, installing the snapshot, or executing the write fails.
/// In all of these cases the user pointer is left unchanged.
pub(crate) async fn advance_user_process(&self, app: App) -> Result<()> {
    let cur_user_index = self.user_pointer.load(Ordering::SeqCst);
    ensure!(cur_user_index < self.kern_pointer.load(Ordering::SeqCst));
    let process_index = cur_user_index + 1;
    let e = self.get_entry(process_index).await?;
    let command = Command::deserialize(&e.command);
    let do_process = matches!(
        command,
        Command::ExecuteRequest { .. }
            | Command::CompleteRequest { .. }
            | Command::Snapshot { .. }
    );
    if do_process {
        // NOTE(review): this guard stays alive across the awaits below. That is
        // kept as-is because the should_execute/insert_response pair relies on
        // it; confirm the mutex type in use tolerates being held across awaits.
        let mut response_cache = self.response_cache.lock();
        debug!("process user@{process_index}");
        match command {
            Command::Snapshot { .. } => {
                app.install_snapshot(process_index).await?;
            }
            Command::ExecuteRequest {
                message,
                request_id,
            } => {
                if response_cache.should_execute(&request_id) {
                    let resp = app.process_write(message, process_index).await?;
                    response_cache.insert_response(request_id.clone(), resp);
                }
                // Take the completion in its own statement so the
                // `user_completions` guard is dropped right here. Used directly
                // as an `if let` scrutinee, the guard would live for the whole
                // body, i.e. while completing the request and across the
                // `.await` on `append_new_entry`.
                let user_completion = self.user_completions.lock().remove(&process_index);
                if let Some(user_completion) = user_completion {
                    if let Some(resp) = response_cache.get_response(&request_id) {
                        if let Err(resp) = user_completion.complete_with(resp) {
                            // The completion handed the response back; store it
                            // again (presumably so a later retry can use it).
                            response_cache.insert_response(request_id.clone(), resp);
                        } else {
                            let command = Command::CompleteRequest { request_id };
                            // Best effort: a failed append is deliberately ignored.
                            self.append_new_entry(Command::serialize(command), None)
                                .await
                                .ok();
                        }
                    }
                }
            }
            Command::CompleteRequest { request_id } => {
                response_cache.complete_response(&request_id);
            }
            _ => {}
        }
    }
    self.user_pointer.store(process_index, Ordering::SeqCst);
    Ok(())
}
/// Processes the log entry right after the kern pointer and then advances
/// the kern pointer by one.
///
/// Only `Barrier` and `ClusterConfiguration` commands are acted upon here:
/// a `Barrier` passes its term to `voter.commit_safe_term`, and for either
/// command a kern completion registered under this index (if any) is
/// removed and completed. Every other command just moves the pointer.
///
/// # Errors
///
/// Fails when the kern pointer is not behind the commit pointer or when the
/// entry cannot be read; the kern pointer is left unchanged in both cases.
pub(crate) async fn advance_kern_process(&self, voter: Voter) -> Result<()> {
    let cur_kern_index = self.kern_pointer.load(Ordering::SeqCst);
    ensure!(cur_kern_index < self.commit_pointer.load(Ordering::SeqCst));
    let process_index = cur_kern_index + 1;
    let entry = self.get_entry(process_index).await?;
    let command = Command::deserialize(&entry.command);

    let handled_here = matches!(
        command,
        Command::Barrier { .. } | Command::ClusterConfiguration { .. }
    );
    if handled_here {
        debug!("process kern@{process_index}");
        // `ClusterConfiguration` has no command-specific action at this point.
        if let Command::Barrier(term) = command {
            voter.commit_safe_term(term);
        }
        if let Some(pending) = self.kern_completions.lock().remove(&process_index) {
            pending.complete();
        }
    }

    self.kern_pointer.store(process_index, Ordering::SeqCst);
    Ok(())
}
}