use {
super::{
CollectionConfig,
CollectionFromDef,
Error,
READER,
SyncConfig,
WRITER,
When,
primitives::{Key, StoreId, Value, Version},
sync::{
Snapshot,
SnapshotStateMachine,
SnapshotSync,
protocol::SnapshotRequest,
},
},
crate::{
Group,
Network,
PeerId,
UniqueId,
groups::*,
primitives::{EncodeError, Encoded, Short, ShortFmtExt, UnboundedChannel},
},
chrono::{DateTime, Utc},
core::{any::type_name, cmp::Ordering, ops::Range},
futures::{FutureExt, TryFutureExt},
serde::{Deserialize, Serialize},
std::sync::OnceLock,
tokio::sync::{broadcast, mpsc::UnboundedSender, watch},
};
/// Alias for [`Vec`] in writer mode. The mutating operations (`push_back`,
/// `insert`, `clear`, ...) are implemented only for this alias.
pub type VecWriter<T> = Vec<T, WRITER>;
/// Alias for [`Vec`] in reader mode. This is the type produced by
/// `CollectionFromDef::reader_with_config`.
pub type VecReader<T> = Vec<T, READER>;
/// Handle to a vector whose contents are maintained by a `VecStateMachine`
/// running inside a [`Group`].
///
/// The `IS_WRITER` const parameter selects between the writer and reader
/// flavours (see [`VecWriter`] / [`VecReader`]) and defaults to `WRITER`.
pub struct Vec<T: Value, const IS_WRITER: bool = WRITER> {
/// Built from a clone of the group's own `when()` handle.
when: When,
/// The group joined for this collection; writer commands are submitted
/// through it and the committed position is read from it.
group: Group<VecStateMachine<T>>,
/// Receiving side of the watch channel on which the state machine publishes
/// the current contents.
data: watch::Receiver<im::Vector<T>>,
}
impl<T: Value, const IS_WRITER: bool> Vec<T, IS_WRITER> {
pub fn reader_with_config(
network: &Network,
store_id: impl Into<StoreId>,
config: impl Into<CollectionConfig>,
) -> Self {
Vec::<T, READER>::create(network, store_id, config.into())
}
pub fn reader(network: &Network, store_id: impl Into<StoreId>) -> Self {
Self::reader_with_config(network, store_id, CollectionConfig::default())
}
pub fn len(&self) -> usize {
self.data.borrow().len()
}
pub fn is_empty(&self) -> bool {
self.data.borrow().is_empty()
}
pub fn back(&self) -> Option<T> {
self.data.borrow().back().cloned()
}
pub fn last(&self) -> Option<T> {
self.data.borrow().last().cloned()
}
pub fn front(&self) -> Option<T> {
self.data.borrow().front().cloned()
}
pub fn head(&self) -> Option<T> {
self.data.borrow().head().cloned()
}
pub fn get(&self, index: u64) -> Option<T> {
self.data.borrow().get(index as usize).cloned()
}
pub fn iter(&self) -> impl Iterator<Item = T> {
let iter_clone = self.data.borrow().clone();
iter_clone.into_iter()
}
pub const fn when(&self) -> &When {
&self.when
}
pub fn version(&self) -> Version {
Version(self.group.committed())
}
pub fn group_id(&self) -> &GroupId {
self.group.id()
}
}
/// Value-based lookups, available when the element type supports equality.
impl<T: Value + PartialEq, const IS_WRITER: bool> Vec<T, IS_WRITER> {
    /// Returns `true` when the current contents hold an element equal to
    /// `value`.
    pub fn contains(&self, value: &T) -> bool {
        // Search through the watch borrow directly. Cloning the vector first
        // gains nothing: the borrow is held until the search finishes anyway.
        self.data.borrow().contains(value)
    }

