use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::ops::Deref;
use async_trait::async_trait;
use destream::{de, en, EncodeMap};
use futures::future::TryFutureExt;
use log::debug;
use safecast::{CastFrom, TryCastFrom, TryCastInto};
use tc_error::*;
use tc_transact::fs::{Dir, File, Persist};
use tc_transact::{IntoView, Transact, Transaction, TxnId};
use tcgeneric::*;
use crate::fs;
use crate::scalar::{Link, Scalar, Value};
use crate::state::{State, StateView};
use crate::txn::Txn;
mod block;
mod data;
mod sync;
use block::BlockSeq;
use crate::fs::FileEntry;
pub use block::BlockChain;
pub use data::ChainBlock;
pub use sync::SyncChain;
/// The label `"chain"`.
/// NOTE(review): not referenced in the visible part of this file; presumably used by the
/// `block`/`sync` submodules — confirm.
const CHAIN: Label = label("chain");
/// An empty byte vector.
/// NOTE(review): judging by the name, presumably the placeholder hash of a block with no
/// predecessor; not referenced in the visible part of this file — confirm.
const NULL_HASH: Vec<u8> = vec![];
/// Path prefix shared by all chain classes; a `ChainType` path is this prefix followed by
/// exactly one more segment (`"block"` or `"sync"`).
const PREFIX: PathLabel = path_label(&["state", "chain"]);
/// Name used for both the file and the block in which a `Subject` stores its value.
const SUBJECT: Label = label("subject");
/// Error message reported by `load` when the given schema `Value` cannot be cast.
const ERR_INVALID_SCHEMA: &str = "invalid Chain schema";
/// The string `"chain"`.
/// NOTE(review): presumably the file extension used for chain data on disk — confirm.
pub const EXT: &str = "chain";
/// The schema from which the subject of a chain is created.
#[derive(Clone)]
pub enum Schema {
/// A schema made of a single [`Value`]; `Subject::create` stores a clone of this value as
/// the initial contents of the subject.
Value(Value),
}
impl CastFrom<Value> for Schema {
fn cast_from(value: Value) -> Self {
Self::Value(value)
}
}
#[async_trait]
impl de::FromStream for Schema {
    type Context = ();

    /// Decode a [`Value`] from `decoder` and wrap it in a [`Schema::Value`].
    ///
    /// Any decoding error is returned unchanged.
    async fn from_stream<D: de::Decoder>(cxt: (), decoder: &mut D) -> Result<Self, D::Error> {
        let decoded = Value::from_stream(cxt, decoder).await?;
        Ok(Self::Value(decoded))
    }
}
impl<'en> en::IntoStream<'en> for Schema {
    /// Encode this schema by encoding the [`Value`] it wraps.
    fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
        // `Schema` has a single variant, so this pattern is irrefutable.
        let Self::Value(inner) = self;
        inner.into_stream(encoder)
    }
}
/// The state which a chain keeps track of, backed by a transactional file.
#[derive(Clone)]
pub enum Subject {
/// A single [`Value`], stored in the block named `SUBJECT` of the wrapped file.
Value(fs::File<Value>),
}
impl Subject {
    /// Create a new `Subject` in `dir` according to `schema`.
    ///
    /// Creates a file named `SUBJECT` in `dir`, then creates a block (also named `SUBJECT`)
    /// in that file holding a clone of the schema's value.
    ///
    /// # Errors
    /// Returns any error from creating the file, converting it to a file of [`Value`],
    /// or creating the block.
    pub async fn create(schema: &Schema, dir: &fs::Dir, txn_id: TxnId) -> TCResult<Self> {
        // `Schema` has a single variant, so this pattern is irrefutable.
        let Schema::Value(initial) = schema;

        let entry = dir
            .create_file(txn_id, SUBJECT.into(), initial.class().into())
            .await?;

        let file = fs::File::<Value>::try_from(entry)?;

        file.create_block(txn_id, SUBJECT.into(), initial.clone())
            .await?;

        Ok(Self::Value(file))
    }

    /// Read the state of this `Subject` as of the transaction `txn_id`.
    ///
    /// # Errors
    /// Returns any error from reading the `SUBJECT` block.
    pub async fn at(&self, txn_id: &TxnId) -> TCResult<State> {
        debug!("Subject::at {}", txn_id);

        let Self::Value(file) = self;
        let block = file.read_block(txn_id, &SUBJECT.into()).await?;
        // Copy the value out of the block so that the result does not borrow from it.
        let snapshot = block.deref().clone();
        Ok(snapshot.into())
    }

