use crate::RaftTypeConfig;
use crate::async_runtime::MpscSender;
use crate::async_runtime::MpscWeakSender;
use crate::async_runtime::SendError;
use crate::core::sm;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscSenderOf;
use crate::type_config::alias::MpscWeakSenderOf;
use crate::type_config::alias::SnapshotOf;
/// Handle used to submit `sm::Command`s to the state machine worker.
///
/// The `SM` type parameter defaults to `()` and is forwarded unchanged to
/// `sm::Command<C, SM>`.
pub(crate) struct Handle<C, SM = ()>
where C: RaftTypeConfig
{
/// Sending half of the command channel: `send()` pushes commands through it and
/// weak senders are derived from it via `downgrade()`.
pub(in crate::core::sm) cmd_tx: MpscSenderOf<C, sm::Command<C, SM>>,
/// NOTE(review): presumably the join handle of the spawned worker task, retained
/// here so that it lives as long as this `Handle` — confirm at the construction site.
// No method in the visible `impl` reads this field, hence the `dead_code` allowance.
#[allow(dead_code)]
pub(in crate::core::sm) join_handle: JoinHandleOf<C, ()>,
}
impl<C, SM> Handle<C, SM>
where C: RaftTypeConfig
{
    /// Sends `cmd` to the state machine worker through the command channel.
    ///
    /// The command is logged at debug level before it is handed to the channel.
    ///
    /// # Errors
    ///
    /// Returns the channel's [`SendError`] unchanged if the send fails.
    pub(crate) async fn send(&mut self, cmd: sm::Command<C, SM>) -> Result<(), SendError<sm::Command<C, SM>>> {
        tracing::debug!("sending command to state machine worker: {:?}", cmd);

        let sender = &self.cmd_tx;
        sender.send(cmd).await
    }

    /// Returns a weak sender obtained by downgrading this handle's command sender.
    pub(crate) fn downgrade_sender(&self) -> MpscWeakSenderOf<C, sm::Command<C, SM>> {
        self.cmd_tx.downgrade()
    }

    /// Builds a [`SnapshotReader`] backed by a weak sender derived from this handle's
    /// command sender.
    pub(crate) fn new_snapshot_reader(&self) -> SnapshotReader<C, SM> {
        let cmd_tx = self.downgrade_sender();
        SnapshotReader { cmd_tx }
    }
}
/// Requests the current snapshot from `sm::Worker` through a weak command sender.
///
/// The `SM` type parameter defaults to `()` and is forwarded unchanged to
/// `sm::Command<C, SM>`.
pub(crate) struct SnapshotReader<C, SM = ()>
where C: RaftTypeConfig
{
/// Weak sender for worker commands; it must be upgraded before each use, and the
/// upgrade can fail (see `get_snapshot`).
// NOTE(review): presumably a weak sender does not keep the command channel open
// on its own — confirm against the `MpscWeakSender` contract.
cmd_tx: MpscWeakSenderOf<C, sm::Command<C, SM>>,
}
impl<C, SM> SnapshotReader<C, SM>
where C: RaftTypeConfig
{
    /// Asks `sm::Worker` for its snapshot and waits for the reply.
    ///
    /// A oneshot channel is created, its sending half is wrapped in a `get_snapshot`
    /// command, and the command is sent through the upgraded weak sender. The value
    /// delivered on the oneshot receiver is returned as-is.
    ///
    /// # Errors
    ///
    /// Returns a static message when:
    /// - the weak command sender cannot be upgraded, or
    /// - the reply channel yields an error instead of a value.
    pub(crate) async fn get_snapshot(&self) -> Result<Option<SnapshotOf<C>>, &'static str> {
        let (reply_tx, reply_rx) = C::oneshot();

        let request = sm::Command::get_snapshot(reply_tx);
        tracing::debug!("SnapshotReader sending command to sm::Worker: {:?}", request);

        // Keep the upgraded sender bound to a local so it lives until the function returns.
        let sender = match self.cmd_tx.upgrade() {
            Some(strong) => strong,
            None => {
                tracing::info!("failed to upgrade cmd_tx, sm::Worker may have shutdown");
                return Err("failed to upgrade cmd_tx, sm::Worker may have shutdown");
            }
        };

        // The send result is deliberately ignored: on failure the command, and the
        // reply sender inside it, is dropped with the discarded result, so the
        // failure is reported by the receive step below.
        let _ = sender.send(request).await;

        let reply = reply_rx.await;
        reply.map_err(|_recv_err| {
            tracing::error!("failed to receive snapshot, sm::Worker may have shutdown");
            "failed to receive snapshot, sm::Worker may have shutdown"
        })
    }
}