hydroflow_plus/location/
external_process.rs

1use std::marker::PhantomData;
2
3use hydroflow::bytes::Bytes;
4use serde::de::DeserializeOwned;
5use serde::Serialize;
6
7use super::{Location, LocationId, NoTick};
8use crate::builder::FlowState;
9use crate::ir::{HfPlusNode, HfPlusSource};
10use crate::{Stream, Unbounded};
11
12pub struct ExternalBytesPort {
13    pub(crate) process_id: usize,
14    pub(crate) port_id: usize,
15}
16
17pub struct ExternalBincodeSink<T: Serialize> {
18    pub(crate) process_id: usize,
19    pub(crate) port_id: usize,
20    pub(crate) _phantom: PhantomData<T>,
21}
22
23pub struct ExternalBincodeStream<T: DeserializeOwned> {
24    pub(crate) process_id: usize,
25    pub(crate) port_id: usize,
26    pub(crate) _phantom: PhantomData<T>,
27}
28
29pub struct ExternalProcess<'a, P> {
30    pub(crate) id: usize,
31
32    pub(crate) flow_state: FlowState,
33
34    pub(crate) _phantom: PhantomData<&'a &'a mut P>,
35}
36
37impl<P> Clone for ExternalProcess<'_, P> {
38    fn clone(&self) -> Self {
39        ExternalProcess {
40            id: self.id,
41            flow_state: self.flow_state.clone(),
42            _phantom: PhantomData,
43        }
44    }
45}
46
47impl<'a, P> Location<'a> for ExternalProcess<'a, P> {
48    fn id(&self) -> LocationId {
49        LocationId::ExternalProcess(self.id)
50    }
51
52    fn flow_state(&self) -> &FlowState {
53        &self.flow_state
54    }
55
56    fn is_top_level() -> bool {
57        true
58    }
59}
60
61impl<'a, P> ExternalProcess<'a, P> {
62    pub fn source_external_bytes<L: Location<'a> + NoTick>(
63        &self,
64        to: &L,
65    ) -> (ExternalBytesPort, Stream<Bytes, L, Unbounded>) {
66        let next_external_port_id = {
67            let mut flow_state = self.flow_state.borrow_mut();
68            let id = flow_state.next_external_out;
69            flow_state.next_external_out += 1;
70            id
71        };
72
73        (
74            ExternalBytesPort {
75                process_id: self.id,
76                port_id: next_external_port_id,
77            },
78            Stream::new(
79                to.clone(),
80                HfPlusNode::Persist(Box::new(HfPlusNode::Network {
81                    from_location: LocationId::ExternalProcess(self.id),
82                    from_key: Some(next_external_port_id),
83                    to_location: to.id(),
84                    to_key: None,
85                    serialize_pipeline: None,
86                    instantiate_fn: crate::ir::DebugInstantiate::Building(),
87                    deserialize_pipeline: Some(syn::parse_quote!(map(|b| b.unwrap().freeze()))),
88                    input: Box::new(HfPlusNode::Source {
89                        source: HfPlusSource::ExternalNetwork(),
90                        location_kind: LocationId::ExternalProcess(self.id),
91                    }),
92                })),
93            ),
94        )
95    }
96
97    pub fn source_external_bincode<L: Location<'a> + NoTick, T: Serialize + DeserializeOwned>(
98        &self,
99        to: &L,
100    ) -> (ExternalBincodeSink<T>, Stream<T, L, Unbounded>) {
101        let next_external_port_id = {
102            let mut flow_state = self.flow_state.borrow_mut();
103            let id = flow_state.next_external_out;
104            flow_state.next_external_out += 1;
105            id
106        };
107
108        (
109            ExternalBincodeSink {
110                process_id: self.id,
111                port_id: next_external_port_id,
112                _phantom: PhantomData,
113            },
114            Stream::new(
115                to.clone(),
116                HfPlusNode::Persist(Box::new(HfPlusNode::Network {
117                    from_location: LocationId::ExternalProcess(self.id),
118                    from_key: Some(next_external_port_id),
119                    to_location: to.id(),
120                    to_key: None,
121                    serialize_pipeline: None,
122                    instantiate_fn: crate::ir::DebugInstantiate::Building(),
123                    deserialize_pipeline: Some(crate::stream::deserialize_bincode::<T>(None)),
124                    input: Box::new(HfPlusNode::Source {
125                        source: HfPlusSource::ExternalNetwork(),
126                        location_kind: LocationId::ExternalProcess(self.id),
127                    }),
128                })),
129            ),
130        )
131    }
132}