flow_graph_interpreter/
interpreter.rs

1pub(crate) mod channel;
2pub(crate) mod components;
3pub(crate) mod error;
4pub(crate) mod event_loop;
5pub(crate) mod executor;
6pub(crate) mod program;
7
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11
12use flow_component::{Component, ComponentError, LocalScope};
13use futures::{FutureExt, TryFutureExt};
14use parking_lot::Mutex;
15use tracing::{info_span, Span};
16use wick_interface_types::ComponentSignature;
17use wick_packet::{Entity, Invocation, PacketStream, RuntimeConfig};
18
19use self::channel::InterpreterDispatchChannel;
20use self::components::HandlerMap;
21use self::error::Error;
22use self::event_loop::EventLoop;
23use self::program::Program;
24use crate::graph::types::*;
25use crate::interpreter::channel::InterpreterChannel;
26use crate::interpreter::components::component::ComponentComponent;
27use crate::interpreter::components::null::NullComponent;
28use crate::interpreter::components::self_component::SelfComponent;
29use crate::interpreter::executor::error::ExecutionError;
30use crate::{NamespaceHandler, Observer};
31
32#[must_use]
33#[derive()]
34pub struct Interpreter {
35  program: Program,
36  event_loop: EventLoop,
37  signature: ComponentSignature,
38  components: Arc<HandlerMap>,
39  self_component: SelfComponent,
40  dispatcher: InterpreterDispatchChannel,
41  namespace: Option<String>,
42  callback: LocalScope,
43  exposed_ops: HashMap<String, NamespaceHandler>, // A map from op name to the ns of the handler that exposes it.
44  span: Span,
45}
46
47impl std::fmt::Debug for Interpreter {
48  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49    f.debug_struct("Interpreter")
50      .field("program", &self.program)
51      .field("event_loop", &self.event_loop)
52      .field("signature", &self.signature)
53      .field("components", &self.components)
54      .field("dispatcher", &self.dispatcher)
55      .finish()
56  }
57}
58
59impl Interpreter {
60  pub fn new(
61    network: Network,
62    namespace: Option<String>,
63    components: Option<HandlerMap>,
64    callback: LocalScope,
65    root_config: Option<&RuntimeConfig>,
66    parent_span: &Span,
67  ) -> Result<Self, Error> {
68    let span = info_span!(parent: parent_span, "interpreter");
69
70    let _guard = span.enter();
71    let mut handlers = components.unwrap_or_default();
72    debug!(handlers = ?handlers.keys(), "initializing interpreter");
73    let mut exposed_ops = HashMap::new();
74
75    let exposed = handlers.inner().values().filter(|h| h.is_exposed()).collect::<Vec<_>>();
76    if exposed.len() > 1 {
77      return Err(Error::ExposedLimit(
78        exposed.iter().map(|h| h.namespace().to_owned()).collect(),
79      ));
80    }
81    let signature = exposed.get(0).copied().map(|handler| {
82      for op in &handler.component.signature().operations {
83        trace!(operation = op.name, "interpreter:exposing operation");
84        exposed_ops.insert(op.name.clone(), handler.clone());
85      }
86      handler.component.signature().clone()
87    });
88
89    handlers.add(NamespaceHandler::new(NullComponent::ID, Box::new(NullComponent::new())))?;
90
91    // Add the component:: component
92    let component_component = ComponentComponent::new(&handlers);
93    handlers.add(NamespaceHandler::new(
94      ComponentComponent::ID,
95      Box::new(component_component),
96    ))?;
97
98    handlers.add_core(&network)?;
99
100    let mut signatures = handlers.component_signatures();
101    program::generate_self_signature(&network, &mut signatures).map_err(Error::EarlyError)?;
102    let program = Program::new(network, signatures)?;
103
104    program.validate()?;
105
106    let channel = InterpreterChannel::new();
107    let dispatcher = channel.dispatcher(Some(span.clone()));
108
109    // Make the self:: component
110    let components = Arc::new(handlers);
111    let self_component = SelfComponent::new(components.clone(), program.state(), &dispatcher, root_config);
112
113    // If we expose a component, expose its signature as our own.
114    // Otherwise expose our self signature.
115    let signature = signature.unwrap_or_else(|| self_component.signature().clone());
116
117    debug!(?signature, "signature");
118
119    let event_loop = EventLoop::new(channel, &span);
120    let mut handled_opts = program.operations().iter().map(|s| s.name()).collect::<Vec<_>>();
121    handled_opts.extend(exposed_ops.keys().map(|s: &String| s.as_str()));
122
123    debug!(
124      operations = ?handled_opts,
125      components = ?components.inner().keys().cloned().collect::<Vec<_>>(),
126      "interpreter:scope"
127    );
128    drop(_guard);
129
130    Ok(Self {
131      program,
132      dispatcher,
133      signature,
134      components,
135      self_component,
136      event_loop,
137      namespace,
138      exposed_ops,
139      callback,
140      span,
141    })
142  }
143
144  fn get_callback(&self) -> LocalScope {
145    let outside_callback = self.callback.clone();
146    let internal_components = self.components.clone();
147    let self_component = self.self_component.clone();
148
149    let scope_hack = Arc::new(Mutex::new(None));
150
151    let inner_cb = scope_hack.clone();
152    let scope_proxy = LocalScope::new(Arc::new(move |compref, op, stream, inherent, config, span| {
153      let internal_components = internal_components.clone();
154      let inner_cb = inner_cb.clone();
155      let outer_scope = outside_callback.clone();
156      let self_component = self_component.clone();
157      let span = span.clone();
158      Box::pin(async move {
159        span.in_scope(|| trace!(op, %compref, "invoke:component reference"));
160        if compref.get_target_id() == SelfComponent::ID {
161          span.in_scope(|| trace!(op, %compref, "handling component invocation for self"));
162          let cb = inner_cb.lock().clone().unwrap();
163          let invocation = compref.to_invocation(&op, stream, inherent, &span);
164          self_component.handle(invocation, config, cb).await
165        } else if let Some(handler) = internal_components.get(compref.get_target_id()) {
166          span.in_scope(|| trace!(op, %compref, "handling component invocation internal to this interpreter"));
167          let cb = inner_cb.lock().clone().unwrap();
168          let invocation = compref.to_invocation(&op, stream, inherent, &span);
169          handler.component().handle(invocation, config, cb).await
170        } else {
171          outer_scope.invoke(compref, op, stream, inherent, config, &span).await
172        }
173      })
174    }));
175    scope_hack.lock().replace(scope_proxy.clone());
176    scope_proxy
177  }
178
179  pub async fn invoke(&self, invocation: Invocation, config: Option<RuntimeConfig>) -> Result<PacketStream, Error> {
180    let cb = self.get_callback();
181    let stream = self
182      .handle(invocation, config, cb)
183      .await
184      .map_err(ExecutionError::ComponentError)?;
185
186    Ok(stream)
187  }
188
189  pub async fn start(
190    &mut self,
191    options: Option<InterpreterOptions>,
192    observer: Option<Box<dyn Observer + Send + Sync>>,
193  ) {
194    self.event_loop.start(options.unwrap_or_default(), observer).await;
195  }
196
197  pub async fn stop(&self) -> Result<(), Error> {
198    let shutdown = self.event_loop.shutdown().await;
199    if let Err(error) = &shutdown {
200      self.span.in_scope(|| error!(%error,"error shutting down event loop"));
201    };
202    for (ns, components) in self.components.inner() {
203      self
204        .span
205        .in_scope(|| debug!(namespace = %ns, "shutting down component"));
206      if let Err(error) = components
207        .component
208        .shutdown()
209        .await
210        .map_err(|e| Error::ComponentShutdown(e.to_string()))
211      {
212        self.span.in_scope(|| warn!(%error,"error during shutdown"));
213      };
214    }
215
216    shutdown
217  }
218
219  pub fn components(&self) -> &HandlerMap {
220    &self.components
221  }
222
223  pub fn render_dotviz(&self, op: &str) -> Result<String, Error> {
224    self.program.dotviz(op)
225  }
226}
227
228impl Component for Interpreter {
229  fn handle(
230    &self,
231    mut invocation: Invocation,
232    config: Option<RuntimeConfig>,
233    cb: LocalScope,
234  ) -> flow_component::BoxFuture<Result<PacketStream, ComponentError>> {
235    let known_targets = || {
236      let mut hosted: Vec<_> = self.components.inner().keys().cloned().collect();
237      if let Some(ns) = &self.namespace {
238        hosted.push(ns.clone());
239      }
240      hosted
241    };
242    let span = invocation.span().clone();
243
244    span
245      .in_scope(|| trace!(target=%invocation.target().url(),tx_id=%invocation.tx_id(),id=%invocation.id(), "invoking"));
246    let from_exposed = self.exposed_ops.get(invocation.target().operation_id());
247
248    Box::pin(async move {
249      let stream = match invocation.target() {
250        Entity::Operation(ns, _) => {
251          if ns == SelfComponent::ID || ns == Entity::LOCAL || Some(ns) == self.namespace.as_ref() {
252            if let Some(handler) = from_exposed {
253              let new_target = Entity::operation(handler.namespace(), invocation.target().operation_id());
254              span.in_scope(|| trace!(origin=%invocation.origin(),original_target=%invocation.target(), %new_target, "invoke::exposed::operation"));
255              invocation = invocation.redirect(new_target);
256              return handler.component.handle(invocation, config, cb).await;
257            }
258            span.in_scope(
259              || trace!(origin=%invocation.origin(),target=%invocation.target(), "invoke::composite::operation"),
260            );
261            self
262              .self_component
263              .handle(invocation, config, self.get_callback())
264              .await?
265          } else if let Some(handler) = self.components.get(ns) {
266            span.in_scope(
267              || trace!(origin=%invocation.origin(),target=%invocation.target(), "invoke::handler::operation"),
268            );
269            handler.component.handle(invocation, config, cb).await?
270          } else {
271            return Err(ComponentError::new(Error::TargetNotFound(
272              invocation.target().clone(),
273              known_targets(),
274            )));
275          }
276        }
277        _ => return Err(ComponentError::new(Error::InvalidEntity(invocation.target().clone()))),
278      };
279
280      Ok::<_, ComponentError>(stream)
281    })
282  }
283
284  fn signature(&self) -> &ComponentSignature {
285    &self.signature
286  }
287
288  fn shutdown(&self) -> flow_component::BoxFuture<Result<(), ComponentError>> {
289    self.stop().map_err(ComponentError::new).boxed()
290  }
291}
292
293#[derive(Debug, Clone)]
294#[allow(missing_copy_implementations)]
295#[non_exhaustive]
296pub struct InterpreterOptions {
297  /// Timeout after which a component that has received no output is considered dead.
298  pub output_timeout: Duration,
299}
300
301impl Default for InterpreterOptions {
302  fn default() -> Self {
303    Self {
304      output_timeout: Duration::from_secs(500),
305    }
306  }
307}
308
309#[cfg(test)]
310mod test {
311  use anyhow::Result;
312
313  use super::*;
314
315  const fn sync_send<T>()
316  where
317    T: Sync + Send,
318  {
319  }
320
321  #[test]
322  const fn test_sync_send() -> Result<()> {
323    sync_send::<Interpreter>();
324    Ok(())
325  }
326}