use {
super::{
CollectionConfig,
CollectionFromDef,
Error,
READER,
SyncConfig,
WRITER,
When,
primitives::{StoreId, Value, Version},
},
crate::{
Group,
GroupId,
Network,
PeerId,
UniqueId,
collections::sync::{
Snapshot,
SnapshotStateMachine,
SnapshotSync,
protocol::SnapshotRequest,
},
groups::{
ApplyContext,
CommandError,
Cursor,
LeadershipPreference,
StateMachine,
},
primitives::{EncodeError, Encoded, ShortFmtExt},
},
core::{
any::type_name,
ops::{Deref, Range},
},
futures::{FutureExt, TryFutureExt},
serde::{Deserialize, Serialize},
std::sync::OnceLock,
tokio::sync::watch,
};
pub type CellWriter<T> = Cell<T, WRITER>;
pub type CellReader<T> = Cell<T, READER>;
pub struct Cell<T: Value, const IS_WRITER: bool = WRITER> {
when: When,
group: Group<CellStateMachine<T>>,
data: watch::Receiver<Option<T>>,
}
impl<T: Value, const IS_WRITER: bool> Cell<T, IS_WRITER> {
pub fn read(&self) -> Option<T> {
self.data.borrow().clone()
}
pub fn get(&self) -> Option<T> {
self.read()
}
pub fn is_empty(&self) -> bool {
self.data.borrow().is_none()
}
pub fn is_none(&self) -> bool {
self.is_empty()
}
pub fn is_some(&self) -> bool {
!self.is_empty()
}
pub const fn when(&self) -> &When {
&self.when
}
pub fn version(&self) -> Version {
Version(self.group.committed())
}
pub fn group_id(&self) -> &GroupId {
self.group.id()
}
}
impl<T: Value> CellWriter<T> {
pub fn writer(network: &Network, store_id: impl Into<StoreId>) -> Self {
Self::writer_with_config(network, store_id, CollectionConfig::default())
}
pub fn writer_with_config(
network: &Network,
store_id: impl Into<StoreId>,
config: impl Into<CollectionConfig>,
) -> Self {
Self::create::<WRITER>(network, store_id, config.into())
}
pub fn new(network: &Network, store_id: impl Into<StoreId>) -> Self {
Self::writer(network, store_id)
}
pub fn new_with_config(
network: &Network,
store_id: impl Into<StoreId>,
config: impl Into<CollectionConfig>,
) -> Self {
Self::writer_with_config(network, store_id, config)
}
pub fn write(
&self,
value: T,
) -> impl Future<Output = Result<Version, Error<T>>> + Send + Sync + 'static
{
let value = Encoded(value);
self.execute(
CellCommand::Write { value },
|cmd| match cmd {
CellCommand::Write { value } => Error::Offline(value.0),
_ => unreachable!(),
},
|cmd, e| match cmd {
CellCommand::Write { value } => Error::Encoding(value.0, e),
_ => unreachable!(),
},
)
}
pub fn set(
&self,
value: T,
) -> impl Future<Output = Result<Version, Error<T>>> + Send + Sync + 'static
{
self.write(value)
}
#[allow(clippy::type_complexity)]
pub fn compare_exchange(
&self,
current: Option<T>,
new: Option<T>,
) -> impl Future<Output = Result<Version, Error<(Option<T>, Option<T>)>>>
+ Send
+ Sync
+ 'static {
let current = current.map(Encoded);
let new = new.map(Encoded);
self.execute(
CellCommand::CompareExchange { current, new },
|cmd| match cmd {
CellCommand::CompareExchange { current, new } => {
Error::Offline((current.map(|v| v.0), new.map(|v| v.0)))
}
_ => unreachable!(),
},
|cmd, e| match cmd {
CellCommand::CompareExchange { current, new } => {
Error::Encoding((current.map(|v| v.0), new.map(|v| v.0)), e)
}
_ => unreachable!(),
},
)
}
pub fn clear(
&self,
) -> impl Future<Output = Result<Version, Error<()>>> + Send + Sync + 'static
{
self.execute(
CellCommand::Clear,
|_| Error::Offline(()),
|_, _| unreachable!(),
)
}
}
impl<T: Value, const IS_WRITER: bool> Cell<T, IS_WRITER> {
pub fn reader(
network: &Network,
store_id: impl Into<StoreId>,
) -> CellReader<T> {
Self::reader_with_config(network, store_id, CollectionConfig::default())
}
pub fn reader_with_config(
network: &Network,
store_id: impl Into<StoreId>,
config: impl Into<CollectionConfig>,
) -> CellReader<T> {
Self::create::<READER>(network, store_id, config.into())
}
fn create<const W: bool>(
network: &Network,
store_id: impl Into<StoreId>,
config: CollectionConfig,
) -> Cell<T, W> {
let store_id = store_id.into();
let machine = CellStateMachine::new(
store_id, W,
config.sync,
network.local().id(),
);
let data = machine.data();
let mut builder = network
.groups()
.with_key(store_id)
.with_state_machine(machine);
for validator in config.auth {
builder = builder.require_ticket(validator);
}
let group = builder.join();
let when = When::new(group.when().clone());
Cell::<T, W> { when, group, data }
}
}
impl<T: Value, const WRITER: bool> CollectionFromDef for Cell<T, WRITER> {
type Reader = CellReader<T>;
type Writer = CellWriter<T>;
fn reader_with_config(
network: &Network,
store_id: StoreId,
config: CollectionConfig,
) -> Self::Reader {
Self::Reader::reader_with_config(network, store_id, config)
}
fn writer_with_config(
network: &Network,
store_id: StoreId,
config: CollectionConfig,
) -> Self::Writer {
Self::Writer::writer_with_config(network, store_id, config)
}
}
impl<T: Value> CellWriter<T> {
fn execute<TErr>(
&self,
command: CellCommand<T>,
offline_err: impl FnOnce(CellCommand<T>) -> Error<TErr> + Send + Sync + 'static,
encoding_err: impl FnOnce(CellCommand<T>, EncodeError) -> Error<TErr>
+ Send
+ Sync
+ 'static,
) -> impl Future<Output = Result<Version, Error<TErr>>> + Send + Sync + 'static
{
self
.group
.execute(command)
.map_err(|err| match err {
CommandError::Offline(mut items) => offline_err(items.remove(0)),
CommandError::Encoding(mut items, err) => {
encoding_err(items.remove(0), err)
}
CommandError::GroupTerminated => Error::NetworkDown,
CommandError::NoCommands => unreachable!(),
})
.map(|pos| pos.map(Version))
}
}
struct CellStateMachine<T: Value> {
data: Option<T>,
latest: watch::Sender<Option<T>>,
store_id: StoreId,
local_id: PeerId,
state_sync: SnapshotSync<Self>,
is_writer: bool,
metrics_labels: OnceLock<[(&'static str, String); 2]>,
}
impl<T: Value> CellStateMachine<T> {
pub fn new(
store_id: StoreId,
is_writer: bool,
sync_config: SyncConfig,
local_id: PeerId,
) -> Self {
let data = None;
let state_sync = SnapshotSync::new(sync_config, |request| {
CellCommand::TakeSnapshot(request)
});
let latest = watch::Sender::new(data.clone());
Self {
data,
latest,
store_id,
local_id,
state_sync,
is_writer,
metrics_labels: OnceLock::new(),
}
}
pub fn data(&self) -> watch::Receiver<Option<T>> {
self.latest.subscribe()
}
}
impl<T: Value> StateMachine for CellStateMachine<T> {
type Command = CellCommand<T>;
type Query = ();
type QueryResult = ();
type StateSync = SnapshotSync<Self>;
fn apply(&mut self, command: Self::Command, ctx: &dyn ApplyContext) {
self.apply_batch([command], ctx);
}
fn apply_batch(
&mut self,
commands: impl IntoIterator<Item = Self::Command>,
ctx: &dyn ApplyContext,
) {
let mut commands_len = 0usize;
let mut sync_requests = vec![];
for command in commands {
match command {
CellCommand::Write { value } => {
self.data = Some(value.0);
}
CellCommand::CompareExchange { current, new } => {
if self.data.as_ref().map(|v| v.encode().ok())
== current.map(|v| v.encode().ok())
{
self.data = new.map(|v| v.0);
}
}
CellCommand::Clear => {
self.data = None;
}
CellCommand::TakeSnapshot(request) => {
if request.requested_by != self.local_id
&& !self.state_sync.is_expired(&request)
{
sync_requests.push(request);
}
}
}
commands_len += 1;
}
self.latest.send_replace(self.data.clone());
let labels = self.metrics_labels.get_or_init(|| {
[
("network", ctx.network_id().short().to_string()),
("group", ctx.group_id().short().to_string()),
]
});
metrics::gauge!("mosaik.collections.cell.size", labels.as_slice())
.set(f64::from(u8::from(self.data.is_some())));
if !sync_requests.is_empty() {
let snapshot = self.create_snapshot();
let position = Cursor::new(
ctx.current_term(),
ctx.committed().index() + commands_len as u64,
);
metrics::counter!("mosaik.collections.syncs.started", labels.as_slice())
.increment(sync_requests.len() as u64);
for request in sync_requests {
self
.state_sync
.serve_snapshot(request, position, snapshot.clone());
}
}
}
fn signature(&self) -> crate::UniqueId {
UniqueId::from("mosaik_collections_cell")
.derive(self.store_id)
.derive(type_name::<T>())
}
fn query(&self, (): Self::Query) {}
fn state_sync(&self) -> Self::StateSync {
self.state_sync.clone()
}
fn leadership_preference(&self) -> LeadershipPreference {
if self.is_writer {
LeadershipPreference::Normal
} else {
LeadershipPreference::Observer
}
}
}
impl<T: Value> SnapshotStateMachine for CellStateMachine<T> {
type Snapshot = CellSnapshot<T>;
fn create_snapshot(&self) -> Self::Snapshot {
CellSnapshot {
data: self.data.clone().map(Encoded),
}
}
fn install_snapshot(&mut self, snapshot: Self::Snapshot) {
self.data = snapshot.data.map(|d| d.0);
self.latest.send_replace(self.data.clone());
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound = "T: Value")]
enum CellCommand<T> {
Write {
value: Encoded<T>,
},
CompareExchange {
current: Option<Encoded<T>>,
new: Option<Encoded<T>>,
},
Clear,
TakeSnapshot(SnapshotRequest),
}
#[derive(Debug, Clone)]
pub struct CellSnapshot<T: Value> {
data: Option<Encoded<T>>,
}
impl<T: Value> Default for CellSnapshot<T> {
fn default() -> Self {
Self { data: None }
}
}
impl<T: Value> Snapshot for CellSnapshot<T> {
type Item = Encoded<T>;
fn len(&self) -> u64 {
u64::from(self.data.is_some())
}
fn iter_range(
&self,
range: Range<u64>,
) -> Option<impl Iterator<Item = Self::Item>> {
if range.contains(&0) {
Some(self.data.clone().into_iter())
} else {
None
}
}
fn append(&mut self, items: impl IntoIterator<Item = Self::Item>) {
for item in items {
self.data = Some(item);
}
}
}