mosaik 0.3.13

A Rust runtime for building self-organizing, leaderless distributed systems.
Documentation
use {
	core::num::NonZero,
	futures::{StreamExt, stream::FuturesUnordered},
	mosaik::{
		Network,
		PeerId,
		groups::{ApplyContext, Group, LogReplaySync, StateMachine},
		primitives::{Short, UniqueId},
	},
	serde::{Deserialize, Serialize},
};

// Private submodules of this module; each `mod name;` item pulls in the
// module body from its own source file.
mod auth;
mod bonds;
mod builder;
mod catchup;
mod execute;
mod feed;
mod leader;

/// A signed counter that implements `StateMachine`.
#[derive(Debug)]
struct Counter {
	/// Current counter value; modified by applying `CounterCommand`s and
	/// returned as the result of a `CounterValueQuery`.
	value: i64,
	/// Batch size handed to `LogReplaySync::with_batch_size` when the
	/// state-sync strategy is built in `state_sync`.
	sync_batch_size: NonZero<u64>,
}

impl Default for Counter {
	fn default() -> Self {
		Self {
			value: 0,
			sync_batch_size: NonZero::new(1000).unwrap(),
		}
	}
}

impl Counter {
	/// Returns this counter with its state-sync batch size replaced by
	/// `sync_batch_size`; the counter value is left untouched.
	///
	/// # Panics
	///
	/// Panics if `sync_batch_size` is zero, because the batch size is stored
	/// as a `NonZero<u64>`.
	#[must_use]
	pub const fn with_sync_batch_size(mut self, sync_batch_size: u64) -> Self {
		self.sync_batch_size = NonZero::new(sync_batch_size)
			.expect("sync_batch_size must be non-zero");
		self
	}
}

impl StateMachine for Counter {
	type Command = CounterCommand;
	type Query = CounterValueQuery;
	type QueryResult = i64;
	type StateSync = LogReplaySync<Self>;

	/// Returns the fixed identifier of this state machine, built from the
	/// byte `2`.
	fn signature(&self) -> UniqueId {
		UniqueId::from_u8(2)
	}

	/// Applies one command to the counter value, wrapping around on `i64`
	/// overflow instead of panicking.
	fn apply(&mut self, command: Self::Command, _: &dyn ApplyContext) {
		// A `u32` widened to `i64` can always be negated without overflow, so a
		// decrement is exactly a wrapping add of the negated amount.
		let delta = match command {
			CounterCommand::Increment(amount) => i64::from(amount),
			CounterCommand::Decrement(amount) => -i64::from(amount),
		};
		self.value = self.value.wrapping_add(delta);
	}

	/// Answers the only supported query with the current counter value.
	fn query(&self, query: Self::Query) -> Self::QueryResult {
		// The query is a unit struct, so there is nothing to inspect in it.
		let CounterValueQuery = query;
		self.value
	}

	/// Builds the log-replay sync strategy using the configured batch size.
	fn state_sync(&self) -> Self::StateSync {
		LogReplaySync::<Self>::default().with_batch_size(self.sync_batch_size)
	}
}

/// Commands accepted by the `Counter` state machine.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum CounterCommand {
	/// Adds the given amount to the counter value (wrapping on overflow).
	Increment(u32),
	/// Subtracts the given amount from the counter value (wrapping on
	/// overflow).
	Decrement(u32),
}

/// Query for the `Counter` state machine; it is answered with the current
/// counter value as an `i64`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CounterValueQuery;

/// Waits until `group` holds a bond with every network listed in `peers`.
///
/// On each pass the function logs the local peer id and every currently
/// bonded peer under the `name` label. It returns once each entry of `peers`
/// matches the peer id of some bond; otherwise it suspends until the bond set
/// reports another change and checks again.
pub async fn ensure_bonds_formed<M: StateMachine>(
	group: &Group<M>,
	local: &Network,
	peers: &[&Network],
	name: &str,
) {
	loop {
		// With no bonds at all there is nothing to report yet, so wait for the
		// first change before logging.
		let snapshot = group.bonds();
		if snapshot.is_empty() {
			group.bonds().changed().await;
		}

		tracing::info!(
			"{name} bonds changed (local = {})",
			Short(local.local().id())
		);

		group.bonds().iter().for_each(|bond| {
			tracing::info!("- {name} bonded with: {}", Short(bond.peer().id()));
		});

		// Fully formed means every expected peer shows up among the bonds.
		let fully_formed = peers.iter().all(|expected| {
			group
				.bonds()
				.iter()
				.any(|bond| *bond.peer().id() == expected.local().id())
		});

		if fully_formed {
			tracing::info!("{name} group fully formed");
			return;
		}

		// Not there yet: sleep until the bond set changes, then re-check.
		group.bonds().changed().await;
	}
}

/// Waits until every group reports the same leader, then yields that leader's
/// ID.
///
/// # Panics
///
/// Panics if `groups` yields no items.
async fn leaders_converged<M: StateMachine>(
	groups: impl IntoIterator<Item = &mosaik::Group<M>>,
) -> PeerId {
	let members: Vec<_> = groups.into_iter().collect();
	assert!(!members.is_empty());

	loop {
		// Take one reading of each group's current leader.
		let observed: Vec<_> =
			members.iter().map(|member| member.leader()).collect();

		// Converged only when the first reading names a leader and every
		// reading names that same leader.
		match observed.first().copied().flatten() {
			Some(candidate)
				if observed.iter().all(|seen| *seen == Some(candidate)) =>
			{
				return candidate;
			}
			_ => {}
		}

		// Otherwise wait until any one group signals a leader change, then take
		// a fresh reading.
		let mut pending = members
			.iter()
			.map(|member| member.when().leader_changed())
			.collect::<FuturesUnordered<_>>();
		pending.next().await.unwrap();
	}
}