1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
//! A [`super::Chain`] which keeps only the data needed to recover the state of its subject in the
//! event of a transaction failure.
//! INCOMPLETE AND UNSTABLE.

use std::convert::TryInto;

use async_trait::async_trait;
use bytes::Bytes;
use futures::join;
use log::debug;

use tc_error::*;
use tc_transact::fs::{Dir, File, Persist};
use tc_transact::{Transact, TxnId};
use tcgeneric::Instance;

use crate::fs;
use crate::scalar::OpRef;

use super::{ChainBlock, ChainInstance, ChainType, Schema, Subject, CHAIN, SUBJECT};

/// A [`super::Chain`] which keeps only the data needed to recover the state of its subject in the
/// event of a transaction failure.
#[derive(Clone)]
pub struct SyncChain {
    schema: Schema,
    subject: Subject,
    file: fs::File<ChainBlock>,
}

#[async_trait]
impl ChainInstance for SyncChain {
    async fn append(&self, txn_id: &TxnId, op_ref: OpRef) -> TCResult<()> {
        let block_id = SUBJECT.into();
        let mut block = fs::File::get_block_mut(&self.file, txn_id, &block_id).await?;
        block.append(op_ref);
        Ok(())
    }

    fn subject(&self) -> &Subject {
        &self.subject
    }
}

#[async_trait]
impl Persist for SyncChain {
    type Schema = Schema;
    type Store = fs::Dir;

    fn schema(&self) -> &'_ Schema {
        &self.schema
    }

    async fn load(schema: Self::Schema, dir: fs::Dir, txn_id: TxnId) -> TCResult<Self> {
        let subject = match &schema {
            Schema::Value(value) => {
                let file: fs::File<Bytes> =
                    if let Some(file) = dir.get_file(&txn_id, &SUBJECT.into()).await? {
                        file.try_into()?
                    } else {
                        let file = dir
                            .create_file(txn_id, SUBJECT.into(), value.class().into())
                            .await?;

                        file.try_into()?
                    };

                if !file.block_exists(&txn_id, &SUBJECT.into()).await? {
                    let as_bytes = serde_json::to_vec(value)
                        .map_err(|e| TCError::bad_request("unable to serialize value", e))?;

                    file.create_block(txn_id, SUBJECT.into(), Bytes::from(as_bytes))
                        .await?;

                    debug!("sync chain wrote new subject");
                } else {
                    debug!("sync chain found existing subject");
                }

                Subject::Value(file)
            }
        };

        let file = if let Some(file) = dir.get_file(&txn_id, &CHAIN.into()).await? {
            file.try_into()?
        } else {
            let file = dir
                .create_file(txn_id, CHAIN.into(), ChainType::Sync.into())
                .await?;
            file.try_into()?
        };

        Ok(SyncChain {
            schema,
            subject,
            file,
        })
    }
}

#[async_trait]
impl Transact for SyncChain {
    async fn commit(&self, txn_id: &TxnId) {
        join!(self.subject.commit(txn_id), self.file.commit(txn_id));
    }

    async fn finalize(&self, txn_id: &TxnId) {
        join!(self.subject.finalize(txn_id), self.file.finalize(txn_id));
    }
}