use {
super::{
CollectionConfig,
CollectionFromDef,
Error,
READER,
SyncConfig,
WRITER,
When,
primitives::{StoreId, Value, Version},
},
crate::{
Group,
GroupId,
Network,
PeerId,
UniqueId,
collections::sync::{
Snapshot,
SnapshotStateMachine,
SnapshotSync,
protocol::SnapshotRequest,
},
groups::{
ApplyContext,
CommandError,
Cursor,
LeadershipPreference,
StateMachine,
},
primitives::{EncodeError, Encoded},
},
core::{
any::type_name,
ops::{Deref, Range},
},
futures::{FutureExt, TryFutureExt},
serde::{Deserialize, Serialize},
tokio::sync::watch,
};
/// A [`Once`] handle whose `IS_WRITER` parameter is `WRITER`; the `write` and
/// `set` methods are implemented only for this alias.
pub type OnceWriter<T> = Once<T, WRITER>;
/// A [`Once`] handle whose `IS_WRITER` parameter is `READER`; it exposes only
/// the read-side accessors.
pub type OnceReader<T> = Once<T, READER>;
/// Handle to a write-once value backed by a [`Group`] running an
/// `OnceStateMachine`.
///
/// The value starts out as `None`. The state machine stores the first value
/// written and leaves it unchanged on later writes. `IS_WRITER` selects
/// whether the write API is available and defaults to `WRITER`.
pub struct Once<T: Value, const IS_WRITER: bool = WRITER> {
/// Notification helper built from the group's own `when()`.
when: When,
/// The group this handle joined; commands are executed through it.
group: Group<OnceStateMachine<T>>,
/// Receiving side of the state machine's watch channel holding the value.
data: watch::Receiver<Option<T>>,
}
impl<T: Value, const IS_WRITER: bool> Once<T, IS_WRITER> {
    /// Returns a clone of the value currently held in the local watch
    /// channel, or `None` when no value is stored.
    pub fn read(&self) -> Option<T> {
        let current = self.data.borrow();
        (*current).clone()
    }

    /// Alias for [`Self::read`].
    pub fn get(&self) -> Option<T> {
        Self::read(self)
    }

    /// Returns `true` while no value is stored.
    pub fn is_empty(&self) -> bool {
        matches!(*self.data.borrow(), None)
    }

    /// Returns `true` while no value is stored (same answer as
    /// [`Self::is_empty`]).
    pub fn is_none(&self) -> bool {
        !self.is_some()
    }

    /// Returns `true` once a value is stored.
    pub fn is_some(&self) -> bool {
        self.data.borrow().is_some()
    }

    /// Returns the [`When`] helper associated with this handle.
    pub const fn when(&self) -> &When {
        &self.when
    }

    /// Returns the group's committed position wrapped in a [`Version`].
    pub fn version(&self) -> Version {
        let committed = self.group.committed();
        Version(committed)
    }

    /// Returns the identifier of the underlying group.
    pub fn group_id(&self) -> &GroupId {
        self.group.id()
    }

    /// Waits until the handle reports being online and then until a value is
    /// present, returning a clone of that value.
    pub async fn await_value(&self) -> T {
        self.when.online().await;
        loop {
            // The update future is created *before* the value is read,
            // presumably so an update landing between the read and the await
            // is not missed. Keep this order.
            let next_update = self.when.updated();
            if let Some(value) = self.read() {
                break value;
            }
            next_update.await;
        }
    }
}
impl<T: Value> OnceWriter<T> {
pub fn writer(network: &Network, store_id: impl Into<StoreId>) -> Self {
Self::writer_with_config(network, store_id, CollectionConfig::default())
}
pub fn writer_with_config(
network: &Network,
store_id: impl Into<StoreId>,
config: impl Into<CollectionConfig>,
) -> Self {
Self::create::<WRITER>(network, store_id, config.into())
}
pub fn new(network: &Network, store_id: impl Into<StoreId>) -> Self {
Self::writer(network, store_id)
}
pub fn new_with_config(
network: &Network,
store_id: impl Into<StoreId>,
config: impl Into<CollectionConfig>,
) -> Self {
Self::writer_with_config(network, store_id, config)
}
pub fn write(
&self,
value: T,
) -> impl Future<Output = Result<Version, Error<T>>> + Send + Sync + 'static
{
let value = Encoded(value);
self.execute(
OnceCommand::Write { value },
|cmd| match cmd {
OnceCommand::Write { value } => Error::Offline(value.0),
OnceCommand::TakeSnapshot(_) => unreachable!(),
},
|cmd, e| match cmd {
OnceCommand::Write { value } => Error::Encoding(value.0, e),
OnceCommand::TakeSnapshot(_) => unreachable!(),
},
)
}
pub fn set(
&self,
value: T,
) -> impl Future<Output = Result<Version, Error<T>>> + Send + Sync + 'static
{
self.write(value)
}
}
impl<T: Value, const IS_WRITER: bool> Once<T, IS_WRITER> {
    /// Creates a reader handle for `store_id` using the default
    /// [`CollectionConfig`].
    pub fn reader(network: &Network, store_id: impl Into<StoreId>) -> OnceReader<T> {
        Self::create::<READER>(network, store_id, CollectionConfig::default())
    }

