use std::sync::Arc;
use creep::Context;
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use parking_lot::RwLock;
use crate::error::ConsensusError;
use crate::state::process::State;
use crate::types::{Address, Node, OverlordMsg};
use crate::DurationConfig;
use crate::{smr::SMR, timer::Timer};
use crate::{Codec, Consensus, ConsensusResult, Crypto, Wal};
type Pile<T> = RwLock<Option<T>>;
pub struct Overlord<T: Codec, F: Consensus<T>, C: Crypto, W: Wal> {
sender: Pile<UnboundedSender<(Context, OverlordMsg<T>)>>,
state_rx: Pile<UnboundedReceiver<(Context, OverlordMsg<T>)>>,
address: Pile<Address>,
consensus: Pile<Arc<F>>,
crypto: Pile<C>,
wal: Pile<Arc<W>>,
}
impl<T, F, C, W> Overlord<T, F, C, W>
where
T: Codec + Send + Sync + 'static,
F: Consensus<T> + 'static,
C: Crypto + Send + Sync + 'static,
W: Wal + 'static,
{
pub fn new(address: Address, consensus: Arc<F>, crypto: C, wal: Arc<W>) -> Self {
let (tx, rx) = unbounded();
Overlord {
sender: RwLock::new(Some(tx)),
state_rx: RwLock::new(Some(rx)),
address: RwLock::new(Some(address)),
consensus: RwLock::new(Some(consensus)),
crypto: RwLock::new(Some(crypto)),
wal: RwLock::new(Some(wal)),
}
}
pub fn get_handler(&self) -> OverlordHandler<T> {
let sender = self.sender.write();
assert!(sender.is_some());
let tx = sender.clone().unwrap();
OverlordHandler::new(tx)
}
pub async fn run(
&self,
interval: u64,
authority_list: Vec<Node>,
timer_config: Option<DurationConfig>,
) -> ConsensusResult<()> {
let (mut smr_provider, evt_1, evt_2) = SMR::new();
let smr_handler = smr_provider.take_smr();
let timer = Timer::new(evt_2, smr_handler.clone(), interval, timer_config);
let (rx, mut state, resp) = {
let mut state_rx = self.state_rx.write();
let mut address = self.address.write();
let mut consensus = self.consensus.write();
let mut crypto = self.crypto.write();
let mut wal = self.wal.write();
let tmp_rx = state_rx.take().unwrap();
let (tmp_state, tmp_resp) = State::new(
smr_handler,
address.take().unwrap(),
interval,
authority_list,
consensus.take().unwrap(),
crypto.take().unwrap(),
wal.take().unwrap(),
);
assert!(address.is_none());
assert!(consensus.is_none());
assert!(crypto.is_none());
assert!(state_rx.is_none());
assert!(wal.is_none());
(tmp_rx, tmp_state, tmp_resp)
};
smr_provider.run();
timer.run();
state.run(rx, evt_1, resp).await?;
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct OverlordHandler<T: Codec>(UnboundedSender<(Context, OverlordMsg<T>)>);
impl<T: Codec> OverlordHandler<T> {
fn new(tx: UnboundedSender<(Context, OverlordMsg<T>)>) -> Self {
OverlordHandler(tx)
}
pub fn send_msg(&self, ctx: Context, msg: OverlordMsg<T>) -> ConsensusResult<()> {
self.0
.unbounded_send((ctx, msg))
.map_err(|e| ConsensusError::Other(format!("Send message error {:?}", e)))
}
}