    /// Overwrite the state of this `Subject` with `value` at the transaction `txn_id`.
    ///
    /// A `Value` subject can only be replaced as a whole, so `path` must be empty
    /// and `key` must not be set.
    ///
    /// # Errors
    /// Returns a bad request error if `path` is not empty, if `key` is set, or if `value`
    /// cannot be cast into a [`Value`]; also returns any error from locking the block.
    pub async fn put(
        &self,
        txn_id: TxnId,
        path: TCPathBuf,
        key: Value,
        value: State,
    ) -> TCResult<()> {
        const ERR_NO_SUCH: &str = "Value has no such property";

        let Self::Value(file) = self;

        if !path.is_empty() {
            return Err(TCError::bad_request(ERR_NO_SUCH, path));
        }

        if key.is_some() {
            return Err(TCError::bad_request(ERR_NO_SUCH, key));
        }

        let replacement = Value::try_cast_from(value, |rejected| {
            TCError::bad_request("cannot update a Value to", rejected)
        })?;

        let mut contents = file.write_block(txn_id, SUBJECT.into()).await?;

        debug!(
            "set new Value of chain subject to {} at {}",
            replacement, txn_id
        );

        *contents = replacement;
        Ok(())
    }

    /// Load the `Subject` stored in `dir`, creating it from `schema` if `dir` has no file
    /// named `SUBJECT`.
    ///
    /// # Errors
    /// Returns any error from looking up the file, converting it, or (if it does not exist)
    /// from [`Subject::create`].
    async fn load(schema: &Schema, dir: &fs::Dir, txn_id: TxnId) -> TCResult<Self> {
        let existing = dir.get_file(&txn_id, &SUBJECT.into()).await?;

        match existing {
            Some(file) => match schema {
                Schema::Value(_) => file.try_into().map(Self::Value),
            },
            None => Self::create(schema, dir, txn_id).await,
        }
    }
}
#[async_trait]
impl Transact for Subject {
async fn commit(&self, txn_id: &TxnId) {
debug!(
"commit subject with value {} at {}",
self.at(txn_id).await.unwrap(),
txn_id
);
match self {
Self::Value(file) => file.commit(txn_id).await,
}
}
async fn finalize(&self, txn_id: &TxnId) {
match self {
Self::Value(file) => file.finalize(txn_id).await,
}
}
}
#[async_trait]
impl de::FromStream for Subject {
type Context = Txn;
async fn from_stream<D: de::Decoder>(txn: Txn, decoder: &mut D) -> Result<Self, D::Error> {
let value = Value::from_stream((), decoder).await?;
let file: FileEntry = txn
.context()
.create_file(*txn.id(), SUBJECT.into(), value.class().into())
.map_err(de::Error::custom)
.await?;
let file: fs::File<Value> = file.try_into().map_err(de::Error::custom)?;
file.create_block(*txn.id(), SUBJECT.into(), value)
.map_err(de::Error::custom)
.await?;
Ok(Self::Value(file))
}
}
/// Methods common to every type of chain.
///
/// NOTE(review): the concrete semantics of each method are defined by the implementors in
/// the `block` and `sync` submodules, which are not visible here; the notes below are
/// inferred from names and signatures only and should be confirmed against those modules.
#[async_trait]
pub trait ChainInstance {
/// Append an operation described by `path`, `key` and `value` to this chain at `txn_id`.
/// NOTE(review): presumably this records a mutation of the subject — confirm.
async fn append(
&self,
txn_id: TxnId,
path: TCPathBuf,
key: Value,
value: Scalar,
) -> TCResult<()>;
/// Return the ID of the last commit as of `txn_id`, if there is one.
/// NOTE(review): presumably `None` means nothing has been committed yet — confirm.
async fn last_commit(&self, txn_id: &TxnId) -> TCResult<Option<TxnId>>;
/// Borrow the [`Subject`] of this chain.
fn subject(&self) -> &Subject;
/// Replicate this chain from the chain at the given `source` link.
/// NOTE(review): direction and conflict handling are not visible here — confirm.
async fn replicate(&self, txn: &Txn, source: Link) -> TCResult<()>;
}
/// The class of a [`Chain`].
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum ChainType {
/// The class of `Chain::Block`; its path ends in `"block"`.
Block,
/// The class of `Chain::Sync`; its path ends in `"sync"`.
Sync,
}
/// Declares [`Chain`] as the instance type of the [`ChainType`] class.
impl Class for ChainType {
type Instance = Chain;
}
impl NativeClass for ChainType {
    /// Resolve a class path to a `ChainType`.
    ///
    /// Returns `None` unless `path` is exactly `PREFIX` followed by `"block"` or `"sync"`.
    fn from_path(path: &[PathSegment]) -> Option<Self> {
        // The length check comes first so that the slice below can never be out of bounds.
        if path.len() != 3 || &path[0..2] != &PREFIX[..] {
            return None;
        }

        match path[2].as_str() {
            "block" => Some(Self::Block),
            "sync" => Some(Self::Sync),
            _ => None,
        }
    }

