1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::collections::{HashMap, HashSet};
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use log::debug;
use uplock::RwLock;
use tc_error::*;
use tc_transact::lock::TxnLock;
use tc_transact::Transaction;
use tcgeneric::*;
use crate::chain::{self, Chain, ChainType, Schema};
use crate::fs;
use crate::object::{InstanceClass, InstanceExt};
use crate::scalar::{Link, LinkHost, OpRef, Scalar, Value};
use crate::txn::{Actor, Txn, TxnId};
use super::Cluster;
pub async fn instantiate(
txn: &Txn,
host: LinkHost,
class: InstanceClass,
data_dir: fs::Dir,
) -> TCResult<InstanceExt<Cluster>> {
let (link, proto) = class.into_inner();
let link = link.ok_or_else(|| {
TCError::unsupported("cluster config must specify a Link to the cluster to host")
})?;
let mut chain_schema = Map::new();
let mut cluster_proto = Map::new();
let mut classes = Map::new();
for (id, scalar) in proto.into_iter() {
debug!("Cluster member: {}", scalar);
match scalar {
Scalar::Ref(tc_ref) => {
let op_ref = OpRef::try_from(*tc_ref)?;
match op_ref {
OpRef::Get((class, schema)) => {
let classpath = TCPathBuf::try_from(class)?;
let ct = ChainType::from_path(&classpath)
.ok_or_else(|| TCError::bad_request("not a Chain", classpath))?;
debug!("an instance of {} with schema {}", ct, schema);
let schema = Schema::from_scalar(schema)?;
chain_schema.insert(id, (ct, schema));
}
OpRef::Post((extends, proto)) => {
let extends = extends.try_into()?;
classes.insert(id, InstanceClass::new(Some(extends), proto));
}
other => return Err(TCError::bad_request("expected a Chain but found", other)),
}
}
Scalar::Op(op_def) => {
cluster_proto.insert(id, Scalar::Op(op_def));
}
other => {
return Err(TCError::bad_request(
"Cluster member must be a Chain (for mutable data), or an immutable OpDef, not",
other,
))
}
}
}
let txn_id = *txn.id();
let dir = get_or_create_dir(data_dir, txn_id, link.path()).await?;
let mut replicas = HashSet::new();
replicas.insert((host, link.path().clone()).into());
let mut chains = Map::<Chain>::new();
for (id, (class, schema)) in chain_schema.into_iter() {
let dir = dir.get_or_create_dir(txn_id, id.clone()).await?;
let chain = chain::load(txn, class, schema, dir).await?;
chains.insert(id, chain);
}
let actor_id = Value::from(Link::default());
let cluster = Cluster {
link: link.clone(),
actor: Arc::new(Actor::new(actor_id)),
chains,
classes,
confirmed: RwLock::new(txn_id),
owned: RwLock::new(HashMap::new()),
installed: TxnLock::new(
format!("Cluster {} installed deps", link),
HashMap::new().into(),
),
replicas: TxnLock::new(format!("Cluster {} replicas", link), replicas.into()),
};
let class = InstanceClass::new(Some(link), cluster_proto.into());
Ok(InstanceExt::new(cluster, class))
}
async fn get_or_create_dir(
data_dir: fs::Dir,
txn_id: TxnId,
path: &[PathSegment],
) -> TCResult<fs::Dir> {
let mut dir = data_dir;
for name in path {
dir = dir.get_or_create_dir(txn_id, name.clone()).await?;
}
Ok(dir)
}