1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::collections::{HashMap, HashSet};
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;

use log::debug;
use uplock::RwLock;

use tc_error::*;
use tc_transact::lock::TxnLock;
use tcgeneric::*;

use crate::chain::{self, Chain, ChainType};
use crate::fs;
use crate::object::{InstanceClass, InstanceExt};
use crate::scalar::{Link, LinkHost, OpRef, Scalar, Value};
use crate::txn::{Actor, TxnId};

use super::Cluster;

/// Load a cluster from the filesystem, or instantiate a new one.
pub async fn instantiate(
    host: LinkHost,
    class: InstanceClass,
    data_dir: fs::Dir,
    txn_id: TxnId,
) -> TCResult<InstanceExt<Cluster>> {
    let (link, proto) = class.into_inner();
    let link = link.ok_or_else(|| {
        TCError::unsupported("cluster config must specify a Link to the cluster to host")
    })?;

    let mut chain_schema = HashMap::new();
    let mut cluster_proto = HashMap::new();
    let mut classes = HashMap::new();

    for (id, scalar) in proto.into_iter() {
        debug!("Cluster member: {}", scalar);

        match scalar {
            Scalar::Ref(tc_ref) => {
                let op_ref = OpRef::try_from(*tc_ref)?;
                match op_ref {
                    OpRef::Get((subject, schema)) => {
                        let classpath = TCPathBuf::try_from(subject)?;
                        let ct = ChainType::from_path(&classpath)
                            .ok_or_else(|| TCError::bad_request("not a Chain", classpath))?;

                        let schema: Value = schema.try_into()?;
                        chain_schema.insert(id, (ct, schema));
                    }
                    OpRef::Post((extends, proto)) => {
                        let extends = extends.try_into()?;
                        classes.insert(id, InstanceClass::new(Some(extends), proto));
                    }
                    other => return Err(TCError::bad_request("expected a Chain but found", other)),
                }
            }
            Scalar::Op(op_def) => {
                cluster_proto.insert(id, Scalar::Op(op_def));
            }
            other => {
                return Err(TCError::bad_request(
                    "Cluster member must be a Chain (for mutable data), or an immutable OpDef, not",
                    other,
                ))
            }
        }
    }

    let dir = get_or_create_dir(data_dir, txn_id, link.path()).await?;
    let mut replicas = HashSet::new();
    replicas.insert((host, link.path().clone()).into());

    let mut chains = HashMap::<Id, Chain>::new();
    for (id, (class, schema)) in chain_schema.into_iter() {
        let dir = dir.get_or_create_dir(txn_id, id.clone()).await?;
        let chain = chain::load(class, schema, dir, txn_id).await?;
        chains.insert(id, chain);
    }

    let actor_id = Value::from(Link::default());

    let cluster = Cluster {
        link: link.clone(),
        actor: Arc::new(Actor::new(actor_id)),
        chains: chains.into(),
        classes: classes.into(),
        confirmed: RwLock::new(txn_id),
        owned: RwLock::new(HashMap::new()),
        installed: TxnLock::new(
            format!("Cluster {} installed deps", link),
            HashMap::new().into(),
        ),
        replicas: TxnLock::new(format!("Cluster {} replicas", link), replicas.into()),
    };

    let class = InstanceClass::new(Some(link), cluster_proto.into());

    Ok(InstanceExt::new(cluster, class))
}

async fn get_or_create_dir(
    data_dir: fs::Dir,
    txn_id: TxnId,
    path: &[PathSegment],
) -> TCResult<fs::Dir> {
    let mut dir = data_dir;
    for name in path {
        dir = dir.get_or_create_dir(txn_id, name.clone()).await?;
    }

    Ok(dir)
}