    /// Return the path of this class: `PREFIX` followed by `"block"` or `"sync"`.
    fn path(&self) -> TCPathBuf {
        let prefix = TCPathBuf::from(PREFIX);

        match self {
            Self::Block => prefix.append(label("block")),
            Self::Sync => prefix.append(label("sync")),
        }
    }
}
impl fmt::Display for ChainType {
    /// Write a short human-readable name of this class.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let description = match self {
            Self::Block => "type BlockChain",
            Self::Sync => "type SyncChain",
        };

        f.write_str(description)
    }
}
/// A chain of either supported type; every trait method implemented for `Chain` in this
/// file dispatches to the wrapped chain.
#[derive(Clone)]
pub enum Chain {
/// A [`block::BlockChain`], whose class is `ChainType::Block`.
Block(block::BlockChain),
/// A [`sync::SyncChain`], whose class is `ChainType::Sync`.
Sync(sync::SyncChain),
}
impl Instance for Chain {
type Class = ChainType;
fn class(&self) -> Self::Class {
match self {
Self::Block(_) => ChainType::Block,
Self::Sync(_) => ChainType::Sync,
}
}
}
#[async_trait]
impl ChainInstance for Chain {
    /// Forward `append` to the wrapped chain.
    async fn append(
        &self,
        txn_id: TxnId,
        path: TCPathBuf,
        key: Value,
        value: Scalar,
    ) -> TCResult<()> {
        match self {
            Self::Block(inner) => inner.append(txn_id, path, key, value).await,
            Self::Sync(inner) => inner.append(txn_id, path, key, value).await,
        }
    }

    /// Forward `last_commit` to the wrapped chain.
    async fn last_commit(&self, txn_id: &TxnId) -> TCResult<Option<TxnId>> {
        match self {
            Self::Block(inner) => inner.last_commit(txn_id).await,
            Self::Sync(inner) => inner.last_commit(txn_id).await,
        }
    }

    /// Borrow the [`Subject`] of the wrapped chain.
    fn subject(&self) -> &Subject {
        match self {
            Self::Block(inner) => inner.subject(),
            Self::Sync(inner) => inner.subject(),
        }
    }

    /// Forward `replicate` to the wrapped chain.
    async fn replicate(&self, txn: &Txn, source: Link) -> TCResult<()> {
        match self {
            Self::Block(inner) => inner.replicate(txn, source).await,
            Self::Sync(inner) => inner.replicate(txn, source).await,
        }
    }
}
#[async_trait]
impl Transact for Chain {
    /// Commit the wrapped chain at the transaction `txn_id`.
    async fn commit(&self, txn_id: &TxnId) {
        match self {
            Self::Block(inner) => inner.commit(txn_id).await,
            Self::Sync(inner) => inner.commit(txn_id).await,
        }
    }

    /// Finalize the transaction `txn_id` on the wrapped chain.
    async fn finalize(&self, txn_id: &TxnId) {
        match self {
            Self::Block(inner) => inner.finalize(txn_id).await,
            Self::Sync(inner) => inner.finalize(txn_id).await,
        }
    }
}
#[async_trait]
impl de::FromStream for Chain {
    type Context = Txn;

    /// Decode a `Chain` from a map, using a [`ChainVisitor`] constructed with `txn`.
    async fn from_stream<D: de::Decoder>(txn: Txn, decoder: &mut D) -> Result<Self, D::Error> {
        let visitor = ChainVisitor::new(txn);
        decoder.decode_map(visitor).await
    }
}
#[async_trait]
impl<'en> IntoView<'en, fs::Dir> for Chain {
    type Txn = Txn;
    type View = ChainView;

