use std::path::PathBuf;
use std::sync::Arc;
use futures::future::TryFutureExt;
pub use tc_error::*;
pub mod cluster;
pub mod gateway;
pub mod kernel;
pub mod txn {
pub type Hypothetical = tc_fs::hypothetical::Hypothetical<tc_state::State>;
pub type Txn = tc_fs::Txn<tc_state::State>;
}
mod http;
mod public;
pub const MIN_CACHE_SIZE: usize = 5000;
type TokioError = Box<dyn std::error::Error + Send + Sync + 'static>;
type UserSpace = (kernel::Class, kernel::Library, kernel::Service);
pub struct Builder {
cache: Arc<freqfs::Cache<tc_fs::CacheBlock>>,
data_dir: PathBuf,
gateway: Option<gateway::Config>,
lead: Option<tc_value::Host>,
public_key: Option<bytes::Bytes>,
workspace: freqfs::DirLock<tc_fs::CacheBlock>,
}
impl Builder {
pub async fn load(cache_size: usize, data_dir: PathBuf, workspace: PathBuf) -> Self {
assert!(
data_dir.exists(),
"data directory not found: {}",
data_dir.display()
);
Self::maybe_create(&workspace);
let cache = freqfs::Cache::<tc_fs::CacheBlock>::new(cache_size.into(), None);
let workspace = cache.clone().load(workspace).expect("workspace");
Self {
cache,
data_dir,
gateway: None,
lead: None,
public_key: None,
workspace,
}
}
pub fn with_gateway(mut self, gateway: gateway::Config) -> Self {
self.gateway = Some(gateway);
self
}
pub fn with_lead(mut self, lead: Option<tc_value::Host>) -> Self {
self.lead = lead;
self
}
pub fn with_public_key(mut self, public_key: Option<String>) -> Self {
if let Some(public_key) = public_key {
let public_key = hex::decode(public_key).expect("public key");
let len = public_key.len();
assert_eq!(len, 32, "an Ed25519 public key has 32 bytes, not {}", len);
self.public_key = Some(public_key.into())
}
self
}
fn maybe_create(path: &PathBuf) {
if !path.exists() {
log::info!(
"directory {} does not exist, attempting to create it...",
path.display()
);
std::fs::create_dir_all(path).expect("create directory hierarchy");
}
}
async fn load_dir(&self, path: PathBuf, txn_id: tc_transact::TxnId) -> tc_fs::Dir {
Self::maybe_create(&path);
log::debug!("load {} into cache", path.display());
let cache = self.cache.clone().load(path).expect("cache dir");
log::debug!("load {:?} into the transactional filesystem", cache);
tc_fs::Dir::load(txn_id, cache).await.expect("store")
}
async fn load_or_create<T>(
&self,
txn: &txn::Txn,
path_label: tcgeneric::PathLabel,
) -> cluster::Cluster<T>
where
cluster::Cluster<T>: tc_transact::fs::Persist<tc_fs::CacheBlock, Schema = cluster::Schema, Txn = txn::Txn>
+ Send
+ Sync,
{
use tc_transact::fs::Persist;
use tc_transact::Transaction;
log::debug!("load or create cluster...");
let txn_id = *txn.id();
let host = self.gateway.as_ref().expect("gateway config").host();
let dir = {
let mut path = self.data_dir.clone();
path.extend(&path_label[..]);
self.load_dir(path, txn_id).await
};
log::debug!("loaded {:?}", dir);
let actor_id = tcgeneric::TCPathBuf::default().into();
let actor = if let Some(public_key) = &self.public_key {
tc_fs::Actor::with_public_key(actor_id, public_key)
.map(Arc::new)
.expect("actor")
} else {
Arc::new(tc_fs::Actor::new(actor_id))
};
let schema = cluster::Schema::new(host, path_label.into(), self.lead.clone(), actor);
cluster::Cluster::<T>::load_or_create(txn_id, schema, dir.into())
.await
.expect("cluster")
}
async fn load_userspace(
&self,
txn_server: tc_fs::TxnServer,
gateway: gateway::Gateway,
) -> UserSpace {
use tc_chain::Recover;
use tc_transact::Transact;
let txn_id = tc_transact::TxnId::new(gateway::Gateway::time());
let token = gateway.new_token(&txn_id).expect("token");
log::debug!("loading userspace...");
let txn = txn_server
.new_txn(Arc::new(gateway), txn_id, token)
.expect("transaction");
log::trace!("new txn at {txn_id}");
let class: kernel::Class = self.load_or_create(&txn, kernel::CLASS).await;
log::trace!("loaded class");
let library: kernel::Library = self.load_or_create(&txn, kernel::LIB).await;
log::trace!("loaded library");
let service: kernel::Service = self.load_or_create(&txn, kernel::SERVICE).await;
log::trace!("loaded service");
futures::try_join!(
class.recover(&txn),
library.recover(&txn),
service.recover(&txn),
)
.expect("recover userspace");
futures::join!(
class.commit(txn_id),
library.commit(txn_id),
service.commit(txn_id),
);
(class, library, service)
}
async fn bootstrap(self) -> (gateway::Gateway, UserSpace) {
log::debug!("running bootstrap...");
let gateway_config = self.gateway.clone().expect("gateway config");
let kernel = kernel::Kernel::bootstrap();
let txn_server = tc_fs::TxnServer::new(self.workspace.clone()).await;
let gateway = gateway::Gateway::new(gateway_config.clone(), kernel, txn_server.clone());
let (class, library, service) = self.load_userspace(txn_server.clone(), gateway).await;
let kernel =
kernel::Kernel::with_userspace(class.clone(), library.clone(), service.clone());
let gateway = gateway::Gateway::new(gateway_config, kernel, txn_server.clone());
(gateway, (class, library, service))
}
async fn replicate(gateway: gateway::Gateway, userspace: UserSpace) -> TCResult<()> {
async fn replicate_cluster<T>(
gateway: gateway::Gateway,
cluster: cluster::Cluster<T>,
) -> TCResult<()>
where
T: cluster::Replica + tc_transact::Transact + Send + Sync,
{
log::trace!("replicate cluster {}...", cluster.link().path());
let txn_id = tc_transact::TxnId::new(gateway::Gateway::time());
let txn = gateway.new_txn(txn_id, None)?;
log::trace!("{:?} replication will use txn {}", cluster, txn_id);
let txn = cluster.claim(&txn).await?;
log::trace!("{:?} will add replica at {}...", cluster, txn.host());
cluster.add_replica(&txn, txn.host().clone()).await?;
cluster.distribute_commit(&txn).await
}
futures::try_join!(
replicate_cluster(gateway.clone(), userspace.0),
replicate_cluster(gateway.clone(), userspace.1),
replicate_cluster(gateway.clone(), userspace.2),
)?;
Ok(())
}
pub async fn replicate_and_serve(self) -> Result<(), TokioError> {
let (gateway, userspace) = self.bootstrap().await;
futures::try_join!(
gateway.clone().listen().map_err(TokioError::from),
Self::replicate(gateway, userspace).map_err(TokioError::from)
)?;
Ok(())
}
}