use tokio::sync::mpsc;
use crate::core::sm;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::TypeConfigExt;
use crate::RaftTypeConfig;
use crate::Snapshot;
/// Handle used to talk to the state machine worker.
///
/// It bundles the sending half of the worker's unbounded command channel with a join handle.
/// Commands are submitted through [`Handle::send`]; read-only snapshot access is handed out via
/// [`Handle::new_snapshot_reader`].
pub(crate) struct Handle<C>
where C: RaftTypeConfig
{
/// Sending half of the unbounded channel that carries [`sm::Command`]s.
pub(in crate::core::sm) cmd_tx: mpsc::UnboundedSender<sm::Command<C>>,
/// Join handle resolving to `()`; presumably that of the spawned worker task — TODO confirm.
// NOTE(review): the field is not read in the code visible here; it is presumably stored only so
// the handle stays owned alongside the sender, hence the lint suppression — confirm.
#[allow(dead_code)]
pub(in crate::core::sm) join_handle: JoinHandleOf<C, ()>,
}
impl<C> Handle<C>
where C: RaftTypeConfig
{
    /// Logs `cmd` at debug level, then pushes it onto the unbounded command channel.
    ///
    /// # Errors
    ///
    /// Returns the channel's `SendError`, which carries the rejected command back to the
    /// caller, when the channel refuses the message.
    pub(crate) fn send(&mut self, cmd: sm::Command<C>) -> Result<(), mpsc::error::SendError<sm::Command<C>>> {
        tracing::debug!("sending command to state machine worker: {:?}", cmd);

        let sender = &self.cmd_tx;
        sender.send(cmd)
    }

    /// Builds a [`SnapshotReader`] backed by a weak clone of the command sender.
    ///
    /// Only a downgraded sender is handed out, so the reader has to upgrade it again before
    /// every request.
    pub(crate) fn new_snapshot_reader(&self) -> SnapshotReader<C> {
        let weak_tx = self.cmd_tx.downgrade();
        SnapshotReader { cmd_tx: weak_tx }
    }
}
/// Requests snapshots from the state machine worker through a weak command sender.
///
/// Instances are created by [`Handle::new_snapshot_reader`].
pub(crate) struct SnapshotReader<C>
where C: RaftTypeConfig
{
/// Weak sender for [`sm::Command`]s; it must be upgraded before a command can be sent.
cmd_tx: mpsc::WeakUnboundedSender<sm::Command<C>>,
}
impl<C> SnapshotReader<C>
where C: RaftTypeConfig
{
    /// Asks the state machine worker for its snapshot and waits for the reply.
    ///
    /// A `get_snapshot` command carrying a oneshot reply sender is pushed onto the command
    /// channel; the value that arrives on the matching receiver is unwrapped and returned.
    ///
    /// # Errors
    ///
    /// Returns a static message when the weak command sender can no longer be upgraded, or
    /// when awaiting the reply fails.
    ///
    /// # Panics
    ///
    /// Panics if `unwrap()` on the received reply fails.
    pub(crate) async fn get_snapshot(&self) -> Result<Option<Snapshot<C>>, &'static str> {
        let (reply_tx, reply_rx) = C::oneshot();
        let request = sm::Command::get_snapshot(reply_tx);
        tracing::debug!("SnapshotReader sending command to sm::Worker: {:?}", request);

        // The reader only holds a weak sender; a failed upgrade is reported to the caller.
        let Some(strong_tx) = self.cmd_tx.upgrade() else {
            tracing::info!("failed to upgrade cmd_tx, sm::Worker may have shutdown");
            return Err("failed to upgrade cmd_tx, sm::Worker may have shutdown");
        };

        // The send result is deliberately ignored: on failure the command comes back inside the
        // error and is dropped right here. Presumably the command owns `reply_tx`, so dropping it
        // makes the await below fail and the problem is reported there — TODO confirm.
        let _ = strong_tx.send(request);

        let Ok(reply) = reply_rx.await else {
            tracing::error!("failed to receive snapshot, sm::Worker may have shutdown");
            return Err("failed to receive snapshot, sm::Worker may have shutdown");
        };

        // NOTE(review): this panics if the reply is a failure value; presumably the worker can
        // never produce one for this command — confirm against the reply type.
        Ok(reply.unwrap())
    }
}