1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use std::collections::{HashMap, HashSet};
use futures::future::FutureExt;
use futures::stream::{FuturesUnordered, StreamExt};
use log::debug;
use tc_error::*;
use tcgeneric::{Id, Instance, Map};
use crate::route::Public;
use crate::scalar::{Refer, Scope};
use crate::state::State;
use crate::txn::Txn;
/// Resolves the references held in a [`Scope`] in the context of a single [`Txn`].
///
/// `'a` is the lifetime of the borrowed subject `T` that the scope is built around.
pub struct Executor<'a, T> {
/// The transaction handed (by reference) to each reference when it is resolved.
txn: Txn,
/// The states known so far, keyed by `Id`; extended as references get resolved.
scope: Scope<'a, T>,
}
impl<'a, T: Instance + Public> Executor<'a, T> {
    /// Construct a new `Executor` for `subject`, seeding its scope with `data`.
    pub fn new<S: Into<State>, I: IntoIterator<Item = (Id, S)>>(
        txn: Txn,
        subject: &'a T,
        data: I,
    ) -> Self {
        let scope = Scope::new(subject, data);
        Self { txn, scope }
    }

    /// Construct a new `Executor` for `subject` whose scope is built from an
    /// existing `context` plus the additional `(Id, state)` pairs in `iter`.
    pub fn with_context<S: Into<State>, I: IntoIterator<Item = (Id, S)>>(
        txn: Txn,
        subject: &'a T,
        context: Map<State>,
        iter: I,
    ) -> Self {
        let scope = Scope::with_context(subject, context, iter);
        Self { txn, scope }
    }

    /// Resolve references in this executor's scope until the state named
    /// `capture` is no longer a reference, then return that state.
    ///
    /// Each round walks the dependencies of `capture`, collects every reference
    /// whose own dependencies are all resolved, resolves those concurrently,
    /// and writes the results back into the scope.
    ///
    /// # Errors
    ///
    /// * Any error returned by the scope while looking up an `Id`.
    /// * A "bad request" error if a round finds nothing that can be resolved
    ///   while `capture` is still a reference (e.g. a circular dependency).
    /// * The error of any reference which fails to resolve, annotated with the
    ///   `Id` that was being resolved.
    /// * A "not found" error if `capture` is absent from the final scope.
    pub async fn capture(mut self, capture: Id) -> TCResult<State> {
        debug!("execute op & capture {}", capture);

        while self.scope.resolve_id(&capture)?.is_ref() {
            let mut visited = HashSet::with_capacity(self.scope.len());
            let mut pending = Vec::with_capacity(self.scope.len());
            let mut unvisited = Vec::with_capacity(self.scope.len());
            unvisited.push(capture.clone());

            while let Some(id) = unvisited.pop() {
                // An `Id` required by more than one reference (a "diamond") is
                // pushed once per dependent. Seeing it again is not a cycle, so
                // skip it instead of failing. Genuine cycles leave `pending`
                // empty and are reported by the check after this loop.
                if !visited.insert(id.clone()) {
                    debug!("already visited {}, skipping", id);
                    continue;
                }

                let state = self.scope.resolve_id(&id)?;
                debug!("checking state {}: {}", id, state);

                if !state.is_ref() {
                    debug!("{} already resolved: {}", id, state);
                    continue;
                }

                let mut deps = HashSet::new();
                state.requires(&mut deps);

                // This reference is ready only if none of its dependencies is
                // itself still an unresolved reference.
                let mut ready = true;
                for dep_id in deps {
                    if self.scope.resolve_id(&dep_id)?.is_ref() {
                        ready = false;
                        unvisited.push(dep_id);
                    }
                }

                if ready {
                    pending.push(id);
                } else {
                    debug!("{} still has unresolved deps", id);
                }
            }

            // No progress is possible: bail out rather than loop forever.
            if pending.is_empty() && self.scope.resolve_id(&capture)?.is_ref() {
                return Err(TCError::bad_request(
                    "cannot resolve all dependencies of",
                    capture,
                ));
            }

            let mut resolved = HashMap::with_capacity(pending.len());

            // Scoped so the futures' borrows of `self.scope` end before the
            // scope is extended below.
            {
                let mut providers = FuturesUnordered::new();
                for id in pending {
                    let state = self.scope.resolve_id(&id)?.clone();
                    providers.push(state.resolve(&self.scope, &self.txn).map(|r| (id, r)));
                }

                while let Some((id, r)) = providers.next().await {
                    match r {
                        Ok(state) => {
                            resolved.insert(id, state);
                        }
                        Err(cause) => {
                            return Err(cause.consume(format!("error resolving {}", id)))
                        }
                    }
                }
            }

            self.scope.extend(resolved);
        }

        self.scope
            .into_inner()
            .remove(&capture)
            .ok_or_else(|| TCError::not_found(capture))
    }
}