1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use futures::future::TryFutureExt;
use log::debug;
use safecast::TryCastInto;
use uplock::RwLock;
use tc_error::*;
use tc_transact::fs::Persist;
use tc_transact::lock::TxnLock;
use tcgeneric::*;
use crate::chain::{Chain, ChainType, SyncChain};
use crate::fs;
use crate::object::{InstanceClass, InstanceExt};
use crate::scalar::{Link, OpRef, Scalar, Value};
use crate::txn::{Actor, TxnId};
use super::Cluster;
pub async fn instantiate(
class: InstanceClass,
data_dir: fs::Dir,
txn_id: TxnId,
) -> TCResult<InstanceExt<Cluster>> {
let (path, proto) = class.into_inner();
let path = path.ok_or_else(|| {
TCError::unsupported("cluster config must specify the path of the cluster to host")
})?;
let path = path.into_path();
let mut chain_schema = HashMap::new();
let mut cluster_proto = HashMap::new();
let mut classes = HashMap::new();
for (id, scalar) in proto.into_iter() {
debug!("Cluster member: {}", scalar);
match scalar {
Scalar::Ref(tc_ref) => {
let op_ref = OpRef::try_from(*tc_ref)?;
match op_ref {
OpRef::Get((subject, schema)) => {
let classpath = TCPathBuf::try_from(subject)?;
let ct = ChainType::from_path(&classpath)
.ok_or_else(|| TCError::bad_request("not a Chain", classpath))?;
let schema: Value = schema.try_into()?;
chain_schema.insert(id, (ct, schema));
}
OpRef::Post((extends, proto)) => {
let extends = extends.try_into()?;
classes.insert(id, InstanceClass::new(Some(extends), proto));
}
other => return Err(TCError::bad_request("expected a Chain but found", other)),
}
}
Scalar::Op(op_def) => {
cluster_proto.insert(id, Scalar::Op(op_def));
}
other => {
return Err(TCError::bad_request(
"Cluster member must be a Chain (for mutable data), or an immutable OpDef, not",
other,
))
}
}
}
let dir = get_or_create_dir(data_dir, txn_id, &path).await?;
let mut chains = HashMap::<Id, Chain>::new();
for (id, (class, schema)) in chain_schema.into_iter() {
let dir = dir.get_or_create_dir(txn_id, id.clone()).await?;
let chain = match class {
ChainType::Sync => {
let schema =
schema.try_cast_into(|v| TCError::bad_request("invalid Chain schema", v))?;
SyncChain::load(schema, dir, txn_id)
.map_ok(Chain::Sync)
.await?
}
};
chains.insert(id, chain);
}
let actor_id = Value::from(Link::default());
let cluster = Cluster {
actor: Arc::new(Actor::new(actor_id)),
path: path.clone(),
chains: chains.into(),
classes: classes.into(),
confirmed: RwLock::new(txn_id),
owned: RwLock::new(HashMap::new()),
installed: TxnLock::new(
format!("Cluster {} installed deps", path),
HashMap::new().into(),
),
};
let class = InstanceClass::new(Some(path.into()), cluster_proto.into());
Ok(InstanceExt::new(cluster, class))
}
/// Walk `path` downward from `data_dir`, calling `get_or_create_dir` on the
/// current directory once per segment, and return the directory reached last.
///
/// An empty `path` returns `data_dir` itself. The first error encountered
/// while resolving a segment is returned immediately.
async fn get_or_create_dir(
    data_dir: fs::Dir,
    txn_id: TxnId,
    path: &[PathSegment],
) -> TCResult<fs::Dir> {
    let mut current = data_dir;

    for segment in path.iter().cloned() {
        let child = current.get_or_create_dir(txn_id, segment).await?;
        current = child;
    }

    Ok(current)
}