hydroflow_plus/
stream.rs

1use std::cell::RefCell;
2use std::hash::Hash;
3use std::marker::PhantomData;
4use std::ops::Deref;
5use std::rc::Rc;
6
7use hydroflow::bytes::Bytes;
8use hydroflow::futures::Sink;
9use hydroflow_lang::parse::Pipeline;
10use serde::de::DeserializeOwned;
11use serde::Serialize;
12use stageleft::{q, IntoQuotedMut, Quoted};
13use syn::parse_quote;
14
15use crate::builder::FLOW_USED_MESSAGE;
16use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
17use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, TeeNode};
18use crate::location::cluster::ClusterSelfId;
19use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort};
20use crate::location::{
21    check_matching_location, CanSend, ExternalProcess, Location, LocationId, NoTick, Tick,
22};
23use crate::staging_util::get_this_crate;
24use crate::{Cluster, ClusterId, Optional, Process, Singleton};
25
/// Type-level marker for a stream that is unbounded, i.e. one that is
/// not guaranteed to be complete in finite time.
///
/// The enum has no variants, so no value of this type can be constructed.
pub enum Unbounded {}
29
/// Type-level marker for a stream that is bounded, i.e. one that is
/// guaranteed to be complete in finite time.
///
/// The enum has no variants, so no value of this type can be constructed.
pub enum Bounded {}
33
34/// An ordered sequence stream of elements of type `T`.
35///
36/// Type Parameters:
37/// - `T`: the type of elements in the stream
38/// - `L`: the location where the stream is being materialized
39/// - `B`: the boundedness of the stream, which is either [`Bounded`]
40///    or [`Unbounded`]
41pub struct Stream<T, L, B> {
42    location: L,
43    pub(crate) ir_node: RefCell<HfPlusNode>,
44
45    _phantom: PhantomData<(T, L, B)>,
46}
47
48impl<'a, T, L: Location<'a>, B> Stream<T, L, B> {
49    fn location_kind(&self) -> LocationId {
50        self.location.id()
51    }
52}
53
54impl<'a, T, L: Location<'a>> DeferTick for Stream<T, Tick<L>, Bounded> {
55    fn defer_tick(self) -> Self {
56        Stream::defer_tick(self)
57    }
58}
59
60impl<'a, T, L: Location<'a>> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded> {
61    type Location = Tick<L>;
62
63    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
64        let location_id = location.id();
65        Stream::new(
66            location,
67            HfPlusNode::CycleSource {
68                ident,
69                location_kind: location_id,
70            },
71        )
72    }
73}
74
75impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded> {
76    fn complete(self, ident: syn::Ident) {
77        self.location
78            .flow_state()
79            .borrow_mut()
80            .leaves
81            .as_mut()
82            .expect(FLOW_USED_MESSAGE)
83            .push(HfPlusLeaf::CycleSink {
84                ident,
85                location_kind: self.location_kind(),
86                input: Box::new(self.ir_node.into_inner()),
87            });
88    }
89}
90
91impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B> {
92    type Location = L;
93
94    fn create_source(ident: syn::Ident, location: L) -> Self {
95        let location_id = location.id();
96        Stream::new(
97            location,
98            HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource {
99                ident,
100                location_kind: location_id,
101            })),
102        )
103    }
104}
105
106impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B> {
107    fn complete(self, ident: syn::Ident) {
108        self.location
109            .flow_state()
110            .borrow_mut()
111            .leaves
112            .as_mut()
113            .expect(FLOW_USED_MESSAGE)
114            .push(HfPlusLeaf::CycleSink {
115                ident,
116                location_kind: self.location_kind(),
117                input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
118            });
119    }
120}
121
122impl<'a, T, L: Location<'a>, B> Stream<T, L, B> {
123    pub(crate) fn new(location: L, ir_node: HfPlusNode) -> Self {
124        Stream {
125            location,
126            ir_node: RefCell::new(ir_node),
127            _phantom: PhantomData,
128        }
129    }
130}
131
132impl<'a, T: Clone, L: Location<'a>, B> Clone for Stream<T, L, B> {
133    fn clone(&self) -> Self {
134        if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) {
135            let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder);
136            *self.ir_node.borrow_mut() = HfPlusNode::Tee {
137                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
138            };
139        }
140
141        if let HfPlusNode::Tee { inner } = self.ir_node.borrow().deref() {
142            Stream {
143                location: self.location.clone(),
144                ir_node: HfPlusNode::Tee {
145                    inner: TeeNode(inner.0.clone()),
146                }
147                .into(),
148                _phantom: PhantomData,
149            }
150        } else {
151            unreachable!()
152        }
153    }
154}
155
156impl<'a, T, L: Location<'a>, B> Stream<T, L, B> {
157    pub fn map<U, F: Fn(T) -> U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream<U, L, B> {
158        Stream::new(
159            self.location,
160            HfPlusNode::Map {
161                f: f.splice_fn1().into(),
162                input: Box::new(self.ir_node.into_inner()),
163            },
164        )
165    }
166
167    pub fn cloned(self) -> Stream<T, L, B>
168    where
169        T: Clone,
170    {
171        self.map(q!(|d| d.clone()))
172    }
173
174    pub fn flat_map<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
175        self,
176        f: impl IntoQuotedMut<'a, F>,
177    ) -> Stream<U, L, B> {
178        Stream::new(
179            self.location,
180            HfPlusNode::FlatMap {
181                f: f.splice_fn1().into(),
182                input: Box::new(self.ir_node.into_inner()),
183            },
184        )
185    }
186
187    pub fn flatten<U>(self) -> Stream<U, L, B>
188    where
189        T: IntoIterator<Item = U>,
190    {
191        self.flat_map(q!(|d| d))
192    }
193
194    pub fn filter<F: Fn(&T) -> bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream<T, L, B> {
195        Stream::new(
196            self.location,
197            HfPlusNode::Filter {
198                f: f.splice_fn1_borrow().into(),
199                input: Box::new(self.ir_node.into_inner()),
200            },
201        )
202    }
203
204    pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
205        self,
206        f: impl IntoQuotedMut<'a, F>,
207    ) -> Stream<U, L, B> {
208        Stream::new(
209            self.location,
210            HfPlusNode::FilterMap {
211                f: f.splice_fn1().into(),
212                input: Box::new(self.ir_node.into_inner()),
213            },
214        )
215    }
216
217    pub fn cross_singleton<O>(
218        self,
219        other: impl Into<Optional<O, L, Bounded>>,
220    ) -> Stream<(T, O), L, B>
221    where
222        O: Clone,
223    {
224        let other: Optional<O, L, Bounded> = other.into();
225        check_matching_location(&self.location, &other.location);
226
227        Stream::new(
228            self.location,
229            HfPlusNode::CrossSingleton(
230                Box::new(self.ir_node.into_inner()),
231                Box::new(other.ir_node.into_inner()),
232            ),
233        )
234    }
235
236    /// Allow this stream through if the other stream has elements, otherwise the output is empty.
237    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B> {
238        self.cross_singleton(signal.map(q!(|_u| ())))
239            .map(q!(|(d, _signal)| d))
240    }
241
242    /// Allow this stream through if the other stream is empty, otherwise the output is empty.
243    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B> {
244        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
245    }
246
247    pub fn cross_product<O>(self, other: Stream<O, L, B>) -> Stream<(T, O), L, B>
248    where
249        T: Clone,
250        O: Clone,
251    {
252        check_matching_location(&self.location, &other.location);
253
254        Stream::new(
255            self.location,
256            HfPlusNode::CrossProduct(
257                Box::new(self.ir_node.into_inner()),
258                Box::new(other.ir_node.into_inner()),
259            ),
260        )
261    }
262
263    pub fn union(self, other: Stream<T, L, B>) -> Stream<T, L, B> {
264        check_matching_location(&self.location, &other.location);
265
266        Stream::new(
267            self.location,
268            HfPlusNode::Union(
269                Box::new(self.ir_node.into_inner()),
270                Box::new(other.ir_node.into_inner()),
271            ),
272        )
273    }
274
275    pub fn enumerate(self) -> Stream<(usize, T), L, B> {
276        Stream::new(
277            self.location,
278            HfPlusNode::Enumerate(Box::new(self.ir_node.into_inner())),
279        )
280    }
281
282    pub fn unique(self) -> Stream<T, L, B>
283    where
284        T: Eq + Hash,
285    {
286        Stream::new(
287            self.location,
288            HfPlusNode::Unique(Box::new(self.ir_node.into_inner())),
289        )
290    }
291
292    pub fn filter_not_in(self, other: Stream<T, L, Bounded>) -> Stream<T, L, Bounded>
293    where
294        T: Eq + Hash,
295    {
296        check_matching_location(&self.location, &other.location);
297
298        Stream::new(
299            self.location,
300            HfPlusNode::Difference(
301                Box::new(self.ir_node.into_inner()),
302                Box::new(other.ir_node.into_inner()),
303            ),
304        )
305    }
306
307    pub fn first(self) -> Optional<T, L, Bounded> {
308        Optional::new(self.location, self.ir_node.into_inner())
309    }
310
311    pub fn inspect<F: Fn(&T) + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream<T, L, B> {
312        if L::is_top_level() {
313            Stream::new(
314                self.location,
315                HfPlusNode::Persist(Box::new(HfPlusNode::Inspect {
316                    f: f.splice_fn1_borrow().into(),
317                    input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
318                })),
319            )
320        } else {
321            Stream::new(
322                self.location,
323                HfPlusNode::Inspect {
324                    f: f.splice_fn1_borrow().into(),
325                    input: Box::new(self.ir_node.into_inner()),
326                },
327            )
328        }
329    }
330
331    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
332        self,
333        init: impl IntoQuotedMut<'a, I>,
334        comb: impl IntoQuotedMut<'a, F>,
335    ) -> Singleton<A, L, B> {
336        let mut core = HfPlusNode::Fold {
337            init: init.splice_fn0().into(),
338            acc: comb.splice_fn2_borrow_mut().into(),
339            input: Box::new(self.ir_node.into_inner()),
340        };
341
342        if L::is_top_level() {
343            // top-level (possibly unbounded) singletons are represented as
344            // a stream which produces all values from all ticks every tick,
345            // so Unpersist will always give the lastest aggregation
346            core = HfPlusNode::Persist(Box::new(core));
347        }
348
349        Singleton::new(self.location, core)
350    }
351
352    pub fn reduce<F: Fn(&mut T, T) + 'a>(
353        self,
354        comb: impl IntoQuotedMut<'a, F>,
355    ) -> Optional<T, L, B> {
356        let mut core = HfPlusNode::Reduce {
357            f: comb.splice_fn2_borrow_mut().into(),
358            input: Box::new(self.ir_node.into_inner()),
359        };
360
361        if L::is_top_level() {
362            core = HfPlusNode::Persist(Box::new(core));
363        }
364
365        Optional::new(self.location, core)
366    }
367
368    pub fn max(self) -> Optional<T, L, B>
369    where
370        T: Ord,
371    {
372        self.reduce(q!(|curr, new| {
373            if new > *curr {
374                *curr = new;
375            }
376        }))
377    }
378
379    pub fn min(self) -> Optional<T, L, B>
380    where
381        T: Ord,
382    {
383        self.reduce(q!(|curr, new| {
384            if new < *curr {
385                *curr = new;
386            }
387        }))
388    }
389
390    pub fn count(self) -> Singleton<usize, L, B> {
391        self.fold(q!(|| 0usize), q!(|count, _| *count += 1))
392    }
393}
394
395impl<'a, T, L: Location<'a>> Stream<T, L, Bounded> {
396    pub fn sort(self) -> Stream<T, L, Bounded>
397    where
398        T: Ord,
399    {
400        Stream::new(
401            self.location,
402            HfPlusNode::Sort(Box::new(self.ir_node.into_inner())),
403        )
404    }
405}
406
407impl<'a, K, V1, L: Location<'a>, B> Stream<(K, V1), L, B> {
408    pub fn join<V2>(self, n: Stream<(K, V2), L, B>) -> Stream<(K, (V1, V2)), L, B>
409    where
410        K: Eq + Hash,
411    {
412        check_matching_location(&self.location, &n.location);
413
414        Stream::new(
415            self.location,
416            HfPlusNode::Join(
417                Box::new(self.ir_node.into_inner()),
418                Box::new(n.ir_node.into_inner()),
419            ),
420        )
421    }
422
423    pub fn anti_join(self, n: Stream<K, L, Bounded>) -> Stream<(K, V1), L, B>
424    where
425        K: Eq + Hash,
426    {
427        check_matching_location(&self.location, &n.location);
428
429        Stream::new(
430            self.location,
431            HfPlusNode::AntiJoin(
432                Box::new(self.ir_node.into_inner()),
433                Box::new(n.ir_node.into_inner()),
434            ),
435        )
436    }
437}
438
439impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick<L>, Bounded> {
440    pub fn fold_keyed<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>(
441        self,
442        init: impl IntoQuotedMut<'a, I>,
443        comb: impl IntoQuotedMut<'a, F>,
444    ) -> Stream<(K, A), Tick<L>, Bounded> {
445        Stream::new(
446            self.location,
447            HfPlusNode::FoldKeyed {
448                init: init.splice_fn0().into(),
449                acc: comb.splice_fn2_borrow_mut().into(),
450                input: Box::new(self.ir_node.into_inner()),
451            },
452        )
453    }
454
455    pub fn reduce_keyed<F: Fn(&mut V, V) + 'a>(
456        self,
457        comb: impl IntoQuotedMut<'a, F>,
458    ) -> Stream<(K, V), Tick<L>, Bounded> {
459        Stream::new(
460            self.location,
461            HfPlusNode::ReduceKeyed {
462                f: comb.splice_fn2_borrow_mut().into(),
463                input: Box::new(self.ir_node.into_inner()),
464            },
465        )
466    }
467}
468
469impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B> {
470    pub fn tick_batch(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded> {
471        Stream::new(
472            tick.clone(),
473            HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())),
474        )
475    }
476
477    pub fn tick_prefix(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded>
478    where
479        T: Clone,
480    {
481        self.tick_batch(tick).persist()
482    }
483
484    pub fn sample_every(
485        self,
486        interval: impl Quoted<'a, std::time::Duration> + Copy + 'a,
487    ) -> Stream<T, L, Unbounded> {
488        let samples = self.location.source_interval(interval);
489        let tick = self.location.tick();
490        self.tick_batch(&tick)
491            .continue_if(samples.tick_batch(&tick).first())
492            .all_ticks()
493    }
494
495    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F>) {
496        self.location
497            .flow_state()
498            .borrow_mut()
499            .leaves
500            .as_mut()
501            .expect(FLOW_USED_MESSAGE)
502            .push(HfPlusLeaf::ForEach {
503                input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
504                f: f.splice_fn1().into(),
505            });
506    }
507
508    pub fn dest_sink<S: Unpin + Sink<T> + 'a>(self, sink: impl Quoted<'a, S>) {
509        self.location
510            .flow_state()
511            .borrow_mut()
512            .leaves
513            .as_mut()
514            .expect(FLOW_USED_MESSAGE)
515            .push(HfPlusLeaf::DestSink {
516                sink: sink.splice_typed().into(),
517                input: Box::new(self.ir_node.into_inner()),
518            });
519    }
520}
521
522impl<'a, T, L: Location<'a>> Stream<T, Tick<L>, Bounded> {
523    pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
524        Stream::new(
525            self.location.outer().clone(),
526            HfPlusNode::Persist(Box::new(self.ir_node.into_inner())),
527        )
528    }
529
530    pub fn persist(self) -> Stream<T, Tick<L>, Bounded>
531    where
532        T: Clone,
533    {
534        Stream::new(
535            self.location,
536            HfPlusNode::Persist(Box::new(self.ir_node.into_inner())),
537        )
538    }
539
540    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded> {
541        Stream::new(
542            self.location,
543            HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())),
544        )
545    }
546
547    pub fn delta(self) -> Stream<T, Tick<L>, Bounded> {
548        Stream::new(
549            self.location,
550            HfPlusNode::Delta(Box::new(self.ir_node.into_inner())),
551        )
552    }
553}
554
555fn serialize_bincode<T: Serialize>(is_demux: bool) -> Pipeline {
556    let root = get_this_crate();
557
558    let t_type: syn::Type = stageleft::quote_type::<T>();
559
560    if is_demux {
561        parse_quote! {
562            map(|(id, data): (#root::ClusterId<_>, #t_type)| {
563                (id.raw_id, #root::runtime_support::bincode::serialize::<#t_type>(&data).unwrap().into())
564            })
565        }
566    } else {
567        parse_quote! {
568            map(|data| {
569                #root::runtime_support::bincode::serialize::<#t_type>(&data).unwrap().into()
570            })
571        }
572    }
573}
574
575pub(super) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<syn::Type>) -> Pipeline {
576    let root = get_this_crate();
577
578    let t_type: syn::Type = stageleft::quote_type::<T>();
579
580    if let Some(c_type) = tagged {
581        parse_quote! {
582            map(|res| {
583                let (id, b) = res.unwrap();
584                (#root::ClusterId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
585            })
586        }
587    } else {
588        parse_quote! {
589            map(|res| {
590                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
591            })
592        }
593    }
594}
595
596impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B> {
597    pub fn decouple_process<P2>(
598        self,
599        other: &Process<'a, P2>,
600    ) -> Stream<T, Process<'a, P2>, Unbounded>
601    where
602        L: CanSend<'a, Process<'a, P2>, In<T> = T, Out<T> = T>,
603        T: Clone + Serialize + DeserializeOwned,
604    {
605        self.send_bincode::<Process<'a, P2>, T>(other)
606    }
607
608    pub fn decouple_cluster<C2, Tag>(
609        self,
610        other: &Cluster<'a, C2>,
611    ) -> Stream<T, Cluster<'a, C2>, Unbounded>
612    where
613        L: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
614        T: Clone + Serialize + DeserializeOwned,
615    {
616        let self_node_id = match self.location_kind() {
617            LocationId::Cluster(cluster_id) => ClusterSelfId {
618                id: cluster_id,
619                _phantom: PhantomData,
620            },
621            _ => panic!("decouple_cluster must be called on a cluster"),
622        };
623
624        self.map(q!(move |b| (self_node_id, b.clone())))
625            .send_bincode_interleaved(other)
626    }
627
628    pub fn send_bincode<L2: Location<'a>, CoreType>(
629        self,
630        other: &L2,
631    ) -> Stream<L::Out<CoreType>, L2, Unbounded>
632    where
633        L: CanSend<'a, L2, In<CoreType> = T>,
634        CoreType: Serialize + DeserializeOwned,
635    {
636        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));
637
638        let deserialize_pipeline = Some(deserialize_bincode::<CoreType>(L::tagged_type()));
639
640        Stream::new(
641            other.clone(),
642            HfPlusNode::Network {
643                from_location: self.location_kind(),
644                from_key: None,
645                to_location: other.id(),
646                to_key: None,
647                serialize_pipeline,
648                instantiate_fn: DebugInstantiate::Building(),
649                deserialize_pipeline,
650                input: Box::new(self.ir_node.into_inner()),
651            },
652        )
653    }
654
655    pub fn send_bincode_external<L2: 'a, CoreType>(
656        self,
657        other: &ExternalProcess<L2>,
658    ) -> ExternalBincodeStream<L::Out<CoreType>>
659    where
660        L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>,
661        CoreType: Serialize + DeserializeOwned,
662        // for now, we restirct Out<CoreType> to be CoreType, which means no tagged cluster -> external
663    {
664        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));
665
666        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
667
668        let external_key = flow_state_borrow.next_external_out;
669        flow_state_borrow.next_external_out += 1;
670
671        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
672
673        let dummy_f: syn::Expr = syn::parse_quote!(());
674
675        leaves.push(HfPlusLeaf::ForEach {
676            f: dummy_f.into(),
677            input: Box::new(HfPlusNode::Network {
678                from_location: self.location_kind(),
679                from_key: None,
680                to_location: other.id(),
681                to_key: Some(external_key),
682                serialize_pipeline,
683                instantiate_fn: DebugInstantiate::Building(),
684                deserialize_pipeline: None,
685                input: Box::new(self.ir_node.into_inner()),
686            }),
687        });
688
689        ExternalBincodeStream {
690            process_id: other.id,
691            port_id: external_key,
692            _phantom: PhantomData,
693        }
694    }
695
696    pub fn send_bytes<L2: Location<'a>>(self, other: &L2) -> Stream<L::Out<Bytes>, L2, Unbounded>
697    where
698        L: CanSend<'a, L2, In<Bytes> = T>,
699    {
700        let root = get_this_crate();
701        Stream::new(
702            other.clone(),
703            HfPlusNode::Network {
704                from_location: self.location_kind(),
705                from_key: None,
706                to_location: other.id(),
707                to_key: None,
708                serialize_pipeline: None,
709                instantiate_fn: DebugInstantiate::Building(),
710                deserialize_pipeline: if let Some(c_type) = L::tagged_type() {
711                    Some(
712                        parse_quote!(map(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()))),
713                    )
714                } else {
715                    Some(parse_quote!(map(|b| b.unwrap().freeze())))
716                },
717                input: Box::new(self.ir_node.into_inner()),
718            },
719        )
720    }
721
722    pub fn send_bytes_external<L2: 'a>(self, other: &ExternalProcess<L2>) -> ExternalBytesPort
723    where
724        L: CanSend<'a, ExternalProcess<'a, L2>, In<Bytes> = T, Out<Bytes> = Bytes>,
725    {
726        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
727        let external_key = flow_state_borrow.next_external_out;
728        flow_state_borrow.next_external_out += 1;
729
730        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
731
732        let dummy_f: syn::Expr = syn::parse_quote!(());
733
734        leaves.push(HfPlusLeaf::ForEach {
735            f: dummy_f.into(),
736            input: Box::new(HfPlusNode::Network {
737                from_location: self.location_kind(),
738                from_key: None,
739                to_location: other.id(),
740                to_key: Some(external_key),
741                serialize_pipeline: None,
742                instantiate_fn: DebugInstantiate::Building(),
743                deserialize_pipeline: None,
744                input: Box::new(self.ir_node.into_inner()),
745            }),
746        });
747
748        ExternalBytesPort {
749            process_id: other.id,
750            port_id: external_key,
751        }
752    }
753
754    pub fn send_bincode_interleaved<L2: Location<'a>, Tag, CoreType>(
755        self,
756        other: &L2,
757    ) -> Stream<CoreType, L2, Unbounded>
758    where
759        L: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>,
760        CoreType: Serialize + DeserializeOwned,
761    {
762        self.send_bincode::<L2, CoreType>(other).map(q!(|(_, b)| b))
763    }
764
765    pub fn send_bytes_interleaved<L2: Location<'a>, Tag>(
766        self,
767        other: &L2,
768    ) -> Stream<Bytes, L2, Unbounded>
769    where
770        L: CanSend<'a, L2, In<Bytes> = T, Out<Bytes> = (Tag, Bytes)>,
771    {
772        self.send_bytes::<L2>(other).map(q!(|(_, b)| b))
773    }
774
775    pub fn broadcast_bincode<C2>(
776        self,
777        other: &Cluster<'a, C2>,
778    ) -> Stream<L::Out<T>, Cluster<'a, C2>, Unbounded>
779    where
780        L: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
781        T: Clone + Serialize + DeserializeOwned,
782    {
783        let ids = other.members();
784
785        self.flat_map(q!(|b| ids.iter().map(move |id| (
786            ::std::clone::Clone::clone(id),
787            ::std::clone::Clone::clone(&b)
788        ))))
789        .send_bincode(other)
790    }
791
792    pub fn broadcast_bincode_interleaved<C2, Tag>(
793        self,
794        other: &Cluster<'a, C2>,
795    ) -> Stream<T, Cluster<'a, C2>, Unbounded>
796    where
797        L: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
798        T: Clone + Serialize + DeserializeOwned,
799    {
800        self.broadcast_bincode(other).map(q!(|(_, b)| b))
801    }
802
803    pub fn broadcast_bytes<C2>(
804        self,
805        other: &Cluster<'a, C2>,
806    ) -> Stream<L::Out<Bytes>, Cluster<'a, C2>, Unbounded>
807    where
808        L: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
809        T: Clone,
810    {
811        let ids = other.members();
812
813        self.flat_map(q!(|b| ids.iter().map(move |id| (
814            ::std::clone::Clone::clone(id),
815            ::std::clone::Clone::clone(&b)
816        ))))
817        .send_bytes(other)
818    }
819
820    pub fn broadcast_bytes_interleaved<C2, Tag>(
821        self,
822        other: &Cluster<'a, C2>,
823    ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded>
824    where
825        L: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
826            + 'a,
827        T: Clone,
828    {
829        self.broadcast_bytes(other).map(q!(|(_, b)| b))
830    }
831}
832
#[cfg(test)]
mod tests {
    use hydro_deploy::Deployment;
    use hydroflow::futures::StreamExt;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::location::Location;
    use crate::FlowBuilder;

    // Type tags used to distinguish the locations in the flow.
    struct P1 {}
    struct P2 {}

    /// Payload sent between the processes in the test.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends the numbers `0..10` from one process through a second one to
    /// an external process, then checks they arrive in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external_process::<P2>();

        // first_node -> second_node -> external
        let wrapped = first_node
            .source_iter(q!(0..10))
            .map(q!(|n| SendOverNetwork { n }));
        let relayed = wrapped.send_bincode(&second_node);
        let out_port = relayed.send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the external port before starting the deployment.
        let mut external_out = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10 {
            let received = external_out.next().await.unwrap();
            assert_eq!(received.n, expected);
        }
    }
}