use nodedb_types::TenantId;
use nodedb_types::id::VShardId;
use serde::{Deserialize, Serialize};
use crate::error::CalvinError;
use super::primitives::{DependentReadSpec, EngineKeySet};
/// Newtype wrapper around a list of [`EngineKeySet`] entries.
///
/// [`TxClass`] uses this same type for both its `read_set` and its
/// `write_set`. The inner vector is public, so callers may inspect or build
/// it directly as well as through [`ReadWriteSet::new`].
///
/// Derives structural equality plus serde and zerompk (MessagePack)
/// serialization.
#[derive(
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct ReadWriteSet(pub Vec<EngineKeySet>);
impl ReadWriteSet {
    /// Wraps the given engine key sets without any validation.
    pub fn new(sets: Vec<EngineKeySet>) -> Self {
        Self(sets)
    }

    /// Returns `true` when no contained [`EngineKeySet`] reports itself as
    /// non-empty. A set with zero entries is therefore also empty.
    pub fn is_empty(&self) -> bool {
        !self.0.iter().any(|set| !set.is_empty())
    }

    /// Returns the distinct vshards addressed by the contained key sets,
    /// ordered ascending by their `u32` id.
    ///
    /// Each entry is mapped to a vshard through
    /// `VShardId::from_collection(entry.collection())`. Every entry
    /// contributes, including entries whose own `is_empty()` is `true`.
    pub fn participating_vshards(&self) -> Vec<VShardId> {
        // Tracks ids already emitted; `insert` returns `false` on a repeat,
        // so the filter keeps only the first occurrence of each vshard.
        let mut emitted = std::collections::HashSet::new();
        let mut vshards: Vec<VShardId> = self
            .0
            .iter()
            .map(|set| VShardId::from_collection(set.collection()))
            .filter(|candidate| emitted.insert(candidate.as_u32()))
            .collect();
        vshards.sort_by_key(|v| v.as_u32());
        vshards
    }
}
/// A transaction description: its read and write key sets, an opaque plan
/// payload, the owning tenant, and an optional dependent-read specification.
///
/// Instances built through [`TxClass::new`] are validated and carry a cached
/// list of participating vshards. That cache is excluded from both the serde
/// and the MessagePack representations, so a value obtained by
/// deserialization should have [`TxClass::restore_derived`] called on it
/// before [`TxClass::participating_vshards`] is consulted.
///
/// Because `PartialEq` is derived, the cached list takes part in equality
/// comparisons.
#[derive(
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct TxClass {
/// Keys the transaction reads.
pub read_set: ReadWriteSet,
/// Keys the transaction writes; `TxClass::new` rejects an empty write set.
pub write_set: ReadWriteSet,
/// Raw plan bytes. The encoding is not defined in this file.
pub plans: Vec<u8>,
/// Tenant the transaction belongs to.
pub tenant_id: TenantId,
/// Optional dependent-read specification. Omitted from serde output when
/// `None`, and defaults to `None` when absent from serde input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dependent_reads: Option<DependentReadSpec>,
/// Cached, sorted vshard list derived from `write_set` and
/// `dependent_reads`. Never serialized (skipped by serde and ignored by
/// msgpack); serde fills it with `Default` (an empty vector) on
/// deserialization. NOTE(review): presumably zerompk's `ignore` does the
/// same — confirm.
#[serde(skip)]
#[msgpack(ignore)]
participating_vshards: Vec<VShardId>,
}
impl TxClass {
    /// Builds a validated transaction class and caches its participating
    /// vshards.
    ///
    /// The cached list is the set of vshards touched by `write_set`, plus
    /// every vshard id that appears as a key of
    /// `dependent_reads.passive_reads` (when a spec is supplied), sorted
    /// ascending by `u32` id.
    ///
    /// # Errors
    ///
    /// * [`CalvinError::EmptyWriteSet`] when `write_set.is_empty()` is true.
    /// * [`CalvinError::SingleVshardTxn`] when `write_set` touches fewer than
    ///   two distinct vshards. The check is made on the write set alone,
    ///   before passive-read vshards are merged in. The error carries the
    ///   single vshard id, or `0` if none was found.
    pub fn new(
        read_set: ReadWriteSet,
        write_set: ReadWriteSet,
        plans: Vec<u8>,
        tenant_id: TenantId,
        dependent_reads: Option<DependentReadSpec>,
    ) -> Result<Self, CalvinError> {
        if write_set.is_empty() {
            return Err(CalvinError::EmptyWriteSet);
        }

        let write_vshards = write_set.participating_vshards();
        if write_vshards.len() < 2 {
            let vshard = write_vshards.first().map(|v| v.as_u32()).unwrap_or(0);
            return Err(CalvinError::SingleVshardTxn { vshard });
        }

        let participating_vshards =
            Self::with_passive_vshards(write_vshards, dependent_reads.as_ref());

        Ok(Self {
            read_set,
            write_set,
            plans,
            tenant_id,
            dependent_reads,
            participating_vshards,
        })
    }

    /// Convenience constructor for a transaction that always carries a
    /// dependent-read spec; equivalent to calling [`TxClass::new`] with
    /// `Some(dependent_reads)`.
    ///
    /// # Errors
    ///
    /// Same as [`TxClass::new`].
    pub fn new_dependent(
        read_set: ReadWriteSet,
        write_set: ReadWriteSet,
        plans: Vec<u8>,
        tenant_id: TenantId,
        dependent_reads: DependentReadSpec,
    ) -> Result<Self, CalvinError> {
        Self::new(read_set, write_set, plans, tenant_id, Some(dependent_reads))
    }

    /// Returns the cached participating vshards.
    ///
    /// The slice is populated by [`TxClass::new`] and
    /// [`TxClass::restore_derived`]; the field is skipped during
    /// serialization, so it is not carried across a serialize/deserialize
    /// round trip.
    pub fn participating_vshards(&self) -> &[VShardId] {
        &self.participating_vshards
    }

    /// Recomputes the cached participating vshards from `write_set` and
    /// `dependent_reads`, overwriting whatever was cached before.
    ///
    /// Unlike [`TxClass::new`], this performs no validation.
    pub fn restore_derived(&mut self) {
        self.participating_vshards = Self::with_passive_vshards(
            self.write_set.participating_vshards(),
            self.dependent_reads.as_ref(),
        );
    }

    /// Merges the passive-read vshards of `dependent_reads` into `vshards`.
    ///
    /// Single source of truth for the derived vshard list, shared by the
    /// constructor and `restore_derived` so the two cannot drift apart.
    /// When no spec is given, `vshards` is returned untouched; otherwise
    /// missing passive vshards are appended and the list is re-sorted by id.
    fn with_passive_vshards(
        mut vshards: Vec<VShardId>,
        dependent_reads: Option<&DependentReadSpec>,
    ) -> Vec<VShardId> {
        if let Some(spec) = dependent_reads {
            for &passive_vshard in spec.passive_reads.keys() {
                let already_present = vshards.iter().any(|e| e.as_u32() == passive_vshard);
                if !already_present {
                    vshards.push(VShardId::new(passive_vshard));
                }
            }
            // Appending breaks the ascending order the input arrived in.
            vshards.sort_by_key(|v| v.as_u32());
        }
        vshards
    }
}