use std::fmt;
use bytes::Bytes;
use futures::future::{self, TryFutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use log::*;
use safecast::{TryCastFrom, TryCastInto};
use tc_error::*;
use tc_state::object::InstanceClass;
use tc_state::State;
use tc_transact::public::{
DeleteHandler, GetHandler, Handler, PostHandler, Public, PutHandler, Route,
};
use tc_transact::{RPCClient, Transact, Transaction};
use tc_value::{Host, Link, Value};
use tcgeneric::{label, PathSegment, TCPath, TCPathBuf, Tuple};
use crate::cluster::{Cluster, Replica, REPLICAS};
use crate::txn::Txn;
mod class;
mod dir;
mod library;
mod service;
pub struct ClusterHandler<'a, T> {
cluster: &'a Cluster<T>,
}
impl<'a, T> Handler<'a, State> for ClusterHandler<'a, T>
where
T: Transact + Public<State> + Send + Sync,
{
fn get<'b>(self: Box<Self>) -> Option<GetHandler<'a, 'b, Txn, State>>
where
'b: 'a,
{
Some(Box::new(move |_txn, key| {
Box::pin(future::ready((|key: Value| {
trace!("GET key {} from {:?}", key, self.cluster);
if key.is_none() {
let public_key = Bytes::from(self.cluster.public_key().to_vec());
return Ok(Value::from(public_key).into());
}
let key = TCPathBuf::try_cast_from(key, |v| {
TCError::unexpected(v, "a key specification")
})?;
if key == TCPathBuf::default() {
let public_key = Bytes::from(self.cluster.schema().public_key().to_vec());
Ok(Value::from(public_key).into())
} else {
Err(TCError::not_found(key))
}
})(key)))
}))
}
fn put<'b>(self: Box<Self>) -> Option<PutHandler<'a, 'b, Txn, State>>
where
'b: 'a,
{
Some(Box::new(|txn, key, value| {
Box::pin(async move {
if key.is_none() {
let participant =
value.try_cast_into(|v| TCError::unexpected(v, "a participant Link"))?;
return self.cluster.mutate(&txn, participant).await;
}
let value = if InstanceClass::can_cast_from(&value) {
let class =
InstanceClass::try_cast_from(value, |v| TCError::unexpected(v, "a Class"))?;
let link = class.extends();
if link.host().is_some() && link.host() != self.cluster.schema().lead() {
return Err(not_implemented!(
"install a Cluster with a different lead replica",
));
}
if !link.path().starts_with(self.cluster.path()) {
return Err(bad_request!(
"cannot install {} at {}",
link,
self.cluster.link().path()
));
}
State::Object(class.into())
} else {
value
};
self.cluster.state().put(&txn, &[], key.into(), value).await
})
}))
}
fn post<'b>(self: Box<Self>) -> Option<PostHandler<'a, 'b, Txn, State>>
where
'b: 'a,
{
Some(Box::new(|txn, params| {
Box::pin(async move {
if !params.is_empty() {
return Err(bad_request!("unrecognized commit parameters {:?}", params));
}
if let Some(owner) = txn.owner() {
if owner.host() == Some(txn.host()) && owner.path() == self.cluster.path() {
return Err(bad_request!(
"{:?} received a commit message for itself",
self.cluster,
));
}
} else {
#[cfg(debug_assertions)]
panic!(
"commit message for an ownerless transaction {} (token is {:?})",
txn.id(),
txn.request().token(),
);
#[cfg(not(debug_assertions))]
return Err(bad_request!(
"commit message for an ownerless transaction {} (token is {:?})",
txn.id(),
txn.request().token(),
));
}
#[cfg(debug_assertions)]
info!("{:?} got commit message for {}", self.cluster, txn.id());
let result = if !txn.has_leader(self.cluster.path()) {
info!(
"{:?} will lead and distribute the commit of {}...",
self.cluster,
txn.id()
);
self.cluster.lead_and_distribute_commit(txn.clone()).await
} else if txn.is_leader(self.cluster.path()) {
info!(
"{:?} will distribute the commit of {}...",
self.cluster,
txn.id()
);
self.cluster.distribute_commit(txn).await
} else {
info!("{:?} will commit {}...", self.cluster, txn.id());
self.cluster.commit(*txn.id()).await;
Ok(())
};
if result.is_ok() {
info!("{:?} commit {} succeeded", self.cluster, txn.id());
} else {
info!("{:?} commit {} failed", self.cluster, txn.id());
}
result.map(State::from)
})
}))
}
fn delete<'b>(self: Box<Self>) -> Option<DeleteHandler<'a, 'b, Txn>>
where
'b: 'a,
{
Some(Box::new(|txn, key| {
Box::pin(async move {
key.expect_none()?;
if txn.is_leader(self.cluster.path()) {
self.cluster.distribute_rollback(txn).await;
} else {
self.cluster.rollback(txn.id()).await;
}
Ok(())
})
}))
}
}
impl<'a, T> From<&'a Cluster<T>> for ClusterHandler<'a, T> {
fn from(cluster: &'a Cluster<T>) -> Self {
Self { cluster }
}
}
struct ReplicaHandler<'a, T> {
cluster: &'a Cluster<T>,
}
impl<'a, T> Handler<'a, State> for ReplicaHandler<'a, T>
where
T: Replica + Send + Sync,
{
fn get<'b>(self: Box<Self>) -> Option<GetHandler<'a, 'b, Txn, State>>
where
'b: 'a,
{
Some(Box::new(|txn, key| {
Box::pin(async move {
key.expect_none()?;
self.cluster
.replicas(*txn.id())
.map_ok(|replicas| Value::Tuple(replicas.iter().cloned().collect()))
.map_ok(State::from)
.await
})
}))
}
fn put<'b>(self: Box<Self>) -> Option<PutHandler<'a, 'b, Txn, State>>
where
'b: 'a,
{
Some(Box::new(|txn, key, link| {
Box::pin(async move {
key.expect_none()?;
let link = link.try_cast_into(|v| TCError::unexpected(v, "a Link to a Cluster"))?;
self.cluster.add_replica(txn, link).await?;
Ok(())
})
}))
}
fn post<'b>(self: Box<Self>) -> Option<PostHandler<'a, 'b, Txn, State>>
where
'b: 'a,
{
Some(Box::new(|txn, mut params| {
Box::pin(async move {
let new_replica = params.require::<Host>(&label("add").into())?;
params.expect_empty()?;
let txn_id = *txn.id();
if self.cluster.add_replica(txn, new_replica.clone()).await? {
let replicas = self.cluster.replicas(txn_id).await?;
let mut requests = replicas
.iter()
.filter(|replica| *replica != txn.host() && *replica != &new_replica)
.map(|replica| {
txn.put(
self.cluster.schema().link_to(replica).append(REPLICAS),
Value::default(),
new_replica.clone(),
)
})
.collect::<FuturesUnordered<_>>();
while let Some(result) = requests.next().await {
if let Err(cause) = result {
warn!("failed to propagate add replica request: {}", cause);
}
}
}
let state = self.cluster.state().state(txn_id).await?;
debug!("state of source replica {:?} is {:?}", self.cluster, state);
Ok(state)
})
}))
}
fn delete<'b>(self: Box<Self>) -> Option<DeleteHandler<'a, 'b, Txn>>
where
'b: 'a,
{
Some(Box::new(|txn, replicas| {
Box::pin(async move {
let replicas = Tuple::<Host>::try_cast_from(replicas, |v| {
TCError::unexpected(v, "a Link to a Cluster")
})?;
self.cluster.remove_replicas(*txn.id(), &replicas).await
})
}))
}
}
impl<'a, T> From<&'a Cluster<T>> for ReplicaHandler<'a, T> {
fn from(cluster: &'a Cluster<T>) -> Self {
Self { cluster }
}
}
impl<T> Route<State> for Cluster<T>
where
T: Replica + Route<State> + Transact + fmt::Debug + Send + Sync,
{
fn route<'a>(&'a self, path: &'a [PathSegment]) -> Option<Box<dyn Handler<'a, State> + 'a>> {
trace!("Cluster::route {}", TCPath::from(path));
match path {
path if path.is_empty() => Some(Box::new(ClusterHandler::from(self))),
path if path == &[REPLICAS] => Some(Box::new(ReplicaHandler::from(self))),
path => self.state().route(path),
}
}
}
fn authorize_install(txn: &Txn, parent: &Link, entry_path: &TCPathBuf) -> TCResult<()> {
debug!(
"check authorization to install {} at {}",
entry_path, parent
);
let replicate_from_this_host = if let Some(lead) = parent.host() {
lead == txn.host()
} else {
true
};
if replicate_from_this_host {
let parent = if parent.host().is_some() {
parent.clone()
} else {
(txn.host().clone(), parent.path().clone()).into()
};
let authorized = txn
.request()
.token()
.get_claim(&parent, &TCPathBuf::default().into())
.ok_or_else(|| unauthorized!("install request for {}", parent))?;
if authorized.iter().any(|scope| entry_path.starts_with(scope)) {
Ok(())
} else {
Err(forbidden!("install a new Cluster at {}", entry_path))
}
} else {
Ok(())
}
}