wick_packet/
collection_link.rs1use serde::{Deserialize, Serialize};
2
3use crate::{Entity, Result};
4
5#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
7#[must_use]
8pub struct ComponentReference {
9 origin: Entity,
10 target: Entity,
11}
12
13impl ComponentReference {
14 pub const fn new(origin: Entity, target: Entity) -> Self {
16 Self { origin, target }
17 }
18
19 #[cfg(feature = "invocation")]
20 pub fn to_invocation(
22 &self,
23 operation: &str,
24 packets: impl Into<crate::PacketStream>,
25 inherent: crate::InherentData,
26 parent: &tracing::Span,
27 ) -> crate::Invocation {
28 let target = crate::Entity::operation(self.target.component_id(), operation);
29
30 crate::Invocation::new(self.origin.clone(), target, packets, inherent, parent)
31 }
32
33 #[must_use]
34 pub fn get_origin_url(&self) -> String {
36 self.origin.url()
37 }
38
39 #[must_use]
41 pub fn get_target_id(&self) -> &str {
42 self.target.component_id()
43 }
44
45 pub fn call(
47 &self,
48 operation: &str,
49 stream: wasmrs_rx::BoxFlux<wasmrs::RawPayload, wasmrs_frames::PayloadError>,
50 config: Option<crate::RuntimeConfig>,
51 previous_inherent: crate::InherentData,
52 ) -> Result<wasmrs_rx::BoxFlux<wasmrs::Payload, wasmrs_frames::PayloadError>> {
53 link_call(self.clone(), operation, stream, config, previous_inherent)
54 }
55}
56
57impl std::fmt::Display for ComponentReference {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 write!(f, "{}=>{}", self.origin, self.target)
60 }
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[must_use]
65struct InvocationPayload {
66 reference: ComponentReference,
67 operation: String,
68}
69
/// Performs a link call from inside a wasm guest.
///
/// Opens a request channel on the default `wasmrs_guest::Host`. The first
/// frame sent is an encoded [`crate::ContextTransport`] carrying `config`,
/// `previous_inherent`, and an invocation request naming `compref` and
/// `target_op`. A spawned task then forwards every item of `stream` into the
/// same channel.
///
/// Returns the host's response stream with each raw payload converted into a
/// `wasmrs::Payload`. A payload that fails conversion becomes a
/// `PayloadError` application error carrying the conversion error's message.
///
/// # Errors
///
/// This implementation always returns `Ok`; failures surface as `Err` items
/// inside the returned stream.
#[cfg(target_family = "wasm")]
fn link_call(
    compref: ComponentReference,
    target_op: &str,
    mut stream: wasmrs_rx::BoxFlux<wasmrs::RawPayload, wasmrs_frames::PayloadError>,
    config: Option<crate::RuntimeConfig>,
    previous_inherent: crate::InherentData,
) -> Result<wasmrs_rx::BoxFlux<wasmrs::Payload, wasmrs_frames::PayloadError>> {
    use tokio_stream::StreamExt;
    use wasmrs::RSocket;
    use wasmrs_guest::{FluxChannel, Observer};

    let (tx, rx) = FluxChannel::new_parts();
    let first = crate::ContextTransport {
        config,
        invocation: Some(crate::InvocationRequest {
            reference: compref,
            operation: target_op.to_owned(),
        }),
        inherent: previous_inherent,
    };

    // The context frame goes first so the host knows what is being invoked.
    // Best-effort: `rx` is still held here, so a failure is not expected.
    let _ = tx.send_result(crate::Packet::encode("", first).into());

    // Pump the caller's input into the channel in the background. The task
    // handle is intentionally discarded; the task ends when the input ends.
    let _ = wasmrs_guest::runtime::spawn("comp_ref", async move {
        while let Some(payload) = stream.next().await {
            // A failed send presumably means the receiving side has gone away
            // (TODO confirm against wasmrs-rx); further sends would fail too,
            // so stop pulling from the input instead of draining it for nothing.
            if tx.send_result(payload).is_err() {
                break;
            }
        }
    });

    let responses = wasmrs_guest::Host::default().request_channel(rx.boxed()).map(|r| {
        r.and_then(|raw| {
            wasmrs::Payload::try_from(raw)
                .map_err(|e| wasmrs_frames::PayloadError::application_error(e.to_string(), None))
        })
    });

    Ok(Box::pin(responses))
}
113
114#[cfg(not(target_family = "wasm"))]
115#[allow(clippy::needless_pass_by_value)]
116fn link_call(
117 _compref: ComponentReference,
118 _target_op: &str,
119 _input: wasmrs_rx::BoxFlux<wasmrs::RawPayload, wasmrs_frames::PayloadError>,
120 _config: Option<crate::RuntimeConfig>,
121 _previous_inherent: crate::InherentData,
122) -> Result<wasmrs_rx::BoxFlux<wasmrs::Payload, wasmrs_frames::PayloadError>> {
123 unimplemented!("Link calls from native components is not implemented yet")
124}