    /// Creates a reader handle for `store_id` using the supplied
    /// configuration.
    pub fn reader_with_config(
        network: &Network,
        store_id: impl Into<StoreId>,
        config: impl Into<CollectionConfig>,
    ) -> OnceReader<T> {
        let config: CollectionConfig = config.into();
        Self::create::<READER>(network, store_id, config)
    }

    /// Shared constructor behind the reader and writer entry points.
    ///
    /// Builds the state machine, joins the group keyed by `store_id` (with
    /// every validator in `config.auth` registered through `require_ticket`),
    /// and assembles the handle. `W` is forwarded to the state machine as its
    /// `is_writer` flag and becomes the handle's `IS_WRITER` parameter.
    fn create<const W: bool>(
        network: &Network,
        store_id: impl Into<StoreId>,
        config: CollectionConfig,
    ) -> Once<T, W> {
        let store_id: StoreId = store_id.into();
        let local_id = network.local().id();
        let machine = OnceStateMachine::new(store_id, W, config.sync, local_id);

        // Subscribe to the value channel before the machine is moved into the
        // group builder.
        let data = machine.data();

        let base = network
            .groups()
            .with_key(store_id)
            .with_state_machine(machine);
        let builder = config
            .auth
            .into_iter()
            .fold(base, |builder, validator| builder.require_ticket(validator));

        let group = builder.join();
        Once {
            when: When::new(group.when().clone()),
            group,
            data,
        }
    }
}
impl<T: Value, const WRITER: bool> CollectionFromDef for Once<T, WRITER> {
type Reader = OnceReader<T>;
type Writer = OnceWriter<T>;
fn reader_with_config(
network: &Network,
store_id: StoreId,
config: CollectionConfig,
) -> Self::Reader {
Self::Reader::reader_with_config(network, store_id, config)
}
fn writer_with_config(
network: &Network,
store_id: StoreId,
config: CollectionConfig,
) -> Self::Writer {
Self::Writer::writer_with_config(network, store_id, config)
}
}
impl<T: Value> OnceWriter<T> {
fn execute<TErr>(
&self,
command: OnceCommand<T>,
offline_err: impl FnOnce(OnceCommand<T>) -> Error<TErr> + Send + Sync + 'static,
encoding_err: impl FnOnce(OnceCommand<T>, EncodeError) -> Error<TErr>
+ Send
+ Sync
+ 'static,
) -> impl Future<Output = Result<Version, Error<TErr>>> + Send + Sync + 'static
{
self
.group
.execute(command)
.map_err(|err| match err {
CommandError::Offline(mut items) => offline_err(items.remove(0)),
CommandError::Encoding(mut items, err) => {
encoding_err(items.remove(0), err)
}
CommandError::GroupTerminated => Error::NetworkDown,
CommandError::NoCommands => unreachable!(),
})
.map(|pos| pos.map(Version))
}
}
/// State machine behind [`Once`]: holds at most one value and publishes it
/// through a watch channel.
struct OnceStateMachine<T: Value> {
/// The stored value; `None` until the first write is applied or a snapshot
/// carrying a value is installed.
data: Option<T>,
/// Sending side of the watch channel that handles read from.
latest: watch::Sender<Option<T>>,
/// Store identifier; mixed into `signature()`.
store_id: StoreId,
/// Identifier of the local peer; snapshot requests issued by this peer are
/// not served locally.
local_id: PeerId,
/// Snapshot-sync helper returned from `state_sync()` and used to serve
/// snapshot requests.
state_sync: SnapshotSync<Self>,
/// Whether this instance was created for a writer handle; selects the
/// leadership preference.
is_writer: bool,
}
impl<T: Value> OnceStateMachine<T> {
    /// Creates an empty state machine (no value stored) for `store_id`.
    ///
    /// `sync_config` configures the snapshot-sync helper, which wraps each
    /// snapshot request in an `OnceCommand::TakeSnapshot` command.
    pub fn new(
        store_id: StoreId,
        is_writer: bool,
        sync_config: SyncConfig,
        local_id: PeerId,
    ) -> Self {
        Self {
            // Both the stored value and the published value start out empty.
            data: None,
            latest: watch::Sender::new(None),
            store_id,
            local_id,
            state_sync: SnapshotSync::new(sync_config, |request| {
                OnceCommand::TakeSnapshot(request)
            }),
            is_writer,
        }
    }

    /// Returns a new receiver subscribed to the published value.
    pub fn data(&self) -> watch::Receiver<Option<T>> {
        let sender = &self.latest;
        sender.subscribe()
    }
}
impl<T: Value> StateMachine for OnceStateMachine<T> {
    type Command = OnceCommand<T>;
    type Query = ();
    type QueryResult = ();
    type StateSync = SnapshotSync<Self>;

