hydroflow_plus/location/
external_process.rs1use std::marker::PhantomData;
2
3use hydroflow::bytes::Bytes;
4use serde::de::DeserializeOwned;
5use serde::Serialize;
6
7use super::{Location, LocationId, NoTick};
8use crate::builder::FlowState;
9use crate::ir::{HfPlusNode, HfPlusSource};
10use crate::{Stream, Unbounded};
11
12pub struct ExternalBytesPort {
13 pub(crate) process_id: usize,
14 pub(crate) port_id: usize,
15}
16
17pub struct ExternalBincodeSink<T: Serialize> {
18 pub(crate) process_id: usize,
19 pub(crate) port_id: usize,
20 pub(crate) _phantom: PhantomData<T>,
21}
22
23pub struct ExternalBincodeStream<T: DeserializeOwned> {
24 pub(crate) process_id: usize,
25 pub(crate) port_id: usize,
26 pub(crate) _phantom: PhantomData<T>,
27}
28
29pub struct ExternalProcess<'a, P> {
30 pub(crate) id: usize,
31
32 pub(crate) flow_state: FlowState,
33
34 pub(crate) _phantom: PhantomData<&'a &'a mut P>,
35}
36
37impl<P> Clone for ExternalProcess<'_, P> {
38 fn clone(&self) -> Self {
39 ExternalProcess {
40 id: self.id,
41 flow_state: self.flow_state.clone(),
42 _phantom: PhantomData,
43 }
44 }
45}
46
47impl<'a, P> Location<'a> for ExternalProcess<'a, P> {
48 fn id(&self) -> LocationId {
49 LocationId::ExternalProcess(self.id)
50 }
51
52 fn flow_state(&self) -> &FlowState {
53 &self.flow_state
54 }
55
56 fn is_top_level() -> bool {
57 true
58 }
59}
60
61impl<'a, P> ExternalProcess<'a, P> {
62 pub fn source_external_bytes<L: Location<'a> + NoTick>(
63 &self,
64 to: &L,
65 ) -> (ExternalBytesPort, Stream<Bytes, L, Unbounded>) {
66 let next_external_port_id = {
67 let mut flow_state = self.flow_state.borrow_mut();
68 let id = flow_state.next_external_out;
69 flow_state.next_external_out += 1;
70 id
71 };
72
73 (
74 ExternalBytesPort {
75 process_id: self.id,
76 port_id: next_external_port_id,
77 },
78 Stream::new(
79 to.clone(),
80 HfPlusNode::Persist(Box::new(HfPlusNode::Network {
81 from_location: LocationId::ExternalProcess(self.id),
82 from_key: Some(next_external_port_id),
83 to_location: to.id(),
84 to_key: None,
85 serialize_pipeline: None,
86 instantiate_fn: crate::ir::DebugInstantiate::Building(),
87 deserialize_pipeline: Some(syn::parse_quote!(map(|b| b.unwrap().freeze()))),
88 input: Box::new(HfPlusNode::Source {
89 source: HfPlusSource::ExternalNetwork(),
90 location_kind: LocationId::ExternalProcess(self.id),
91 }),
92 })),
93 ),
94 )
95 }
96
97 pub fn source_external_bincode<L: Location<'a> + NoTick, T: Serialize + DeserializeOwned>(
98 &self,
99 to: &L,
100 ) -> (ExternalBincodeSink<T>, Stream<T, L, Unbounded>) {
101 let next_external_port_id = {
102 let mut flow_state = self.flow_state.borrow_mut();
103 let id = flow_state.next_external_out;
104 flow_state.next_external_out += 1;
105 id
106 };
107
108 (
109 ExternalBincodeSink {
110 process_id: self.id,
111 port_id: next_external_port_id,
112 _phantom: PhantomData,
113 },
114 Stream::new(
115 to.clone(),
116 HfPlusNode::Persist(Box::new(HfPlusNode::Network {
117 from_location: LocationId::ExternalProcess(self.id),
118 from_key: Some(next_external_port_id),
119 to_location: to.id(),
120 to_key: None,
121 serialize_pipeline: None,
122 instantiate_fn: crate::ir::DebugInstantiate::Building(),
123 deserialize_pipeline: Some(crate::stream::deserialize_bincode::<T>(None)),
124 input: Box::new(HfPlusNode::Source {
125 source: HfPlusSource::ExternalNetwork(),
126 location_kind: LocationId::ExternalProcess(self.id),
127 }),
128 })),
129 ),
130 )
131 }
132}