use {
crate::{
PeerId,
groups::{
Bonds,
Cursor,
GroupId,
Index,
IndexRange,
StateMachine,
When,
config::GroupConfig,
error::{CommandError, QueryError},
state::{GroupHandle, WorkerCommand, WorkerRaftCommand},
},
primitives::{Short, ShortFmtExt},
},
core::{fmt, fmt::Debug, marker::PhantomData},
dashmap::DashMap,
derive_more::Deref,
futures::FutureExt,
serde::{Deserialize, Serialize},
state::WorkerState,
std::sync::Arc,
tokio::sync::{mpsc::error::SendError, oneshot},
};
pub(in crate::groups) mod state;
pub(super) mod worker;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Consistency {
Weak,
Strong,
}
#[derive(Clone, Deref, Serialize, Deserialize)]
pub struct QueryResultAt<M: StateMachine> {
#[deref]
pub result: M::QueryResult,
pub at_position: Index,
}
impl<M: StateMachine> Debug for QueryResultAt<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueryResultAt")
.field("result", &"<query result>")
.field("at_position", &self.at_position)
.finish()
}
}
pub struct Group<M: StateMachine> {
state: Arc<WorkerState<M>>,
groups: Arc<DashMap<GroupId, Arc<GroupHandle>>>,
#[doc(hidden)]
_p: PhantomData<M>,
}
impl<M: StateMachine> Group<M> {
pub fn id(&self) -> &GroupId {
self.state.group_id()
}
pub fn is_leader(&self) -> bool {
self.state.when.current_leader() == Some(self.state.local_id())
}
pub fn leader(&self) -> Option<PeerId> {
self.state.when.current_leader()
}
pub fn bonds(&self) -> Bonds<M> {
self.state.bonds.clone()
}
pub fn config(&self) -> &GroupConfig {
&self.state.config
}
pub fn when(&self) -> &When {
&self.state.when
}
pub fn committed(&self) -> Index {
self.state.when.current_committed()
}
pub fn log_position(&self) -> Cursor {
self.state.when.current_log_pos()
}
}
impl<M: StateMachine> Group<M> {
pub fn execute(
&self,
command: M::Command,
) -> impl Future<Output = Result<Index, CommandError<M>>> + Send + Sync + 'static
{
self
.execute_many([command])
.map(|range| range.map(|r| *r.start()))
}
pub fn execute_many(
&self,
commands: impl IntoIterator<Item = M::Command>,
) -> impl Future<Output = Result<IndexRange, CommandError<M>>>
+ Send
+ Sync
+ 'static {
let when = self.when().clone();
let assigned_ix_fut = self.feed_many(commands);
async move {
let assigned_ix = assigned_ix_fut.await?;
when.committed().reaches(assigned_ix.clone()).await;
Ok(assigned_ix)
}
}
pub fn feed(
&self,
command: M::Command,
) -> impl Future<Output = Result<Index, CommandError<M>>> + Send + Sync + 'static
{
self
.feed_many([command])
.map(|range| range.map(|r| *r.start()))
}
pub fn feed_many(
&self,
commands: impl IntoIterator<Item = M::Command>,
) -> impl Future<Output = Result<IndexRange, CommandError<M>>>
+ Send
+ Sync
+ 'static {
let commands: Vec<_> = commands.into_iter().collect();
let (result_tx, result_rx) = oneshot::channel();
if commands.is_empty() {
#[expect(clippy::missing_panics_doc)]
result_tx
.send(Err(CommandError::NoCommands))
.expect("oneshot channel should be open");
} else if let Err(SendError(WorkerCommand::Raft(
WorkerRaftCommand::Feed(_, result_tx),
))) =
self
.state
.cmd_tx
.send(WorkerCommand::Raft(WorkerRaftCommand::Feed(
commands, result_tx,
))) {
#[expect(clippy::missing_panics_doc)]
result_tx
.send(Err(CommandError::GroupTerminated))
.expect("oneshot channel should be open");
}
async move {
match result_rx.await {
Ok(Ok(index_range)) => Ok(index_range),
Ok(Err(e)) => Err(e), Err(_) => Err(CommandError::GroupTerminated), }
}
}
pub fn query(
&self,
query: M::Query,
consistency: Consistency,
) -> impl Future<Output = Result<QueryResultAt<M>, QueryError<M>>>
+ Send
+ Sync
+ 'static {
let (result_tx, result_rx) = oneshot::channel();
if let Err(SendError(WorkerCommand::Raft(WorkerRaftCommand::Query(
_,
_,
result_tx,
)))) =
self
.state
.cmd_tx
.send(WorkerCommand::Raft(WorkerRaftCommand::Query(
query,
consistency,
result_tx,
))) {
#[expect(clippy::missing_panics_doc)]
result_tx
.send(Err(QueryError::GroupTerminated))
.expect("oneshot channel should be open");
}
async move {
match result_rx.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(e)) => Err(e), Err(_) => Err(QueryError::GroupTerminated), }
}
}
}
impl<M: StateMachine> Drop for Group<M> {
fn drop(&mut self) {
let labels = [("network", self.state.network_id().short().to_string())];
metrics::gauge!("mosaik.groups.active", &labels).decrement(1.0);
self.state.bonds.notify_departure();
self.state.cancel.cancel();
self.groups.remove(self.id());
}
}
impl<M: StateMachine> QueryResultAt<M> {
pub fn into(self) -> M::QueryResult {
self.result
}
pub const fn result(&self) -> &M::QueryResult {
&self.result
}
pub const fn state_position(&self) -> Index {
self.at_position
}
}
impl<M: StateMachine> PartialEq<M::QueryResult> for QueryResultAt<M>
where
M::QueryResult: PartialEq,
{
fn eq(&self, other: &M::QueryResult) -> bool {
&self.result == other
}
}
impl<M: StateMachine> core::fmt::Display for QueryResultAt<M>
where
M::QueryResult: core::fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", &self.result)
}
}
impl<M: StateMachine> Group<M> {
pub(super) const fn new(
state: Arc<WorkerState<M>>,
groups: Arc<DashMap<GroupId, Arc<GroupHandle>>>,
) -> Self {
Self {
state,
groups,
_p: PhantomData,
}
}
}
impl<M: StateMachine> fmt::Display for Group<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Group({})", Short(self.id()))
}
}