use std::io;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::raft_state::IOId;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::WatchSenderOf;
use crate::type_config::async_runtime::oneshot::OneshotSender;
use crate::type_config::async_runtime::watch::WatchSender;
/// Deprecated name for [`IOFlushed`]; use [`IOFlushed`] directly.
#[deprecated(since = "0.10.0", note = "Use `IOFlushed` instead")]
pub type LogFlushed<C> = IOFlushed<C>;
/// Completion callback for an IO operation.
///
/// The callback is consumed by [`IOFlushed::io_completed`], which reports the
/// outcome in a way that depends on the variant.
pub enum IOFlushed<C: RaftTypeConfig> {
    /// Discards the result.
    Noop,

    /// Publishes the outcome to a watch channel, tagged with the id of the IO.
    Notify(IOFlushedNotify<C>),

    /// Forwards the unmodified `io::Error` result through a oneshot channel.
    Signal(OneshotSenderOf<C, Result<(), io::Error>>),
}
/// Payload of [`IOFlushed::Notify`]: the id of an IO together with the watch
/// sender its outcome is published to.
pub struct IOFlushedNotify<C: RaftTypeConfig> {
    /// Id of the IO this callback belongs to; it becomes the new watch value on success.
    io_id: IOId<C>,

    /// Watch channel holding either an IO id or a storage error.
    tx: WatchSenderOf<C, Result<IOId<C>, StorageError<C>>>,
}
impl<C> IOFlushed<C>
where C: RaftTypeConfig
{
pub fn noop() -> Self {
Self::Noop
}
pub fn signal(tx: OneshotSenderOf<C, Result<(), io::Error>>) -> Self {
Self::Signal(tx)
}
pub(crate) fn new(io_id: IOId<C>, tx: WatchSenderOf<C, Result<IOId<C>, StorageError<C>>>) -> Self {
Self::Notify(IOFlushedNotify { io_id, tx })
}
#[deprecated(since = "0.10.0", note = "Use `io_completed` instead")]
pub fn log_io_completed(self, result: Result<(), io::Error>) {
self.io_completed(result)
}
pub fn io_completed(self, result: Result<(), io::Error>) {
match self {
Self::Noop => {}
Self::Signal(tx) => {
tx.send(result).ok();
}
Self::Notify(IOFlushedNotify { io_id, tx }) => {
let new_value = match result {
Err(e) => {
let sto_err = Self::make_storage_error(&io_id, e);
Err(sto_err)
}
Ok(_) => {
tracing::debug!("{}: IOFlushed completed: {}", func_name!(), io_id);
Ok(io_id)
}
};
let modified = tx.send_if_modified(move |current| {
if current.is_err() {
tracing::debug!("IO completion ignored: already in error state");
false
} else {
*current = new_value;
true
}
});
if !modified {
tracing::debug!("IO completion not sent: channel state unchanged");
}
}
}
}
fn make_storage_error(io_id: &IOId<C>, e: io::Error) -> StorageError<C> {
tracing::error!("io_completed: IOFlushed error: {}, while flushing IO: {}", e, io_id);
let subject = io_id.subject();
let verb = io_id.verb();
StorageError::from_io_error(subject, verb, e)
}
}
/// Completion callback for an apply operation covering logs up to `last_log_id`.
///
/// The callback is consumed by [`LogApplied::completed`], which reports the result
/// through a oneshot channel.
pub struct LogApplied<C: RaftTypeConfig> {
    /// Log id sent back together with the responses on success, and included in log messages.
    last_log_id: LogIdOf<C>,

    /// Oneshot channel receiving either `(last_log_id, responses)` or the storage error.
    tx: OneshotSenderOf<C, Result<(LogIdOf<C>, Vec<C::R>), StorageError<C>>>,
}
impl<C> LogApplied<C>
where C: RaftTypeConfig
{
#[allow(dead_code)]
pub(crate) fn new(
last_log_id: LogIdOf<C>,
tx: OneshotSenderOf<C, Result<(LogIdOf<C>, Vec<C::R>), StorageError<C>>>,
) -> Self {
Self { last_log_id, tx }
}
pub fn completed(self, result: Result<Vec<C::R>, StorageError<C>>) {
let res = match result {
Ok(x) => {
tracing::debug!("LogApplied up to {}", self.last_log_id);
let resp = (self.last_log_id.clone(), x);
self.tx.send(Ok(resp))
}
Err(e) => {
tracing::error!("LogApplied error: {}, while applying up to {}", e, self.last_log_id);
self.tx.send(Err(e))
}
};
if let Err(_e) = res {
tracing::error!("failed to send apply complete event, last_log_id: {}", self.last_log_id);
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::testing::UTConfig;
    use crate::type_config::TypeConfigExt;

    /// Completing a no-op callback with a success result must not panic.
    #[test]
    fn test_io_flushed_noop() {
        UTConfig::<()>::run(async {
            IOFlushed::<UTConfig>::noop().io_completed(Ok(()));
        });
    }

    /// Completing a no-op callback with an error must not panic either.
    #[test]
    fn test_io_flushed_noop_with_error() {
        UTConfig::<()>::run(async {
            let failure = Err(io::Error::other("test error"));
            IOFlushed::<UTConfig>::noop().io_completed(failure);
        });
    }

    /// A signal callback delivers a success result to the oneshot receiver.
    #[test]
    fn test_io_flushed_signal_success() {
        UTConfig::<()>::run(async {
            let (tx, rx) = UTConfig::<()>::oneshot();
            IOFlushed::<UTConfig>::signal(tx).io_completed(Ok(()));

            // Outer `Result`: channel delivery; inner `Result`: the IO outcome.
            let flushed = rx.await.unwrap();
            assert!(flushed.is_ok());
        });
    }

    /// A signal callback delivers the `io::Error` unchanged to the oneshot receiver.
    #[test]
    fn test_io_flushed_signal_with_error() {
        UTConfig::<()>::run(async {
            let (tx, rx) = UTConfig::<()>::oneshot();
            IOFlushed::<UTConfig>::signal(tx).io_completed(Err(io::Error::other("test error")));

            let io_err = rx.await.unwrap().unwrap_err();
            assert_eq!(io_err.kind(), io::ErrorKind::Other);
        });
    }

    /// Completing a signal callback whose receiver is already gone must not panic.
    #[test]
    fn test_io_flushed_signal_receiver_dropped() {
        UTConfig::<()>::run(async {
            let (tx, rx) = UTConfig::<()>::oneshot();
            drop(rx);

            IOFlushed::<UTConfig>::signal(tx).io_completed(Ok(()));
        });
    }
}