use {
crate::{
Consistency,
NetworkId,
PeerId,
discovery::{Discovery, SignedPeerEntry},
groups::{
Bonds,
CommandError,
Config,
Group,
GroupId,
Groups,
IndexRange,
QueryError,
QueryResultAt,
StateMachine,
When,
bond::{BondEvent, HandshakeStart},
config::GroupConfig,
key::SecretProof,
},
network::{LocalNode, link::Link},
},
core::any::{Any, TypeId},
dashmap::DashMap,
iroh::protocol::AcceptError,
std::sync::Arc,
tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
},
tokio_util::sync::CancellationToken,
};
#[derive(Debug)]
pub struct GroupHandle {
state: Arc<dyn Any + Send + Sync + 'static>,
accepts: UnboundedSender<AcceptRequest>,
}
impl GroupHandle {
pub(crate) fn new<M: StateMachine>(state: Arc<WorkerState<M>>) -> Self {
let accepts = state.accepts.clone();
Self { state, accepts }
}
pub async fn accept(
&self,
link: Link<Groups>,
peer: SignedPeerEntry,
handshake: HandshakeStart,
) -> Result<(), AcceptError> {
let (result_tx, result_rx) = oneshot::channel();
let request = AcceptRequest {
link,
peer,
handshake,
result_tx,
};
self.accepts.send(request).map_err(AcceptError::from_err)?;
result_rx.await.map_err(AcceptError::from_err)?
}
pub fn public_handle<M: StateMachine>(
&self,
groups: &Arc<DashMap<GroupId, Arc<Self>>>,
) -> Group<M> {
self
.state
.clone()
.downcast::<WorkerState<M>>()
.expect("GroupHandle state type mismatch. this is a bug.")
.public_handle(groups)
}
}
pub(in crate::groups) struct WorkerState<M: StateMachine> {
pub config: GroupConfig,
pub global_config: Arc<Config>,
pub local: LocalNode,
pub discovery: Discovery,
pub bonds: Bonds<M>,
pub cancel: CancellationToken,
pub when: When,
pub accepts: UnboundedSender<AcceptRequest>,
pub cmd_tx: UnboundedSender<WorkerCommand<M>>,
pub types: (TypeId, TypeId),
}
impl<M: StateMachine> WorkerState<M> {
pub fn local_id(&self) -> PeerId {
self.local.id()
}
pub const fn group_id(&self) -> &GroupId {
self.config.group_id()
}
pub fn network_id(&self) -> &NetworkId {
self.local.network_id()
}
pub const fn state_machine_type(&self) -> TypeId {
self.types.0
}
pub fn generate_key_proof(&self, link: &Link<Groups>) -> SecretProof {
self
.config
.key()
.generate_proof(link, self.local.id(), *self.group_id())
}
pub fn validate_key_proof(
&self,
link: &Link<Groups>,
proof: SecretProof,
) -> bool {
self
.config
.key()
.validate_proof(link, proof, *self.group_id())
}
pub fn bond_with(&self, peer: SignedPeerEntry) {
let _ = self.cmd_tx.send(WorkerCommand::Connect(Box::new(peer)));
}
pub fn public_handle(
self: &Arc<Self>,
groups: &Arc<DashMap<GroupId, Arc<GroupHandle>>>,
) -> Group<M> {
assert_eq!(self.state_machine_type(), TypeId::of::<M>());
Group::new(Arc::clone(self), Arc::clone(groups))
}
}
pub enum WorkerCommand<M: StateMachine> {
Connect(Box<SignedPeerEntry>),
Subscribe(UnboundedReceiver<BondEvent<M>>, PeerId),
Raft(WorkerRaftCommand<M>),
}
pub(in crate::groups) enum WorkerRaftCommand<M: StateMachine> {
Feed(
Vec<M::Command>,
oneshot::Sender<Result<IndexRange, CommandError<M>>>,
),
Query(
M::Query,
Consistency,
oneshot::Sender<Result<QueryResultAt<M>, QueryError<M>>>,
),
}
pub(in crate::groups) struct AcceptRequest {
pub link: Link<Groups>,
pub peer: SignedPeerEntry,
pub handshake: HandshakeStart,
pub result_tx: oneshot::Sender<Result<(), AcceptError>>,
}