wick_packet/
collection_link.rs

1use serde::{Deserialize, Serialize};
2
3use crate::{Entity, Result};
4
5/// An implementation that encapsulates a collection link that components use to call out to components on other Wick collections.
6#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
7#[must_use]
8pub struct ComponentReference {
9  origin: Entity,
10  target: Entity,
11}
12
13impl ComponentReference {
14  /// Constructor for a [ComponentReference]
15  pub const fn new(origin: Entity, target: Entity) -> Self {
16    Self { origin, target }
17  }
18
19  #[cfg(feature = "invocation")]
20  /// Create an [crate::Invocation] for this component reference.
21  pub fn to_invocation(
22    &self,
23    operation: &str,
24    packets: impl Into<crate::PacketStream>,
25    inherent: crate::InherentData,
26    parent: &tracing::Span,
27  ) -> crate::Invocation {
28    let target = crate::Entity::operation(self.target.component_id(), operation);
29
30    crate::Invocation::new(self.origin.clone(), target, packets, inherent, parent)
31  }
32
33  #[must_use]
34  /// Get the URL for the called component
35  pub fn get_origin_url(&self) -> String {
36    self.origin.url()
37  }
38
39  /// Get target component ID.
40  #[must_use]
41  pub fn get_target_id(&self) -> &str {
42    self.target.component_id()
43  }
44
45  /// Make a call to the linked collection.
46  pub fn call(
47    &self,
48    operation: &str,
49    stream: wasmrs_rx::BoxFlux<wasmrs::RawPayload, wasmrs_frames::PayloadError>,
50    config: Option<crate::RuntimeConfig>,
51    previous_inherent: crate::InherentData,
52  ) -> Result<wasmrs_rx::BoxFlux<wasmrs::Payload, wasmrs_frames::PayloadError>> {
53    link_call(self.clone(), operation, stream, config, previous_inherent)
54  }
55}
56
57impl std::fmt::Display for ComponentReference {
58  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59    write!(f, "{}=>{}", self.origin, self.target)
60  }
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[must_use]
65struct InvocationPayload {
66  reference: ComponentReference,
67  operation: String,
68}
69
70#[cfg(target_family = "wasm")]
71fn link_call(
72  compref: ComponentReference,
73  target_op: &str,
74  mut stream: wasmrs_rx::BoxFlux<wasmrs::RawPayload, wasmrs_frames::PayloadError>,
75  config: Option<crate::RuntimeConfig>,
76  previous_inherent: crate::InherentData,
77) -> Result<wasmrs_rx::BoxFlux<wasmrs::Payload, wasmrs_frames::PayloadError>> {
78  use tokio_stream::StreamExt;
79  use wasmrs::RSocket;
80  use wasmrs_guest::{FluxChannel, Observer};
81
82  let (tx, rx) = FluxChannel::new_parts();
83  let first = crate::ContextTransport {
84    config,
85    invocation: Some(crate::InvocationRequest {
86      reference: compref,
87      operation: target_op.to_owned(),
88    }),
89    inherent: previous_inherent,
90  };
91
92  let _ = tx.send_result(crate::Packet::encode("", first).into());
93  let _ = wasmrs_guest::runtime::spawn("comp_ref", async move {
94    loop {
95      if let Some(payload) = stream.next().await {
96        if let Err(_e) = tx.send_result(payload) {
97          // Error sending payload, channel probably closed.
98        };
99      } else {
100        break;
101      }
102    }
103  });
104
105  Ok(Box::pin(wasmrs_guest::Host::default().request_channel(rx.boxed()).map(
106    |r| {
107      r.and_then(|r| {
108        wasmrs::Payload::try_from(r).map_err(|e| wasmrs_frames::PayloadError::application_error(e.to_string(), None))
109      })
110    },
111  )))
112}
113
114#[cfg(not(target_family = "wasm"))]
115#[allow(clippy::needless_pass_by_value)]
116fn link_call(
117  _compref: ComponentReference,
118  _target_op: &str,
119  _input: wasmrs_rx::BoxFlux<wasmrs::RawPayload, wasmrs_frames::PayloadError>,
120  _config: Option<crate::RuntimeConfig>,
121  _previous_inherent: crate::InherentData,
122) -> Result<wasmrs_rx::BoxFlux<wasmrs::Payload, wasmrs_frames::PayloadError>> {
123  unimplemented!("Link calls from native components is not implemented yet")
124}