    /// Applies a single command by delegating to [`Self::apply_batch`].
    fn apply(&mut self, command: Self::Command, ctx: &dyn ApplyContext) {
        self.apply_batch([command], ctx);
    }

    /// Applies a batch of commands.
    ///
    /// * `Write` stores its value only while no value is stored yet; later
    ///   writes leave the stored value unchanged.
    /// * `TakeSnapshot` requests are collected unless they were issued by the
    ///   local peer or are reported as expired by the sync helper.
    ///
    /// The watch channel is updated only when the batch actually stored a
    /// value. Batches that leave the value unchanged no longer clone it or
    /// wake watchers.
    ///
    /// If any snapshot requests were collected, one snapshot is taken after
    /// the whole batch and served to each request. It is tagged with a cursor
    /// at the current term and the committed index advanced by the number of
    /// commands in the batch.
    fn apply_batch(
        &mut self,
        commands: impl IntoIterator<Item = Self::Command>,
        ctx: &dyn ApplyContext,
    ) {
        // Counted as `u64` so it can be added to the committed index without
        // a cast.
        let mut applied: u64 = 0;
        let mut value_written = false;
        let mut sync_requests = Vec::new();

        for command in commands {
            match command {
                OnceCommand::Write { value } => {
                    // First write wins.
                    if self.data.is_none() {
                        self.data = Some(value.0);
                        value_written = true;
                    }
                }
                OnceCommand::TakeSnapshot(request) => {
                    if request.requested_by != self.local_id
                        && !self.state_sync.is_expired(&request)
                    {
                        sync_requests.push(request);
                    }
                }
            }
            applied += 1;
        }

        // Publish only on an actual change; the stored value and the published
        // value are otherwise already identical.
        if value_written {
            self.latest.send_replace(self.data.clone());
        }

        if !sync_requests.is_empty() {
            let snapshot = self.create_snapshot();
            let position = Cursor::new(
                ctx.current_term(),
                ctx.committed().index() + applied,
            );
            for request in sync_requests {
                self.state_sync
                    .serve_snapshot(request, position, snapshot.clone());
            }
        }
    }

    /// Identifier derived from a fixed tag, the store id and the type name of
    /// `T`.
    fn signature(&self) -> crate::UniqueId {
        UniqueId::from("mosaik_collections_once")
            .derive(self.store_id)
            .derive(type_name::<T>())
    }

    /// This state machine has no queries; the unit query does nothing.
    fn query(&self, (): Self::Query) {}

    /// Returns a clone of the snapshot-sync helper.
    fn state_sync(&self) -> Self::StateSync {
        self.state_sync.clone()
    }

    /// Writers report `Normal` leadership preference, readers `Observer`.
    fn leadership_preference(&self) -> LeadershipPreference {
        if self.is_writer {
            LeadershipPreference::Normal
        } else {
            LeadershipPreference::Observer
        }
    }
}
impl<T: Value> SnapshotStateMachine for OnceStateMachine<T> {
type Snapshot = OnceSnapshot<T>;
fn create_snapshot(&self) -> Self::Snapshot {
OnceSnapshot {
data: self.data.clone().map(Encoded),
}
}
fn install_snapshot(&mut self, snapshot: Self::Snapshot) {
self.data = snapshot.data.map(|d| d.0);
self.latest.send_replace(self.data.clone());
}
}
/// Commands applied by `OnceStateMachine`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound = "T: Value")]
enum OnceCommand<T> {
/// Store `value` if no value is stored yet.
Write { value: Encoded<T> },
/// Request that a snapshot be taken for the given sync request.
TakeSnapshot(SnapshotRequest),
}
/// Snapshot of an `OnceStateMachine`: the stored value, if any, wrapped in
/// [`Encoded`].
#[derive(Debug, Clone)]
pub struct OnceSnapshot<T: Value> {
/// `None` when no value was stored at snapshot time.
data: Option<Encoded<T>>,
}
impl<T: Value> Default for OnceSnapshot<T> {
fn default() -> Self {
Self { data: None }
}
}
impl<T: Value> Snapshot for OnceSnapshot<T> {
    type Item = Encoded<T>;

    /// Number of items in the snapshot: `1` when a value is present,
    /// otherwise `0`.
    fn len(&self) -> u64 {
        match self.data {
            Some(_) => 1,
            None => 0,
        }
    }

    /// Returns an iterator over the stored item (empty when no value is
    /// present) if `range` contains index `0`; otherwise returns `None`.
    fn iter_range(
        &self,
        range: Range<u64>,
    ) -> Option<impl Iterator<Item = Self::Item>> {
        if !range.contains(&0) {
            return None;
        }
        let stored = self.data.clone();
        Some(stored.into_iter())
    }

    /// Stores the last of `items`; an empty `items` leaves the snapshot
    /// unchanged.
    fn append(&mut self, items: impl IntoIterator<Item = Self::Item>) {
        if let Some(newest) = items.into_iter().last() {
            self.data = Some(newest);
        }
    }
}