1pub(crate) mod channel;
2pub(crate) mod components;
3pub(crate) mod error;
4pub(crate) mod event_loop;
5pub(crate) mod executor;
6pub(crate) mod program;
7
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11
12use flow_component::{Component, ComponentError, LocalScope};
13use futures::{FutureExt, TryFutureExt};
14use parking_lot::Mutex;
15use tracing::{info_span, Span};
16use wick_interface_types::ComponentSignature;
17use wick_packet::{Entity, Invocation, PacketStream, RuntimeConfig};
18
19use self::channel::InterpreterDispatchChannel;
20use self::components::HandlerMap;
21use self::error::Error;
22use self::event_loop::EventLoop;
23use self::program::Program;
24use crate::graph::types::*;
25use crate::interpreter::channel::InterpreterChannel;
26use crate::interpreter::components::component::ComponentComponent;
27use crate::interpreter::components::null::NullComponent;
28use crate::interpreter::components::self_component::SelfComponent;
29use crate::interpreter::executor::error::ExecutionError;
30use crate::{NamespaceHandler, Observer};
31
32#[must_use]
33#[derive()]
34pub struct Interpreter {
35 program: Program,
36 event_loop: EventLoop,
37 signature: ComponentSignature,
38 components: Arc<HandlerMap>,
39 self_component: SelfComponent,
40 dispatcher: InterpreterDispatchChannel,
41 namespace: Option<String>,
42 callback: LocalScope,
43 exposed_ops: HashMap<String, NamespaceHandler>, span: Span,
45}
46
47impl std::fmt::Debug for Interpreter {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("Interpreter")
50 .field("program", &self.program)
51 .field("event_loop", &self.event_loop)
52 .field("signature", &self.signature)
53 .field("components", &self.components)
54 .field("dispatcher", &self.dispatcher)
55 .finish()
56 }
57}
58
59impl Interpreter {
60 pub fn new(
61 network: Network,
62 namespace: Option<String>,
63 components: Option<HandlerMap>,
64 callback: LocalScope,
65 root_config: Option<&RuntimeConfig>,
66 parent_span: &Span,
67 ) -> Result<Self, Error> {
68 let span = info_span!(parent: parent_span, "interpreter");
69
70 let _guard = span.enter();
71 let mut handlers = components.unwrap_or_default();
72 debug!(handlers = ?handlers.keys(), "initializing interpreter");
73 let mut exposed_ops = HashMap::new();
74
75 let exposed = handlers.inner().values().filter(|h| h.is_exposed()).collect::<Vec<_>>();
76 if exposed.len() > 1 {
77 return Err(Error::ExposedLimit(
78 exposed.iter().map(|h| h.namespace().to_owned()).collect(),
79 ));
80 }
81 let signature = exposed.get(0).copied().map(|handler| {
82 for op in &handler.component.signature().operations {
83 trace!(operation = op.name, "interpreter:exposing operation");
84 exposed_ops.insert(op.name.clone(), handler.clone());
85 }
86 handler.component.signature().clone()
87 });
88
89 handlers.add(NamespaceHandler::new(NullComponent::ID, Box::new(NullComponent::new())))?;
90
91 let component_component = ComponentComponent::new(&handlers);
93 handlers.add(NamespaceHandler::new(
94 ComponentComponent::ID,
95 Box::new(component_component),
96 ))?;
97
98 handlers.add_core(&network)?;
99
100 let mut signatures = handlers.component_signatures();
101 program::generate_self_signature(&network, &mut signatures).map_err(Error::EarlyError)?;
102 let program = Program::new(network, signatures)?;
103
104 program.validate()?;
105
106 let channel = InterpreterChannel::new();
107 let dispatcher = channel.dispatcher(Some(span.clone()));
108
109 let components = Arc::new(handlers);
111 let self_component = SelfComponent::new(components.clone(), program.state(), &dispatcher, root_config);
112
113 let signature = signature.unwrap_or_else(|| self_component.signature().clone());
116
117 debug!(?signature, "signature");
118
119 let event_loop = EventLoop::new(channel, &span);
120 let mut handled_opts = program.operations().iter().map(|s| s.name()).collect::<Vec<_>>();
121 handled_opts.extend(exposed_ops.keys().map(|s: &String| s.as_str()));
122
123 debug!(
124 operations = ?handled_opts,
125 components = ?components.inner().keys().cloned().collect::<Vec<_>>(),
126 "interpreter:scope"
127 );
128 drop(_guard);
129
130 Ok(Self {
131 program,
132 dispatcher,
133 signature,
134 components,
135 self_component,
136 event_loop,
137 namespace,
138 exposed_ops,
139 callback,
140 span,
141 })
142 }
143
144 fn get_callback(&self) -> LocalScope {
145 let outside_callback = self.callback.clone();
146 let internal_components = self.components.clone();
147 let self_component = self.self_component.clone();
148
149 let scope_hack = Arc::new(Mutex::new(None));
150
151 let inner_cb = scope_hack.clone();
152 let scope_proxy = LocalScope::new(Arc::new(move |compref, op, stream, inherent, config, span| {
153 let internal_components = internal_components.clone();
154 let inner_cb = inner_cb.clone();
155 let outer_scope = outside_callback.clone();
156 let self_component = self_component.clone();
157 let span = span.clone();
158 Box::pin(async move {
159 span.in_scope(|| trace!(op, %compref, "invoke:component reference"));
160 if compref.get_target_id() == SelfComponent::ID {
161 span.in_scope(|| trace!(op, %compref, "handling component invocation for self"));
162 let cb = inner_cb.lock().clone().unwrap();
163 let invocation = compref.to_invocation(&op, stream, inherent, &span);
164 self_component.handle(invocation, config, cb).await
165 } else if let Some(handler) = internal_components.get(compref.get_target_id()) {
166 span.in_scope(|| trace!(op, %compref, "handling component invocation internal to this interpreter"));
167 let cb = inner_cb.lock().clone().unwrap();
168 let invocation = compref.to_invocation(&op, stream, inherent, &span);
169 handler.component().handle(invocation, config, cb).await
170 } else {
171 outer_scope.invoke(compref, op, stream, inherent, config, &span).await
172 }
173 })
174 }));
175 scope_hack.lock().replace(scope_proxy.clone());
176 scope_proxy
177 }
178
179 pub async fn invoke(&self, invocation: Invocation, config: Option<RuntimeConfig>) -> Result<PacketStream, Error> {
180 let cb = self.get_callback();
181 let stream = self
182 .handle(invocation, config, cb)
183 .await
184 .map_err(ExecutionError::ComponentError)?;
185
186 Ok(stream)
187 }
188
189 pub async fn start(
190 &mut self,
191 options: Option<InterpreterOptions>,
192 observer: Option<Box<dyn Observer + Send + Sync>>,
193 ) {
194 self.event_loop.start(options.unwrap_or_default(), observer).await;
195 }
196
197 pub async fn stop(&self) -> Result<(), Error> {
198 let shutdown = self.event_loop.shutdown().await;
199 if let Err(error) = &shutdown {
200 self.span.in_scope(|| error!(%error,"error shutting down event loop"));
201 };
202 for (ns, components) in self.components.inner() {
203 self
204 .span
205 .in_scope(|| debug!(namespace = %ns, "shutting down component"));
206 if let Err(error) = components
207 .component
208 .shutdown()
209 .await
210 .map_err(|e| Error::ComponentShutdown(e.to_string()))
211 {
212 self.span.in_scope(|| warn!(%error,"error during shutdown"));
213 };
214 }
215
216 shutdown
217 }
218
219 pub fn components(&self) -> &HandlerMap {
220 &self.components
221 }
222
223 pub fn render_dotviz(&self, op: &str) -> Result<String, Error> {
224 self.program.dotviz(op)
225 }
226}
227
228impl Component for Interpreter {
229 fn handle(
230 &self,
231 mut invocation: Invocation,
232 config: Option<RuntimeConfig>,
233 cb: LocalScope,
234 ) -> flow_component::BoxFuture<Result<PacketStream, ComponentError>> {
235 let known_targets = || {
236 let mut hosted: Vec<_> = self.components.inner().keys().cloned().collect();
237 if let Some(ns) = &self.namespace {
238 hosted.push(ns.clone());
239 }
240 hosted
241 };
242 let span = invocation.span().clone();
243
244 span
245 .in_scope(|| trace!(target=%invocation.target().url(),tx_id=%invocation.tx_id(),id=%invocation.id(), "invoking"));
246 let from_exposed = self.exposed_ops.get(invocation.target().operation_id());
247
248 Box::pin(async move {
249 let stream = match invocation.target() {
250 Entity::Operation(ns, _) => {
251 if ns == SelfComponent::ID || ns == Entity::LOCAL || Some(ns) == self.namespace.as_ref() {
252 if let Some(handler) = from_exposed {
253 let new_target = Entity::operation(handler.namespace(), invocation.target().operation_id());
254 span.in_scope(|| trace!(origin=%invocation.origin(),original_target=%invocation.target(), %new_target, "invoke::exposed::operation"));
255 invocation = invocation.redirect(new_target);
256 return handler.component.handle(invocation, config, cb).await;
257 }
258 span.in_scope(
259 || trace!(origin=%invocation.origin(),target=%invocation.target(), "invoke::composite::operation"),
260 );
261 self
262 .self_component
263 .handle(invocation, config, self.get_callback())
264 .await?
265 } else if let Some(handler) = self.components.get(ns) {
266 span.in_scope(
267 || trace!(origin=%invocation.origin(),target=%invocation.target(), "invoke::handler::operation"),
268 );
269 handler.component.handle(invocation, config, cb).await?
270 } else {
271 return Err(ComponentError::new(Error::TargetNotFound(
272 invocation.target().clone(),
273 known_targets(),
274 )));
275 }
276 }
277 _ => return Err(ComponentError::new(Error::InvalidEntity(invocation.target().clone()))),
278 };
279
280 Ok::<_, ComponentError>(stream)
281 })
282 }
283
284 fn signature(&self) -> &ComponentSignature {
285 &self.signature
286 }
287
288 fn shutdown(&self) -> flow_component::BoxFuture<Result<(), ComponentError>> {
289 self.stop().map_err(ComponentError::new).boxed()
290 }
291}
292
293#[derive(Debug, Clone)]
294#[allow(missing_copy_implementations)]
295#[non_exhaustive]
296pub struct InterpreterOptions {
297 pub output_timeout: Duration,
299}
300
301impl Default for InterpreterOptions {
302 fn default() -> Self {
303 Self {
304 output_timeout: Duration::from_secs(500),
305 }
306 }
307}
308
309#[cfg(test)]
310mod test {
311 use anyhow::Result;
312
313 use super::*;
314
315 const fn sync_send<T>()
316 where
317 T: Sync + Send,
318 {
319 }
320
321 #[test]
322 const fn test_sync_send() -> Result<()> {
323 sync_send::<Interpreter>();
324 Ok(())
325 }
326}