use std::collections::{HashMap, HashSet};
use std::fmt;
use std::hash::{Hash, Hasher};
use std::iter::FromIterator;
use std::ops::Deref;
use std::sync::Arc;
use async_trait::async_trait;
use futures::future::{join_all, try_join_all};
use futures::{join, StreamExt};
use log::{debug, info, warn};
use safecast::TryCastFrom;
use uplock::RwLock;
use tc_error::*;
use tc_transact::lock::{Mutable, TxnLock};
use tc_transact::{Transact, Transaction};
use tcgeneric::*;
use crate::chain::{Chain, ChainInstance};
use crate::object::InstanceClass;
use crate::scalar::{Link, OpDef, Value};
use crate::state::State;
use crate::txn::{Actor, Scope, Txn, TxnId};
mod load;
mod owner;
use owner::Owner;
use futures::stream::FuturesUnordered;
pub use load::instantiate;
/// The path label appended to a cluster's link in order to address its set of replicas.
pub const REPLICAS: Label = label("replicas");
/// The [`Class`] of a [`Cluster`].
pub struct ClusterType;
/// Declares [`Cluster`] as the instance type which belongs to [`ClusterType`].
impl Class for ClusterType {
type Instance = Cluster;
}
/// Formats a [`ClusterType`] as the fixed string `Cluster`.
impl fmt::Display for ClusterType {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Cluster")
    }
}
/// A named collection of chains and classes, identified by a [`Link`],
/// which tracks the transactions it owns and the set of its replicas.
pub struct Cluster {
// The link which identifies this cluster; its path is the basis of equality and hashing.
link: Link,
// The actor used to claim transactions and grant scopes on behalf of this cluster.
actor: Arc<Actor>,
// The chains of this cluster, by name.
chains: Map<Chain>,
// The instance classes of this cluster, by name.
classes: Map<InstanceClass>,
// The ID of the last committed transaction (updated at the end of `commit`).
confirmed: RwLock<TxnId>,
// A record of the owner of each transaction claimed by this cluster.
owned: RwLock<HashMap<TxnId, Owner>>,
// For each trusted link, the set of scopes which it is trusted to issue.
installed: TxnLock<Mutable<HashMap<Link, HashSet<Scope>>>>,
// The transactional set of links to replicas of this cluster.
replicas: TxnLock<Mutable<HashSet<Link>>>,
}
impl Cluster {
/// Borrow the [`Chain`] stored under the given `name`, or `None` if there is no such chain.
pub fn chain(&self, name: &Id) -> Option<&Chain> {
self.chains.get(name)
}
/// Borrow the [`InstanceClass`] stored under the given `name`, or `None` if there is no such class.
pub fn class(&self, name: &Id) -> Option<&InstanceClass> {
self.classes.get(name)
}
/// Borrow the bytes of the public key of this cluster's actor.
pub fn public_key(&self) -> &[u8] {
self.actor.public_key().as_bytes()
}
/// Borrow the [`Link`] which identifies this cluster.
pub fn link(&self) -> &Link {
&self.link
}
/// Borrow the path component of this cluster's [`Link`].
pub fn path(&'_ self) -> &'_ [PathSegment] {
self.link.path()
}
/// Return a copy of this cluster's replica set as of the given transaction.
///
/// # Errors
/// Propagates any error from acquiring the read lock on the replica set.
pub async fn replicas(&self, txn_id: &TxnId) -> TCResult<HashSet<Link>> {
    self.replicas
        .read(txn_id)
        .await
        .map(|guard| guard.deref().clone())
}
/// Claim ownership of the given transaction on behalf of this cluster.
///
/// On success an [`Owner`] record is created for the transaction and the claimed
/// transaction is returned.
///
/// # Errors
/// - "unsupported" if the transaction ID is not later than the last commit
/// - "bad request" if an owner is already on record for this transaction
/// - any error returned by the claim itself
pub async fn claim(&self, txn: &Txn) -> TCResult<Txn> {
    let last_commit = self.confirmed.read().await;
    if txn.id() <= &*last_commit {
        let message = format!(
            "cluster at {} cannot claim transaction {} because the last commit is at {}",
            self.link,
            txn.id(),
            *last_commit
        );

        return Err(TCError::unsupported(message));
    }

    // Hold the write lock across the claim so that the owner record is only
    // inserted once the claim itself has succeeded.
    let mut owned = self.owned.write().await;
    if owned.contains_key(txn.id()) {
        return Err(TCError::bad_request("received an unclaimed transaction, but there is a record of an owner for this transaction at cluster", self.link.path()));
    }

    let claimed = txn
        .clone()
        .claim(&self.actor, self.link.path().clone())
        .await?;

    owned.insert(*claimed.id(), Owner::new());
    Ok(claimed)
}
/// Check that the given transaction's request is authorized for the given `scope`.
///
/// The request is authorized if it carries a set of scopes, with no actor ID,
/// issued by a host which has been installed as trusted to issue `scope`,
/// and that set of scopes itself contains `scope`.
///
/// # Errors
/// Returns an "unauthorized" error if no such set of scopes is found, and propagates
/// any error from reading the installed callers.
pub async fn authorize(&self, txn: &Txn, scope: &Scope) -> TCResult<()> {
    debug!("authorize scope {}...", scope);

    let installed = self.installed.read(txn.id()).await?;
    debug!("{} authorized callers installed", installed.len());

    for (host, actor_id, scopes) in txn.request().scopes().iter() {
        debug!(
            "token has scopes {} issued by {}: {}",
            Tuple::<Scope>::from_iter(scopes.to_vec()),
            host,
            actor_id
        );

        // only scopes issued without an actor ID are considered
        if !actor_id.is_none() {
            continue;
        }

        let trusted = match installed.get(host) {
            Some(authorized) => authorized.contains(scope),
            None => false,
        };

        if trusted && scopes.contains(scope) {
            return Ok(());
        }
    }

    Err(TCError::unauthorized(format!(
        "no trusted caller authorized the required scope \"{}\"",
        scope
    )))
}
/// Grant the given `scope` to the given transaction, then execute `op` with
/// the granted transaction and the given `context`.
///
/// # Errors
/// Propagates any error from granting the scope or from executing `op`.
pub async fn grant(
    &self,
    txn: Txn,
    scope: Scope,
    op: OpDef,
    context: Map<State>,
) -> TCResult<State> {
    debug!("Cluster received grant request for scope {}", scope);

    let issuer = self.link.path().clone();
    let granted = txn.grant(&self.actor, issuer, vec![scope]).await?;

    let definition = op.into_def();
    OpDef::call(definition, granted, context).await
}
/// Trust the cluster at `other` to issue the given `scopes`.
///
/// Any set of scopes previously installed for `other` is replaced.
///
/// # Errors
/// Propagates any error from acquiring the write lock on the installed callers.
pub async fn install(
    &self,
    txn_id: TxnId,
    other: Link,
    scopes: HashSet<Scope>,
) -> TCResult<()> {
    info!(
        "{} will now trust {} to issue scopes [{}]",
        self,
        other,
        scopes
            .iter()
            .map(ToString::to_string)
            .collect::<Vec<_>>()
            .join(", ")
    );

    self.installed.write(txn_id).await?.insert(other, scopes);
    Ok(())
}
/// Handle a request to add `replica` to this cluster's replica set.
///
/// If `replica` is some other host, it is simply added to the replica set.
///
/// If `replica` is this host's own link, then (unless this cluster has no host in its
/// link, or its link is the same as its own link for this transaction) this host:
///  1. fetches the replica set from the cluster at `self.link`,
///  2. notifies each of those replicas (other than itself) of its own link,
///  3. adds them to its own replica set, and
///  4. replicates each of its chains.
///
/// # Errors
/// Propagates any error from the requests above or from the replica set lock, and
/// returns a "bad request" error if the fetched replica set is not a tuple of links.
pub async fn add_replica(&self, txn: &Txn, replica: Link) -> TCResult<()> {
    let self_link = txn.link(self.link.path().clone());
    debug!("cluster at {} adding replica {}...", self_link, replica);

    // the simple case: another host is joining
    if replica != self_link {
        debug!("add replica {}", replica);
        self.replicas.write(*txn.id()).await?.insert(replica);
        return Ok(());
    }

    if self.link.host().is_none() || self.link == self_link {
        debug!("{} cannot replicate itself", self);
        return Ok(());
    }

    debug!(
        "{} replica at {} got add request for self: {}",
        self, replica, self_link
    );

    let replica_set_link = self.link.clone().append(REPLICAS.into());
    let response = txn.get(replica_set_link, Value::None).await?;

    if response.is_some() {
        let peers = Tuple::<Link>::try_cast_from(response, |s| {
            TCError::bad_request("invalid replica set", s)
        })?;

        debug!("{} has replicas: {}", self, peers);

        let mut peers: HashSet<Link> = peers.into_iter().collect();
        peers.remove(&self_link);

        let announcements = peers.iter().map(|peer| {
            txn.put(
                peer.clone().append(REPLICAS.into()),
                Value::None,
                self_link.clone().into(),
            )
        });

        try_join_all(announcements).await?;

        // the write guard is a temporary, so it is released before replicating
        self.replicas.write(*txn.id()).await?.extend(peers);
    } else {
        warn!("{} has no other replicas", self);
    }

    self.replicate(txn).await
}
pub async fn remove_replicas(&self, txn: &Txn, to_remove: &[Link]) -> TCResult<()> {
let self_link = txn.link(self.link.path().clone());
let mut replicas = self.replicas.write(*txn.id()).await?;
for replica in to_remove {
if replica == &self_link {
panic!("{} received remove replica request for itself", self);
}
replicas.remove(replica);
}
Ok(())
}
/// Replicate every chain in this cluster concurrently, each from the link
/// formed by appending the chain's name to this cluster's link.
///
/// # Errors
/// Returns the first error encountered while replicating a chain.
async fn replicate(&self, txn: &Txn) -> TCResult<()> {
    let replication = self
        .chains
        .iter()
        .map(|(name, chain)| chain.replicate(txn, self.link.clone().append(name.clone())));

    try_join_all(replication).await?;

    Ok(())
}
/// Record that the given `participant` has been mutated as part of the given
/// transaction, which must be owned by this cluster.
///
/// A participant with the same path as this cluster is ignored, with a warning.
///
/// # Errors
/// Returns a "bad request" error if this cluster has no owner on record
/// for the given transaction.
pub async fn mutate(&self, txn: &Txn, participant: Link) -> TCResult<()> {
    if participant.path() == self.link.path() {
        log::warn!(
            "got participant message within Cluster {}",
            self.link.path()
        );

        return Ok(());
    }

    // The map of owners itself is not modified here (the owner is only borrowed),
    // so a shared lock is sufficient and does not block other readers.
    let owned = self.owned.read().await;
    let owner = owned.get(txn.id()).ok_or_else(|| {
        TCError::bad_request(
            format!(
                "{} does not own transaction",
                txn.link(self.link.path().clone())
            ),
            txn.id(),
        )
    })?;

    owner.mutate(participant).await;
    Ok(())
}
pub async fn distribute_commit(&self, txn: Txn) -> TCResult<()> {
let replicas = self.replicas.read(txn.id()).await?;
if let Some(owner) = self.owned.read().await.get(txn.id()) {
owner.commit(&txn).await?;
}
let self_link = txn.link(self.link.path().clone());
let mut replica_commits = FuturesUnordered::from_iter(
replicas
.iter()
.filter(|replica| *replica != &self_link)
.map(|replica| {
debug!("commit replica {}...", replica);
txn.post(replica.clone(), State::Map(Map::default()))
}),
);
while let Some(result) = replica_commits.next().await {
match result {
Ok(_) => {}
Err(cause) => log::error!("commit failure: {}", cause),
}
}
self.commit(txn.id()).await;
Ok(())
}
/// Roll back the given transaction across this cluster, on a best-effort basis.
///
/// In order: rolls back via the transaction's owner, if this cluster has one on record;
/// sends a rollback request to every replica other than this one; then finalizes
/// this cluster itself. Every failure along the way is logged as a warning
/// and does not prevent the remaining steps.
pub async fn distribute_rollback(&self, txn: Txn) {
    let replicas = self.replicas.read(txn.id()).await;

    if let Some(owner) = self.owned.read().await.get(txn.id()) {
        if let Err(cause) = owner.rollback(&txn).await {
            warn!("failed to rollback transaction: {}", cause);
        }
    }

    match replicas {
        Ok(replicas) => {
            let self_link = txn.link(self.link.path().clone());

            let rollbacks = join_all(
                replicas
                    .iter()
                    .filter(|replica| *replica != &self_link)
                    .map(|replica| txn.delete(replica.clone(), Value::None)),
            )
            .await;

            // report failed replica rollbacks instead of discarding them silently
            for cause in rollbacks.into_iter().filter_map(Result::err) {
                warn!("failed to rollback replica: {}", cause);
            }
        }
        Err(cause) => warn!("failed to read replicas to rollback: {}", cause),
    }

    self.finalize(txn.id()).await;
}
}
/// Marks the path-based equality of [`Cluster`] (see its `PartialEq` impl) as a full equivalence.
impl Eq for Cluster {}
/// Two clusters are equal if and only if their paths are equal.
impl PartialEq for Cluster {
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (self.path(), other.path());
        mine == theirs
    }
}
/// A cluster is hashed by its path only, consistent with its equality.
impl Hash for Cluster {
    fn hash<H: Hasher>(&self, h: &mut H) {
        Hash::hash(self.path(), h)
    }
}
/// Declares [`ClusterType`] as the class of every [`Cluster`].
impl Instance for Cluster {
type Class = ClusterType;
// Every cluster has the same class, so this does not read any state from `self`.
fn class(&self) -> Self::Class {
ClusterType
}
}
#[async_trait]
impl Transact for Cluster {
    /// Commit the given transaction: first every chain, then the installed callers and
    /// the replica set, and finally record `txn_id` as the last confirmed commit.
    ///
    /// The write lock on the last confirmed commit is held for the whole duration.
    async fn commit(&self, txn_id: &TxnId) {
        let mut confirmed = self.confirmed.write().await;

        // The replica set is only read as part of the (lazily evaluated) log arguments.
        // A failure to read it is reported in the log message rather than by panicking,
        // so that diagnostic logging can never abort a commit.
        debug!(
            "replicas at commit: {}",
            match self.replicas.read(txn_id).await {
                Ok(replicas) => Value::from_iter(replicas.iter().cloned()).to_string(),
                Err(cause) => format!("(unavailable: {})", cause),
            }
        );

        join_all(self.chains.values().map(|chain| chain.commit(txn_id))).await;
        join!(self.installed.commit(txn_id), self.replicas.commit(txn_id));

        debug!(
            "replicas after commit: {}",
            match self.replicas.read(txn_id).await {
                Ok(replicas) => Value::from_iter(replicas.iter().cloned()).to_string(),
                Err(cause) => format!("(unavailable: {})", cause),
            }
        );

        *confirmed = *txn_id;
    }

    /// Finalize the given transaction: first every chain, then drop the record of the
    /// transaction's owner (if any), then finalize the installed callers and the replica set.
    async fn finalize(&self, txn_id: &TxnId) {
        join_all(self.chains.values().map(|chain| chain.finalize(txn_id))).await;

        self.owned.write().await.remove(txn_id);

        join!(
            self.installed.finalize(txn_id),
            self.replicas.finalize(txn_id)
        );
    }
}
/// Formats a [`Cluster`] as `Cluster` followed by the path of its link.
impl fmt::Display for Cluster {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("Cluster {}", self.link.path()))
    }
}