timely/dataflow/operators/
rc.rs1use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::Operator;
5use crate::dataflow::{Scope, StreamCore};
6use crate::Container;
7use std::rc::Rc;
8
9pub trait SharedStream<S: Scope, C: Container> {
11 fn shared(&self) -> StreamCore<S, Rc<C>>;
25}
26
27impl<S: Scope, C: Container> SharedStream<S, C> for StreamCore<S, C> {
28 fn shared(&self) -> StreamCore<S, Rc<C>> {
29 let mut container = Default::default();
30 self.unary(Pipeline, "Shared", move |_, _| {
31 move |input, output| {
32 input.for_each(|time, data| {
33 data.swap(&mut container);
34 output
35 .session(&time)
36 .give_container(&mut Rc::new(std::mem::take(&mut container)));
37 });
38 }
39 })
40 }
41}
42
43#[cfg(test)]
44mod test {
45 use crate::dataflow::channels::pact::Pipeline;
46 use crate::dataflow::operators::capture::Extract;
47 use crate::dataflow::operators::rc::SharedStream;
48 use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream};
49
50 #[test]
51 fn test_shared() {
52 let output = crate::example(|scope| {
53 let shared = vec![Ok(0), Err(())].to_stream(scope).shared();
54 scope
55 .concatenate([
56 shared.unary(Pipeline, "read shared 1", |_, _| {
57 let mut container = Default::default();
58 move |input, output| {
59 input.for_each(|time, data| {
60 data.swap(&mut container);
61 output.session(&time).give(container.as_ptr() as usize);
62 });
63 }
64 }),
65 shared.unary(Pipeline, "read shared 2", |_, _| {
66 let mut container = Default::default();
67 move |input, output| {
68 input.for_each(|time, data| {
69 data.swap(&mut container);
70 output.session(&time).give(container.as_ptr() as usize);
71 });
72 }
73 }),
74 ])
75 .capture()
76 });
77 let output = &mut output.extract()[0].1;
78 output.sort();
79 output.dedup();
80 assert_eq!(output.len(), 1);
81 }
82}