1use std::cell::RefCell;
2use std::hash::Hash;
3use std::marker::PhantomData;
4use std::ops::Deref;
5use std::rc::Rc;
6
7use hydroflow::bytes::Bytes;
8use hydroflow::futures::Sink;
9use hydroflow_lang::parse::Pipeline;
10use serde::de::DeserializeOwned;
11use serde::Serialize;
12use stageleft::{q, IntoQuotedMut, Quoted};
13use syn::parse_quote;
14
15use crate::builder::FLOW_USED_MESSAGE;
16use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
17use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, TeeNode};
18use crate::location::cluster::ClusterSelfId;
19use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort};
20use crate::location::{
21 check_matching_location, CanSend, ExternalProcess, Location, LocationId, NoTick, Tick,
22};
23use crate::staging_util::get_this_crate;
24use crate::{Cluster, ClusterId, Optional, Process, Singleton};
25
/// Type-level tag used as the boundedness parameter `B` of `Stream`.
///
/// The enum has no variants, so no value of it can be constructed; it only
/// appears in type signatures. In this module it tags the streams returned by
/// `all_ticks`, `sample_every` and the network-sending methods.
pub enum Unbounded {}
29
/// Type-level tag used as the boundedness parameter `B` of `Stream`.
///
/// The enum has no variants, so no value of it can be constructed; it only
/// appears in type signatures. In this module it tags streams that live inside
/// a `Tick` (for example the result of `tick_batch`), and `sort` is only
/// offered for streams carrying this tag.
pub enum Bounded {}
33
34pub struct Stream<T, L, B> {
42 location: L,
43 pub(crate) ir_node: RefCell<HfPlusNode>,
44
45 _phantom: PhantomData<(T, L, B)>,
46}
47
48impl<'a, T, L: Location<'a>, B> Stream<T, L, B> {
49 fn location_kind(&self) -> LocationId {
50 self.location.id()
51 }
52}
53
54impl<'a, T, L: Location<'a>> DeferTick for Stream<T, Tick<L>, Bounded> {
55 fn defer_tick(self) -> Self {
56 Stream::defer_tick(self)
57 }
58}
59
60impl<'a, T, L: Location<'a>> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded> {
61 type Location = Tick<L>;
62
63 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
64 let location_id = location.id();
65 Stream::new(
66 location,
67 HfPlusNode::CycleSource {
68 ident,
69 location_kind: location_id,
70 },
71 )
72 }
73}
74
75impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded> {
76 fn complete(self, ident: syn::Ident) {
77 self.location
78 .flow_state()
79 .borrow_mut()
80 .leaves
81 .as_mut()
82 .expect(FLOW_USED_MESSAGE)
83 .push(HfPlusLeaf::CycleSink {
84 ident,
85 location_kind: self.location_kind(),
86 input: Box::new(self.ir_node.into_inner()),
87 });
88 }
89}
90
91impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B> {
92 type Location = L;
93
94 fn create_source(ident: syn::Ident, location: L) -> Self {
95 let location_id = location.id();
96 Stream::new(
97 location,
98 HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource {
99 ident,
100 location_kind: location_id,
101 })),
102 )
103 }
104}
105
106impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B> {
107 fn complete(self, ident: syn::Ident) {
108 self.location
109 .flow_state()
110 .borrow_mut()
111 .leaves
112 .as_mut()
113 .expect(FLOW_USED_MESSAGE)
114 .push(HfPlusLeaf::CycleSink {
115 ident,
116 location_kind: self.location_kind(),
117 input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
118 });
119 }
120}
121
122impl<'a, T, L: Location<'a>, B> Stream<T, L, B> {
123 pub(crate) fn new(location: L, ir_node: HfPlusNode) -> Self {
124 Stream {
125 location,
126 ir_node: RefCell::new(ir_node),
127 _phantom: PhantomData,
128 }
129 }
130}
131
132impl<'a, T: Clone, L: Location<'a>, B> Clone for Stream<T, L, B> {
133 fn clone(&self) -> Self {
134 if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) {
135 let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder);
136 *self.ir_node.borrow_mut() = HfPlusNode::Tee {
137 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
138 };
139 }
140
141 if let HfPlusNode::Tee { inner } = self.ir_node.borrow().deref() {
142 Stream {
143 location: self.location.clone(),
144 ir_node: HfPlusNode::Tee {
145 inner: TeeNode(inner.0.clone()),
146 }
147 .into(),
148 _phantom: PhantomData,
149 }
150 } else {
151 unreachable!()
152 }
153 }
154}
155
156impl<'a, T, L: Location<'a>, B> Stream<T, L, B> {
157 pub fn map<U, F: Fn(T) -> U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream<U, L, B> {
158 Stream::new(
159 self.location,
160 HfPlusNode::Map {
161 f: f.splice_fn1().into(),
162 input: Box::new(self.ir_node.into_inner()),
163 },
164 )
165 }
166
167 pub fn cloned(self) -> Stream<T, L, B>
168 where
169 T: Clone,
170 {
171 self.map(q!(|d| d.clone()))
172 }
173
174 pub fn flat_map<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
175 self,
176 f: impl IntoQuotedMut<'a, F>,
177 ) -> Stream<U, L, B> {
178 Stream::new(
179 self.location,
180 HfPlusNode::FlatMap {
181 f: f.splice_fn1().into(),
182 input: Box::new(self.ir_node.into_inner()),
183 },
184 )
185 }
186
187 pub fn flatten<U>(self) -> Stream<U, L, B>
188 where
189 T: IntoIterator<Item = U>,
190 {
191 self.flat_map(q!(|d| d))
192 }
193
194 pub fn filter<F: Fn(&T) -> bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream<T, L, B> {
195 Stream::new(
196 self.location,
197 HfPlusNode::Filter {
198 f: f.splice_fn1_borrow().into(),
199 input: Box::new(self.ir_node.into_inner()),
200 },
201 )
202 }
203
204 pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
205 self,
206 f: impl IntoQuotedMut<'a, F>,
207 ) -> Stream<U, L, B> {
208 Stream::new(
209 self.location,
210 HfPlusNode::FilterMap {
211 f: f.splice_fn1().into(),
212 input: Box::new(self.ir_node.into_inner()),
213 },
214 )
215 }
216
217 pub fn cross_singleton<O>(
218 self,
219 other: impl Into<Optional<O, L, Bounded>>,
220 ) -> Stream<(T, O), L, B>
221 where
222 O: Clone,
223 {
224 let other: Optional<O, L, Bounded> = other.into();
225 check_matching_location(&self.location, &other.location);
226
227 Stream::new(
228 self.location,
229 HfPlusNode::CrossSingleton(
230 Box::new(self.ir_node.into_inner()),
231 Box::new(other.ir_node.into_inner()),
232 ),
233 )
234 }
235
236 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B> {
238 self.cross_singleton(signal.map(q!(|_u| ())))
239 .map(q!(|(d, _signal)| d))
240 }
241
242 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B> {
244 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
245 }
246
247 pub fn cross_product<O>(self, other: Stream<O, L, B>) -> Stream<(T, O), L, B>
248 where
249 T: Clone,
250 O: Clone,
251 {
252 check_matching_location(&self.location, &other.location);
253
254 Stream::new(
255 self.location,
256 HfPlusNode::CrossProduct(
257 Box::new(self.ir_node.into_inner()),
258 Box::new(other.ir_node.into_inner()),
259 ),
260 )
261 }
262
263 pub fn union(self, other: Stream<T, L, B>) -> Stream<T, L, B> {
264 check_matching_location(&self.location, &other.location);
265
266 Stream::new(
267 self.location,
268 HfPlusNode::Union(
269 Box::new(self.ir_node.into_inner()),
270 Box::new(other.ir_node.into_inner()),
271 ),
272 )
273 }
274
275 pub fn enumerate(self) -> Stream<(usize, T), L, B> {
276 Stream::new(
277 self.location,
278 HfPlusNode::Enumerate(Box::new(self.ir_node.into_inner())),
279 )
280 }
281
282 pub fn unique(self) -> Stream<T, L, B>
283 where
284 T: Eq + Hash,
285 {
286 Stream::new(
287 self.location,
288 HfPlusNode::Unique(Box::new(self.ir_node.into_inner())),
289 )
290 }
291
292 pub fn filter_not_in(self, other: Stream<T, L, Bounded>) -> Stream<T, L, Bounded>
293 where
294 T: Eq + Hash,
295 {
296 check_matching_location(&self.location, &other.location);
297
298 Stream::new(
299 self.location,
300 HfPlusNode::Difference(
301 Box::new(self.ir_node.into_inner()),
302 Box::new(other.ir_node.into_inner()),
303 ),
304 )
305 }
306
307 pub fn first(self) -> Optional<T, L, Bounded> {
308 Optional::new(self.location, self.ir_node.into_inner())
309 }
310
311 pub fn inspect<F: Fn(&T) + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream<T, L, B> {
312 if L::is_top_level() {
313 Stream::new(
314 self.location,
315 HfPlusNode::Persist(Box::new(HfPlusNode::Inspect {
316 f: f.splice_fn1_borrow().into(),
317 input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
318 })),
319 )
320 } else {
321 Stream::new(
322 self.location,
323 HfPlusNode::Inspect {
324 f: f.splice_fn1_borrow().into(),
325 input: Box::new(self.ir_node.into_inner()),
326 },
327 )
328 }
329 }
330
331 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
332 self,
333 init: impl IntoQuotedMut<'a, I>,
334 comb: impl IntoQuotedMut<'a, F>,
335 ) -> Singleton<A, L, B> {
336 let mut core = HfPlusNode::Fold {
337 init: init.splice_fn0().into(),
338 acc: comb.splice_fn2_borrow_mut().into(),
339 input: Box::new(self.ir_node.into_inner()),
340 };
341
342 if L::is_top_level() {
343 core = HfPlusNode::Persist(Box::new(core));
347 }
348
349 Singleton::new(self.location, core)
350 }
351
352 pub fn reduce<F: Fn(&mut T, T) + 'a>(
353 self,
354 comb: impl IntoQuotedMut<'a, F>,
355 ) -> Optional<T, L, B> {
356 let mut core = HfPlusNode::Reduce {
357 f: comb.splice_fn2_borrow_mut().into(),
358 input: Box::new(self.ir_node.into_inner()),
359 };
360
361 if L::is_top_level() {
362 core = HfPlusNode::Persist(Box::new(core));
363 }
364
365 Optional::new(self.location, core)
366 }
367
368 pub fn max(self) -> Optional<T, L, B>
369 where
370 T: Ord,
371 {
372 self.reduce(q!(|curr, new| {
373 if new > *curr {
374 *curr = new;
375 }
376 }))
377 }
378
379 pub fn min(self) -> Optional<T, L, B>
380 where
381 T: Ord,
382 {
383 self.reduce(q!(|curr, new| {
384 if new < *curr {
385 *curr = new;
386 }
387 }))
388 }
389
390 pub fn count(self) -> Singleton<usize, L, B> {
391 self.fold(q!(|| 0usize), q!(|count, _| *count += 1))
392 }
393}
394
395impl<'a, T, L: Location<'a>> Stream<T, L, Bounded> {
396 pub fn sort(self) -> Stream<T, L, Bounded>
397 where
398 T: Ord,
399 {
400 Stream::new(
401 self.location,
402 HfPlusNode::Sort(Box::new(self.ir_node.into_inner())),
403 )
404 }
405}
406
407impl<'a, K, V1, L: Location<'a>, B> Stream<(K, V1), L, B> {
408 pub fn join<V2>(self, n: Stream<(K, V2), L, B>) -> Stream<(K, (V1, V2)), L, B>
409 where
410 K: Eq + Hash,
411 {
412 check_matching_location(&self.location, &n.location);
413
414 Stream::new(
415 self.location,
416 HfPlusNode::Join(
417 Box::new(self.ir_node.into_inner()),
418 Box::new(n.ir_node.into_inner()),
419 ),
420 )
421 }
422
423 pub fn anti_join(self, n: Stream<K, L, Bounded>) -> Stream<(K, V1), L, B>
424 where
425 K: Eq + Hash,
426 {
427 check_matching_location(&self.location, &n.location);
428
429 Stream::new(
430 self.location,
431 HfPlusNode::AntiJoin(
432 Box::new(self.ir_node.into_inner()),
433 Box::new(n.ir_node.into_inner()),
434 ),
435 )
436 }
437}
438
439impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick<L>, Bounded> {
440 pub fn fold_keyed<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>(
441 self,
442 init: impl IntoQuotedMut<'a, I>,
443 comb: impl IntoQuotedMut<'a, F>,
444 ) -> Stream<(K, A), Tick<L>, Bounded> {
445 Stream::new(
446 self.location,
447 HfPlusNode::FoldKeyed {
448 init: init.splice_fn0().into(),
449 acc: comb.splice_fn2_borrow_mut().into(),
450 input: Box::new(self.ir_node.into_inner()),
451 },
452 )
453 }
454
455 pub fn reduce_keyed<F: Fn(&mut V, V) + 'a>(
456 self,
457 comb: impl IntoQuotedMut<'a, F>,
458 ) -> Stream<(K, V), Tick<L>, Bounded> {
459 Stream::new(
460 self.location,
461 HfPlusNode::ReduceKeyed {
462 f: comb.splice_fn2_borrow_mut().into(),
463 input: Box::new(self.ir_node.into_inner()),
464 },
465 )
466 }
467}
468
469impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B> {
470 pub fn tick_batch(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded> {
471 Stream::new(
472 tick.clone(),
473 HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())),
474 )
475 }
476
477 pub fn tick_prefix(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded>
478 where
479 T: Clone,
480 {
481 self.tick_batch(tick).persist()
482 }
483
484 pub fn sample_every(
485 self,
486 interval: impl Quoted<'a, std::time::Duration> + Copy + 'a,
487 ) -> Stream<T, L, Unbounded> {
488 let samples = self.location.source_interval(interval);
489 let tick = self.location.tick();
490 self.tick_batch(&tick)
491 .continue_if(samples.tick_batch(&tick).first())
492 .all_ticks()
493 }
494
495 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F>) {
496 self.location
497 .flow_state()
498 .borrow_mut()
499 .leaves
500 .as_mut()
501 .expect(FLOW_USED_MESSAGE)
502 .push(HfPlusLeaf::ForEach {
503 input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
504 f: f.splice_fn1().into(),
505 });
506 }
507
508 pub fn dest_sink<S: Unpin + Sink<T> + 'a>(self, sink: impl Quoted<'a, S>) {
509 self.location
510 .flow_state()
511 .borrow_mut()
512 .leaves
513 .as_mut()
514 .expect(FLOW_USED_MESSAGE)
515 .push(HfPlusLeaf::DestSink {
516 sink: sink.splice_typed().into(),
517 input: Box::new(self.ir_node.into_inner()),
518 });
519 }
520}
521
522impl<'a, T, L: Location<'a>> Stream<T, Tick<L>, Bounded> {
523 pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
524 Stream::new(
525 self.location.outer().clone(),
526 HfPlusNode::Persist(Box::new(self.ir_node.into_inner())),
527 )
528 }
529
530 pub fn persist(self) -> Stream<T, Tick<L>, Bounded>
531 where
532 T: Clone,
533 {
534 Stream::new(
535 self.location,
536 HfPlusNode::Persist(Box::new(self.ir_node.into_inner())),
537 )
538 }
539
540 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded> {
541 Stream::new(
542 self.location,
543 HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())),
544 )
545 }
546
547 pub fn delta(self) -> Stream<T, Tick<L>, Bounded> {
548 Stream::new(
549 self.location,
550 HfPlusNode::Delta(Box::new(self.ir_node.into_inner())),
551 )
552 }
553}
554
555fn serialize_bincode<T: Serialize>(is_demux: bool) -> Pipeline {
556 let root = get_this_crate();
557
558 let t_type: syn::Type = stageleft::quote_type::<T>();
559
560 if is_demux {
561 parse_quote! {
562 map(|(id, data): (#root::ClusterId<_>, #t_type)| {
563 (id.raw_id, #root::runtime_support::bincode::serialize::<#t_type>(&data).unwrap().into())
564 })
565 }
566 } else {
567 parse_quote! {
568 map(|data| {
569 #root::runtime_support::bincode::serialize::<#t_type>(&data).unwrap().into()
570 })
571 }
572 }
573}
574
575pub(super) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<syn::Type>) -> Pipeline {
576 let root = get_this_crate();
577
578 let t_type: syn::Type = stageleft::quote_type::<T>();
579
580 if let Some(c_type) = tagged {
581 parse_quote! {
582 map(|res| {
583 let (id, b) = res.unwrap();
584 (#root::ClusterId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
585 })
586 }
587 } else {
588 parse_quote! {
589 map(|res| {
590 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
591 })
592 }
593 }
594}
595
596impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B> {
597 pub fn decouple_process<P2>(
598 self,
599 other: &Process<'a, P2>,
600 ) -> Stream<T, Process<'a, P2>, Unbounded>
601 where
602 L: CanSend<'a, Process<'a, P2>, In<T> = T, Out<T> = T>,
603 T: Clone + Serialize + DeserializeOwned,
604 {
605 self.send_bincode::<Process<'a, P2>, T>(other)
606 }
607
608 pub fn decouple_cluster<C2, Tag>(
609 self,
610 other: &Cluster<'a, C2>,
611 ) -> Stream<T, Cluster<'a, C2>, Unbounded>
612 where
613 L: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
614 T: Clone + Serialize + DeserializeOwned,
615 {
616 let self_node_id = match self.location_kind() {
617 LocationId::Cluster(cluster_id) => ClusterSelfId {
618 id: cluster_id,
619 _phantom: PhantomData,
620 },
621 _ => panic!("decouple_cluster must be called on a cluster"),
622 };
623
624 self.map(q!(move |b| (self_node_id, b.clone())))
625 .send_bincode_interleaved(other)
626 }
627
628 pub fn send_bincode<L2: Location<'a>, CoreType>(
629 self,
630 other: &L2,
631 ) -> Stream<L::Out<CoreType>, L2, Unbounded>
632 where
633 L: CanSend<'a, L2, In<CoreType> = T>,
634 CoreType: Serialize + DeserializeOwned,
635 {
636 let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));
637
638 let deserialize_pipeline = Some(deserialize_bincode::<CoreType>(L::tagged_type()));
639
640 Stream::new(
641 other.clone(),
642 HfPlusNode::Network {
643 from_location: self.location_kind(),
644 from_key: None,
645 to_location: other.id(),
646 to_key: None,
647 serialize_pipeline,
648 instantiate_fn: DebugInstantiate::Building(),
649 deserialize_pipeline,
650 input: Box::new(self.ir_node.into_inner()),
651 },
652 )
653 }
654
655 pub fn send_bincode_external<L2: 'a, CoreType>(
656 self,
657 other: &ExternalProcess<L2>,
658 ) -> ExternalBincodeStream<L::Out<CoreType>>
659 where
660 L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>,
661 CoreType: Serialize + DeserializeOwned,
662 {
664 let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));
665
666 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
667
668 let external_key = flow_state_borrow.next_external_out;
669 flow_state_borrow.next_external_out += 1;
670
671 let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
672
673 let dummy_f: syn::Expr = syn::parse_quote!(());
674
675 leaves.push(HfPlusLeaf::ForEach {
676 f: dummy_f.into(),
677 input: Box::new(HfPlusNode::Network {
678 from_location: self.location_kind(),
679 from_key: None,
680 to_location: other.id(),
681 to_key: Some(external_key),
682 serialize_pipeline,
683 instantiate_fn: DebugInstantiate::Building(),
684 deserialize_pipeline: None,
685 input: Box::new(self.ir_node.into_inner()),
686 }),
687 });
688
689 ExternalBincodeStream {
690 process_id: other.id,
691 port_id: external_key,
692 _phantom: PhantomData,
693 }
694 }
695
696 pub fn send_bytes<L2: Location<'a>>(self, other: &L2) -> Stream<L::Out<Bytes>, L2, Unbounded>
697 where
698 L: CanSend<'a, L2, In<Bytes> = T>,
699 {
700 let root = get_this_crate();
701 Stream::new(
702 other.clone(),
703 HfPlusNode::Network {
704 from_location: self.location_kind(),
705 from_key: None,
706 to_location: other.id(),
707 to_key: None,
708 serialize_pipeline: None,
709 instantiate_fn: DebugInstantiate::Building(),
710 deserialize_pipeline: if let Some(c_type) = L::tagged_type() {
711 Some(
712 parse_quote!(map(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()))),
713 )
714 } else {
715 Some(parse_quote!(map(|b| b.unwrap().freeze())))
716 },
717 input: Box::new(self.ir_node.into_inner()),
718 },
719 )
720 }
721
722 pub fn send_bytes_external<L2: 'a>(self, other: &ExternalProcess<L2>) -> ExternalBytesPort
723 where
724 L: CanSend<'a, ExternalProcess<'a, L2>, In<Bytes> = T, Out<Bytes> = Bytes>,
725 {
726 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
727 let external_key = flow_state_borrow.next_external_out;
728 flow_state_borrow.next_external_out += 1;
729
730 let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
731
732 let dummy_f: syn::Expr = syn::parse_quote!(());
733
734 leaves.push(HfPlusLeaf::ForEach {
735 f: dummy_f.into(),
736 input: Box::new(HfPlusNode::Network {
737 from_location: self.location_kind(),
738 from_key: None,
739 to_location: other.id(),
740 to_key: Some(external_key),
741 serialize_pipeline: None,
742 instantiate_fn: DebugInstantiate::Building(),
743 deserialize_pipeline: None,
744 input: Box::new(self.ir_node.into_inner()),
745 }),
746 });
747
748 ExternalBytesPort {
749 process_id: other.id,
750 port_id: external_key,
751 }
752 }
753
754 pub fn send_bincode_interleaved<L2: Location<'a>, Tag, CoreType>(
755 self,
756 other: &L2,
757 ) -> Stream<CoreType, L2, Unbounded>
758 where
759 L: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>,
760 CoreType: Serialize + DeserializeOwned,
761 {
762 self.send_bincode::<L2, CoreType>(other).map(q!(|(_, b)| b))
763 }
764
765 pub fn send_bytes_interleaved<L2: Location<'a>, Tag>(
766 self,
767 other: &L2,
768 ) -> Stream<Bytes, L2, Unbounded>
769 where
770 L: CanSend<'a, L2, In<Bytes> = T, Out<Bytes> = (Tag, Bytes)>,
771 {
772 self.send_bytes::<L2>(other).map(q!(|(_, b)| b))
773 }
774
775 pub fn broadcast_bincode<C2>(
776 self,
777 other: &Cluster<'a, C2>,
778 ) -> Stream<L::Out<T>, Cluster<'a, C2>, Unbounded>
779 where
780 L: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
781 T: Clone + Serialize + DeserializeOwned,
782 {
783 let ids = other.members();
784
785 self.flat_map(q!(|b| ids.iter().map(move |id| (
786 ::std::clone::Clone::clone(id),
787 ::std::clone::Clone::clone(&b)
788 ))))
789 .send_bincode(other)
790 }
791
792 pub fn broadcast_bincode_interleaved<C2, Tag>(
793 self,
794 other: &Cluster<'a, C2>,
795 ) -> Stream<T, Cluster<'a, C2>, Unbounded>
796 where
797 L: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
798 T: Clone + Serialize + DeserializeOwned,
799 {
800 self.broadcast_bincode(other).map(q!(|(_, b)| b))
801 }
802
803 pub fn broadcast_bytes<C2>(
804 self,
805 other: &Cluster<'a, C2>,
806 ) -> Stream<L::Out<Bytes>, Cluster<'a, C2>, Unbounded>
807 where
808 L: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
809 T: Clone,
810 {
811 let ids = other.members();
812
813 self.flat_map(q!(|b| ids.iter().map(move |id| (
814 ::std::clone::Clone::clone(id),
815 ::std::clone::Clone::clone(&b)
816 ))))
817 .send_bytes(other)
818 }
819
820 pub fn broadcast_bytes_interleaved<C2, Tag>(
821 self,
822 other: &Cluster<'a, C2>,
823 ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded>
824 where
825 L: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
826 + 'a,
827 T: Clone,
828 {
829 self.broadcast_bytes(other).map(q!(|(_, b)| b))
830 }
831}
832
#[cfg(test)]
mod tests {
    use hydro_deploy::Deployment;
    use hydroflow::futures::StreamExt;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::location::Location;
    use crate::FlowBuilder;

    // Marker types used to name the processes in the test flow.
    struct P1 {}
    struct P2 {}

    /// Payload that is serialized with bincode on its way across the network.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends the numbers `0..10` from one process to a second process and on
    /// to an external process, then checks they arrive in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let sender = builder.process::<P1>();
        let relay = builder.process::<P2>();
        let observer = builder.external_process::<P2>();

        let payloads = sender
            .source_iter(q!(0..10))
            .map(q!(|n| SendOverNetwork { n }));
        let out_port = payloads
            .send_bincode(&relay)
            .send_bincode_external(&observer);

        let nodes = builder
            .with_process(&sender, deployment.Localhost())
            .with_process(&relay, deployment.Localhost())
            .with_external(&observer, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the external port before starting the deployment.
        let mut external_out = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10 {
            let received = external_out.next().await.unwrap();
            assert_eq!(received.n, expected);
        }
    }
}