    /// Returns the index reported by the underlying vector's `index_of` for
    /// `value`, widened to `u64`, or `None` when no element matches.
    pub fn index_of(&self, value: &T) -> Option<u64> {
        self.data.borrow().index_of(value).map(|i| i as u64)
    }
}
impl<T: Value> VecWriter<T> {
pub fn writer(network: &Network, store_id: impl Into<StoreId>) -> Self {
Self::writer_with_config(network, store_id, CollectionConfig::default())
}
pub fn writer_with_config(
network: &Network,
store_id: impl Into<StoreId>,
config: impl Into<CollectionConfig>,
) -> Self {
Self::create::<WRITER>(network, store_id, config.into())
}
pub fn new(network: &Network, store_id: impl Into<StoreId>) -> Self {
Self::writer(network, store_id)
}
pub fn new_with_config(
network: &Network,
store_id: impl Into<StoreId>,
config: impl Into<CollectionConfig>,
) -> Self {
Self::writer_with_config(network, store_id, config)
}
pub fn clear(
&self,
) -> impl Future<Output = Result<Version, Error<()>>> + Send + Sync + 'static
{
self.execute(
VecCommand::Clear,
|_| Error::Offline(()),
|_, _| unreachable!(),
)
}
pub fn push_back(
&self,
value: T,
) -> impl Future<Output = Result<Version, Error<T>>> + Send + Sync + 'static
{
let value = Encoded(value);
self.execute(
VecCommand::PushBack { value },
|cmd| match cmd {
VecCommand::PushBack { value } => Error::Offline(value.0),
_ => unreachable!(),
},
|cmd, e| match cmd {
VecCommand::PushBack { value } => Error::Encoding(value.0, e),
_ => unreachable!(),
},
)
}
pub fn push_front(
&self,
value: T,
) -> impl Future<Output = Result<Version, Error<T>>> + Send + Sync + 'static
{
let value = Encoded(value);
self.execute(
VecCommand::PushFront { value },
|cmd| match cmd {
VecCommand::PushFront { value } => Error::Offline(value.0),
_ => unreachable!(),
},
|cmd, e| match cmd {
VecCommand::PushFront { value } => Error::Encoding(value.0, e),
_ => unreachable!(),
},
)
}
pub fn swap(
&self,
i: u64,
j: u64,
) -> impl Future<Output = Result<Version, Error<()>>> + Send + Sync + 'static
{
self.execute(
VecCommand::Swap { i, j },
|_| Error::Offline(()),
|_, _| unreachable!(),
)
}
pub fn insert(
&self,
index: u64,
value: T,
) -> impl Future<Output = Result<Version, Error<T>>> + Send + Sync + 'static
{
let value = Encoded(value);
self.execute(
VecCommand::Insert { index, value },
|cmd| match cmd {
VecCommand::Insert { value, .. } => Error::Offline(value.0),
_ => unreachable!(),
},
|cmd, e| match cmd {
VecCommand::Insert { value, .. } => Error::Encoding(value.0, e),
_ => unreachable!(),
},
)
}
pub fn extend(
&self,
entries: impl IntoIterator<Item = T>,
) -> impl Future<Output = Result<Version, Error<std::vec::Vec<T>>>>
+ Send
+ Sync
+ 'static {
let entries: std::vec::Vec<Encoded<T>> =
entries.into_iter().map(Encoded).collect();
let is_empty = entries.is_empty();
let current_version = self.group.committed();
let fut = self.execute(
VecCommand::Extend { entries },
|cmd| match cmd {
VecCommand::Extend { entries } => {
Error::Offline(entries.into_iter().map(|e| e.0).collect())
}
_ => unreachable!(),
},
|cmd, e| match cmd {
VecCommand::Extend { entries } => {
Error::Encoding(entries.into_iter().map(|e| e.0).collect(), e)
}
_ => unreachable!(),
},
);
async move {
if is_empty {
Ok(Version(current_version))
} else {
fut.await
}
}
}
pub fn pop_back(
&self,
) -> impl Future<Output = Result<Version, Error<()>>> + Send + Sync + 'static
{
self.execute(
VecCommand::PopBack,
|_| Error::Offline(()),
|_, _| unreachable!(),
)
}
pub fn pop_front(
&self,
) -> impl Future<Output = Result<Version, Error<()>>> + Send + Sync + 'static
{
self.execute(
VecCommand::PopFront,
|_| Error::Offline(()),
|_, _| unreachable!(),
)
}
pub fn remove(
&self,
index: u64,
) -> impl Future<Output = Result<Version, Error<u64>>> + Send + Sync + 'static
{
self.execute(
VecCommand::Remove { index },
|cmd| match cmd {
VecCommand::Remove { index } => Error::Offline(index),
_ => unreachable!(),
},
|_, _| unreachable!(),
)
}
pub fn truncate(
&self,
len: usize,
) -> impl Future<Output = Result<Version, Error<()>>> + Send + Sync + 'static
{
let len = len as u64;
self.execute(
VecCommand::Truncate { len },
|_| Error::Offline(()),
|_, _| unreachable!(),
)
}
}
impl<T: Value, const IS_WRITER: bool> Vec<T, IS_WRITER> {
    /// Shared constructor behind the public reader and writer entry points.
    ///
    /// Builds a `VecStateMachine` (flagged as writer when `W` is true), joins
    /// the group keyed by `store_id` with every configured ticket validator
    /// required, and wires the resulting handle to the state machine's
    /// watch channel.
    fn create<const W: bool>(
        network: &Network,
        store_id: impl Into<StoreId>,
        config: CollectionConfig,
    ) -> Vec<T, W> {
        let store_id = store_id.into();
        let local_id = network.local().id();
        let machine = VecStateMachine::new(store_id, W, config.sync, local_id);

        // Subscribe before the machine is moved into the group builder.
        let data = machine.data();

        let base = network
            .groups()
            .with_key(store_id)
            .with_state_machine(machine);
        let builder = config
            .auth
            .into_iter()
            .fold(base, |pending, validator| pending.require_ticket(validator));

        let group = builder.join();
        let when = When::new(group.when().clone());
        Vec::<T, W> { when, group, data }
    }
}
/// Lets either flavour of [`Vec`] name its reader and writer counterparts and
/// construct them from a concrete `StoreId` and `CollectionConfig`.
impl<T: Value, const IS_WRITER: bool> CollectionFromDef for Vec<T, IS_WRITER> {
    type Reader = VecReader<T>;
    type Writer = VecWriter<T>;