    /// Construct an encodable [`ChainView`] of this `Chain`.
    ///
    /// The view pairs the class of this chain with the view of the wrapped chain; the view
    /// of a `SyncChain` is boxed.
    ///
    /// # Errors
    /// Returns any error from constructing the view of the wrapped chain.
    async fn into_view(self, txn: Self::Txn) -> TCResult<Self::View> {
        // The class must be read before `self` is consumed by the match below.
        let class = self.class();

        let data = match self {
            Self::Block(chain) => {
                let view = chain.into_view(txn).await?;
                ChainViewData::Block(view)
            }
            Self::Sync(chain) => {
                let view = chain.into_view(txn).await?;
                ChainViewData::Sync(Box::new(view))
            }
        };

        Ok(ChainView { class, data })
    }
}
impl fmt::Display for Chain {
    /// Write `"instance of "` followed by the class of this `Chain`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let class = self.class();
        write!(f, "instance of {}", class)
    }
}
/// The encodable contents of a [`ChainView`], one variant per type of chain.
pub enum ChainViewData {
/// The view of a `BlockChain`: its schema and a `BlockSeq`.
Block((Schema, BlockSeq)),
/// The view of a `SyncChain`: its schema and a `StateView`, boxed.
/// NOTE(review): presumably boxed to keep this enum small — confirm.
Sync(Box<(Schema, StateView)>),
}
/// An encodable view of a [`Chain`], encoded as a map with a single entry whose key is the
/// path of `class` and whose value is `data`.
pub struct ChainView {
// The class of the viewed chain; its path is used as the key of the encoded map.
class: ChainType,
// The view of the chain itself, used as the value of the encoded map.
data: ChainViewData,
}
impl<'en> en::IntoStream<'en> for ChainView {
fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
let mut map = encoder.encode_map(Some(1))?;
map.encode_key(self.class.path().to_string())?;
match self.data {
ChainViewData::Block(view) => map.encode_value(view),
ChainViewData::Sync(view) => map.encode_value(view),
}?;
map.end()
}
}
/// Load a [`Chain`] of the given `class` from `dir`.
///
/// The `schema` value is first cast into the schema type expected by the chain, then the
/// chain type selected by `class` is loaded from `dir` at `txn_id`.
///
/// # Errors
/// Returns a bad request error if `schema` cannot be cast, or any error from loading the
/// selected chain type.
pub async fn load(class: ChainType, schema: Value, dir: fs::Dir, txn_id: TxnId) -> TCResult<Chain> {
    let schema = schema.try_cast_into(|v| TCError::bad_request(ERR_INVALID_SCHEMA, v))?;

    let chain = match class {
        ChainType::Block => Chain::Block(BlockChain::load(schema, dir, txn_id).await?),
        ChainType::Sync => Chain::Sync(SyncChain::load(schema, dir, txn_id).await?),
    };

    Ok(chain)
}
/// A `de::Visitor` used to decode a [`Chain`] from a stream.
pub struct ChainVisitor {
// The transaction passed as context when decoding the chain itself.
txn: Txn,
}
impl ChainVisitor {
pub fn new(txn: Txn) -> Self {
Self { txn }
}
pub async fn visit_map_value<A: de::MapAccess>(
self,
class: ChainType,
access: &mut A,
) -> Result<Chain, A::Error> {
match class {
ChainType::Block => {
access
.next_value(self.txn)
.map_ok(Chain::Block)
.map_err(|e| de::Error::custom(format!("invalid BlockChain stream: {}", e)))
.await
}
ChainType::Sync => access.next_value(self.txn).map_ok(Chain::Sync).await,
}
}
}
#[async_trait]
impl de::Visitor for ChainVisitor {
type Value = Chain;
fn expecting() -> &'static str {
"a Chain"
}
async fn visit_map<A: de::MapAccess>(self, mut map: A) -> Result<Self::Value, A::Error> {
let class = if let Some(path) = map.next_key::<TCPathBuf>(()).await? {
ChainType::from_path(&path)
.ok_or_else(|| de::Error::invalid_value(path, "a Chain class"))?
} else {
return Err(de::Error::custom("expected a Chain class"));
};
self.visit_map_value(class, &mut map).await
}
}