use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use parking_lot::{Condvar, Mutex};
use crate::error::StoreError;
use crate::types::{BlockId, BlockUndo, Operation};
/// Per-batch counters recorded alongside a start timestamp.
///
/// `ApplyMetricsContext::from_ops` fills the counters by classifying every
/// operation with `op.value.is_delete()`; as built there,
/// `set_count + zero_delete_count == ops_count`.
#[derive(Debug, Clone, Copy)]
pub struct ApplyMetricsContext {
/// Timestamp supplied by the caller (presumably when the apply started —
/// this file only stores it).
pub started_at: Instant,
/// Total number of operations in the batch.
pub ops_count: usize,
/// Number of operations whose value is not a delete.
pub set_count: usize,
/// Number of operations whose value reports `is_delete()`.
// NOTE(review): the "zero" prefix suggests deletes are encoded as zero
// values; confirm against the `Operation` value type.
pub zero_delete_count: usize,
}
impl ApplyMetricsContext {
    /// Builds the metrics context for one batch of operations.
    ///
    /// Each operation is classified exactly once via `op.value.is_delete()`:
    /// deletes are tallied in `zero_delete_count`, everything else in
    /// `set_count`. `ops_count` is the slice length and `started_at` is stored
    /// as given.
    pub fn from_ops(started_at: Instant, ops: &[Operation]) -> Self {
        let ops_count = ops.len();
        let zero_delete_count = ops.iter().filter(|op| op.value.is_delete()).count();
        Self {
            started_at,
            ops_count,
            // Every operation is either a delete or a set, so the remainder
            // of the batch is the set count (cannot underflow).
            set_count: ops_count - zero_delete_count,
            zero_delete_count,
        }
    }
}
/// Lifecycle state of a `PersistenceTask`, stored behind the task's status
/// mutex and observed by `PersistenceTask::wait_completion`.
#[derive(Debug)]
pub enum TaskStatus {
/// Initial state assigned by `PersistenceTask::new`; waiters keep blocking.
Pending,
/// Waiters keep blocking in this state as well. Presumably set by the
/// persistence worker via `set_status` once it picks the task up — the
/// writer is not visible in this file.
Persisting,
/// Set by `PersistenceTask::mark_cancelled`; `wait_completion` returns
/// `Ok(())` when it observes this state.
Cancelled,
/// Carries the final outcome; `wait_completion` returns a clone of it.
/// The error is held in an `Arc` so the result can be cloned per waiter.
Completed(Result<(), Arc<StoreError>>),
}
/// A unit of persistence work for one block, together with the shared state
/// used to cancel it and to wait for its outcome.
///
/// Instances are created through `PersistenceTask::new`, which returns the
/// task wrapped in an `Arc`.
pub struct PersistenceTask {
/// Identifier of the block this task belongs to.
pub block_height: BlockId,
/// Operations carried by this task.
pub operations: Vec<Operation>,
/// Undo record for the block. Starts as `Some`; `release_undo` takes it
/// out, after which `clone_undo` panics.
undo: Mutex<Option<Arc<BlockUndo>>>,
/// Metrics context supplied at construction.
pub metrics: ApplyMetricsContext,
/// Cancellation flag: set by `mark_cancelled`, read by `is_cancelled`.
cancelled: AtomicBool,
/// Current lifecycle state; every write is followed by a notification on
/// `status_cv`.
status: Mutex<TaskStatus>,
/// Condition variable that `wait_completion` blocks on while the status is
/// `Pending` or `Persisting`.
status_cv: Condvar,
}
impl PersistenceTask {
    /// Creates a new task in the [`TaskStatus::Pending`] state.
    ///
    /// The undo record is stored as `Some(undo)` and the cancellation flag
    /// starts cleared. The task is returned inside an `Arc`.
    pub fn new(
        block_height: BlockId,
        operations: Vec<Operation>,
        undo: Arc<BlockUndo>,
        metrics: ApplyMetricsContext,
    ) -> Arc<Self> {
        let task = Self {
            block_height,
            operations,
            undo: Mutex::new(Some(undo)),
            metrics,
            cancelled: AtomicBool::new(false),
            status: Mutex::new(TaskStatus::Pending),
            status_cv: Condvar::new(),
        };
        Arc::new(task)
    }

    /// Returns another handle to the undo record without removing it.
    ///
    /// # Panics
    ///
    /// Panics if the undo record has already been taken by
    /// [`release_undo`](Self::release_undo).
    pub fn clone_undo(&self) -> Arc<BlockUndo> {
        let slot = self.undo.lock();
        let undo = slot
            .as_ref()
            .expect("undo released before persistence began");
        Arc::clone(undo)
    }

    /// Takes the undo record out of the task, leaving `None` behind.
    ///
    /// Returns `None` if it was already taken.
    pub fn release_undo(&self) -> Option<Arc<BlockUndo>> {
        let mut slot = self.undo.lock();
        slot.take()
    }

    /// Raises the cancellation flag, then switches the status to
    /// [`TaskStatus::Cancelled`] and wakes every waiter.
    // NOTE(review): this overwrites whatever status is current, including an
    // already stored `Completed` result — confirm callers never cancel a
    // finished task, otherwise later waiters observe `Ok(())` instead.
    pub fn mark_cancelled(&self) {
        self.cancelled.store(true, Ordering::Release);
        self.set_status(TaskStatus::Cancelled);
    }

    /// Blocks until the task is completed or cancelled.
    ///
    /// Returns a clone of the stored result for `Completed`, and `Ok(())` for
    /// `Cancelled`. While the status is `Pending` or `Persisting` the caller
    /// sleeps on the status condition variable and re-checks after each
    /// wake-up.
    pub fn wait_completion(&self) -> Result<(), Arc<StoreError>> {
        let mut guard = self.status.lock();
        loop {
            let outcome = match &*guard {
                TaskStatus::Completed(result) => Some(result.clone()),
                TaskStatus::Cancelled => Some(Ok(())),
                TaskStatus::Pending | TaskStatus::Persisting => None,
            };
            if let Some(finished) = outcome {
                return finished;
            }
            self.status_cv.wait(&mut guard);
        }
    }

    /// Replaces the current status with `new_status` and wakes every waiter.
    ///
    /// The notification is issued while the status lock is still held.
    pub fn set_status(&self, new_status: TaskStatus) {
        let mut guard = self.status.lock();
        *guard = new_status;
        self.status_cv.notify_all();
    }

    /// Reports whether [`mark_cancelled`](Self::mark_cancelled) has been
    /// called on this task.
    pub fn is_cancelled(&self) -> bool {
        let flag = &self.cancelled;
        flag.load(Ordering::Acquire)
    }
}