    /// Forwards to the reader type's `reader_with_config` constructor.
    fn reader_with_config(
        network: &Network,
        store_id: StoreId,
        config: CollectionConfig,
    ) -> Self::Reader {
        Self::Reader::reader_with_config(network, store_id, config)
    }

    /// Forwards to the writer type's `writer_with_config` constructor.
    fn writer_with_config(
        network: &Network,
        store_id: StoreId,
        config: CollectionConfig,
    ) -> Self::Writer {
        Self::Writer::writer_with_config(network, store_id, config)
    }
}
impl<T: Value> Vec<T, WRITER> {
    /// Submits a single `command` to the group and adapts the outcome to the
    /// collection-level result type.
    ///
    /// * `offline_err` receives the rejected command back when the group
    ///   reports `CommandError::Offline`.
    /// * `encoding_err` receives the rejected command and the encode error
    ///   when the group reports `CommandError::Encoding`.
    ///
    /// `CommandError::GroupTerminated` becomes `Error::NetworkDown`. On
    /// success the position reported by the group is wrapped in [`Version`].
    fn execute<TErr>(
        &self,
        command: VecCommand<T>,
        offline_err: impl FnOnce(VecCommand<T>) -> Error<TErr> + Send + Sync + 'static,
        encoding_err: impl FnOnce(VecCommand<T>, EncodeError) -> Error<TErr>
        + Send
        + Sync
        + 'static,
    ) -> impl Future<Output = Result<Version, Error<TErr>>> + Send + Sync + 'static
    {
        let submitted = self.group.execute(command);
        submitted
            .map_err(|failure| match failure {
                // Exactly one command was submitted, so the rejected batch
                // holds it at position 0.
                CommandError::Offline(mut rejected) => {
                    offline_err(rejected.remove(0))
                }
                CommandError::Encoding(mut rejected, cause) => {
                    encoding_err(rejected.remove(0), cause)
                }
                CommandError::GroupTerminated => Error::NetworkDown,
                // A command is always supplied above.
                CommandError::NoCommands => unreachable!(),
            })
            .map_ok(Version)
    }
}
/// State machine that owns the vector contents and applies `VecCommand`s to
/// them.
struct VecStateMachine<T: Value> {
/// Current contents; mutated by `apply_batch` and replaced wholesale by
/// `install_snapshot`.
data: im::Vector<T>,
/// Watch sender on which a clone of `data` is published after every applied
/// batch and after every installed snapshot.
latest: watch::Sender<im::Vector<T>>,
/// Store identifier; mixed into the value returned by `signature`.
store_id: StoreId,
/// Selects the leadership preference reported to the group (`Normal` when
/// true, `Observer` otherwise).
is_writer: bool,
/// Snapshot-sync handle; cloned out by `state_sync` and used to serve
/// snapshot requests.
state_sync: SnapshotSync<Self>,
/// Id of the local peer; snapshot requests issued by this peer are not
/// served by it.
local_id: PeerId,
/// Lazily initialised `("network", ...)` / `("group", ...)` metric labels.
metrics_labels: OnceLock<[(&'static str, String); 2]>,
}
impl<T: Value> VecStateMachine<T> {
    /// Creates a state machine with empty contents.
    ///
    /// The snapshot-sync handle is configured with `sync_config` and turns
    /// each snapshot request into a `VecCommand::TakeSnapshot` command.
    pub fn new(
        store_id: StoreId,
        is_writer: bool,
        sync_config: SyncConfig,
        local_id: PeerId,
    ) -> Self {
        let data = im::Vector::new();
        // The watch channel starts out holding the same (empty) contents.
        let latest = watch::Sender::new(data.clone());
        let state_sync = SnapshotSync::new(sync_config, VecCommand::TakeSnapshot);
        let metrics_labels = OnceLock::new();

        Self {
            data,
            latest,
            store_id,
            is_writer,
            state_sync,
            local_id,
            metrics_labels,
        }
    }

    /// Returns a new receiver subscribed to the published contents.
    pub fn data(&self) -> watch::Receiver<im::Vector<T>> {
        let sender = &self.latest;
        sender.subscribe()
    }
}
/// Group state-machine implementation: applies `VecCommand`s to the contents,
/// publishes the result, records metrics and serves snapshot requests.
impl<T: Value> StateMachine for VecStateMachine<T> {
    type Command = VecCommand<T>;
    type Query = ();
    type QueryResult = ();
    type StateSync = SnapshotSync<Self>;

    /// Applies a single command by delegating to [`Self::apply_batch`].
    fn apply(&mut self, command: Self::Command, ctx: &dyn ApplyContext) {
        self.apply_batch([command], ctx);
    }

    /// Applies `commands` in order, then publishes the new contents once.
    ///
    /// Commands that address a position outside the current contents
    /// (`Swap`, `Insert`, `Remove`) are ignored rather than allowed to panic.
    /// `TakeSnapshot` requests are collected and served after the whole batch
    /// has been applied, so the snapshot reflects every command in the batch.
    fn apply_batch(
        &mut self,
        commands: impl IntoIterator<Item = Self::Command>,
        ctx: &dyn ApplyContext,
    ) {
        let mut commands_len = 0usize;
        let mut sync_requests = vec![];

        for command in commands {
            match command {
                VecCommand::Clear => {
                    self.data.clear();
                }
                VecCommand::Swap { i, j } => {
                    if i < self.data.len() as u64 && j < self.data.len() as u64 {
                        self.data.swap(i as usize, j as usize);
                    }
                }
                VecCommand::Insert { index, value } => {
                    // `im::Vector::insert` panics when `index > len`. Guard
                    // it the same way `Swap` and `Remove` are guarded;
                    // `index == len` is valid and appends.
                    if index <= self.data.len() as u64 {
                        self.data.insert(index as usize, value.0);
                    }
                }
                VecCommand::PushBack { value } => {
                    self.data.push_back(value.0);
                }
                VecCommand::PushFront { value } => {
                    self.data.push_front(value.0);
                }
                VecCommand::PopBack => {
                    self.data.pop_back();
                }
                VecCommand::PopFront => {
                    self.data.pop_front();
                }
                VecCommand::Remove { index } => {
                    if index < self.data.len() as u64 {
                        self.data.remove(index as usize);
                    }
                }
                VecCommand::Truncate { len } => {
                    // Compare in `u64` so that a length which does not fit in
                    // `usize` is never narrowed into a smaller one. A `len`
                    // at or beyond the current length is a no-op.
                    if len < self.data.len() as u64 {
                        self.data.truncate(len as usize);
                    }
                }
                VecCommand::Extend { entries } => {
                    self.data.extend(entries.into_iter().map(|e| e.0));
                }
                VecCommand::TakeSnapshot(request) => {
                    // Requests issued by this peer and expired requests are
                    // dropped; the rest are served after the batch.
                    if request.requested_by != self.local_id
                        && !self.state_sync.is_expired(&request)
                    {
                        sync_requests.push(request);
                    }
                }
            }
            commands_len += 1;
        }

        // Publish once per batch rather than once per command.
        self.latest.send_replace(self.data.clone());

        let labels = self.metrics_labels.get_or_init(|| {
            [
                ("network", ctx.network_id().short().to_string()),
                ("group", ctx.group_id().short().to_string()),
            ]
        });
        metrics::gauge!("mosaik.collections.vec.size", labels.as_slice())
            .set(self.data.len() as f64);

        if !sync_requests.is_empty() {
            let snapshot = self.create_snapshot();
            // Position of the snapshot: the committed index advanced by the
            // number of commands in this batch, in the current term.
            let position = Cursor::new(
                ctx.current_term(),
                ctx.committed().index() + commands_len as u64,
            );
            metrics::counter!("mosaik.collections.syncs.started", labels.as_slice())
                .increment(sync_requests.len() as u64);
            for request in sync_requests {
                self
                    .state_sync
                    .serve_snapshot(request, position, snapshot.clone());
            }
        }
    }

    /// Identifier derived from a fixed tag, the store id and the element
    /// type name.
    fn signature(&self) -> crate::UniqueId {
        UniqueId::from("mosaik_collections_vec")
            .derive(self.store_id)
            .derive(type_name::<T>())
    }

    /// Queries carry no data and produce no result.
    fn query(&self, (): Self::Query) {}

    /// Returns a clone of the snapshot-sync handle.
    fn state_sync(&self) -> Self::StateSync {
        self.state_sync.clone()
    }

    /// Writers report `Normal` leadership preference, readers `Observer`.
    fn leadership_preference(&self) -> LeadershipPreference {
        if self.is_writer {
            LeadershipPreference::Normal
        } else {
            LeadershipPreference::Observer
        }
    }
}
/// Snapshot support: captures and restores the full vector contents.
impl<T: Value> SnapshotStateMachine for VecStateMachine<T> {
    type Snapshot = VecSnapshot<T>;

    /// Captures a clone of the current contents.
    fn create_snapshot(&self) -> Self::Snapshot {
        let data = self.data.clone();
        VecSnapshot { data }
    }

    /// Replaces the contents with those of `snapshot` and publishes them on
    /// the watch channel.
    fn install_snapshot(&mut self, snapshot: Self::Snapshot) {
        let VecSnapshot { data } = snapshot;
        self.data = data;
        let published = self.data.clone();
        self.latest.send_replace(published);
    }
}
/// Commands understood by `VecStateMachine`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound = "T: Value")]
enum VecCommand<T> {
/// Remove every element.
Clear,
/// Swap the elements at `i` and `j`; ignored when either index is out of
/// range.
Swap { i: u64, j: u64 },
/// Insert `value` at position `index`.
Insert { index: u64, value: Encoded<T> },
/// Append `value` at the back.
PushBack { value: Encoded<T> },
/// Prepend `value` at the front.
PushFront { value: Encoded<T> },
/// Remove the last element, if any.
PopBack,
/// Remove the first element, if any.
PopFront,
/// Remove the element at `index`; ignored when the index is out of range.
Remove { index: u64 },
/// Shorten the contents to at most `len` elements.
Truncate { len: u64 },
/// Append all `entries` at the back.
Extend { entries: std::vec::Vec<Encoded<T>> },
/// Snapshot request produced by the snapshot-sync handle; served once the
/// batch containing it has been applied.
TakeSnapshot(SnapshotRequest),
}
/// Snapshot of the vector contents used by snapshot sync.
#[derive(Debug, Clone)]
pub struct VecSnapshot<T: Value> {
/// The captured elements, in order.
data: im::Vector<T>,
}
/// Item-wise access to a snapshot so that it can be transferred in ranges.
impl<T: Value> Snapshot for VecSnapshot<T> {
    type Item = Encoded<T>;

    /// Number of elements held by the snapshot.
    fn len(&self) -> u64 {
        self.data.len() as u64
    }

    /// Returns the elements in `range`, each wrapped in `Encoded`.
    ///
    /// Returns `None` when the range is inverted (`start > end`) or reaches
    /// past the end of the snapshot.
    fn iter_range(
        &self,
        range: Range<u64>,
    ) -> Option<impl Iterator<Item = Self::Item>> {
        // Validate in `u64` before any arithmetic or narrowing. An inverted
        // range would underflow the subtraction below, and a huge bound
        // could overflow a `usize` sum.
        let available = self.data.len() as u64;
        if range.start > range.end || range.end > available {
            return None;
        }

        // Both bounds are at most `data.len()`, so they fit in `usize`.
        let skip = range.start as usize;
        let take = (range.end - range.start) as usize;
        Some(self.data.skip(skip).take(take).into_iter().map(Encoded))
    }

    /// Appends the decoded `items` at the end of the snapshot.
    fn append(&mut self, items: impl IntoIterator<Item = Self::Item>) {
        self.data.extend(items.into_iter().map(|e| e.0));
    }
}
/// The default snapshot holds no elements.
impl<T: Value> Default for VecSnapshot<T> {
    fn default() -> Self {
        let data = im::Vector::new();
        Self { data }
    }
}