use crate::{
Timestamp,
algebra::IndexedZSet,
circuit::{
ChildCircuit, Circuit, Stream, circuit_builder::IterativeCircuit,
schedule::Error as SchedulerError,
},
operator::{DelayedFeedback, dynamic::distinct::DistinctFactories},
trace::Spine,
};
use crate::circuit::checkpointer::Checkpoint;
use impl_trait_for_tuples::impl_for_tuples;
use size_of::SizeOf;
use std::result::Result;
pub trait RecursiveStreams<C> {
type Feedback;
type Export;
type Output;
type Factories;
fn new(circuit: &C, factories: &Self::Factories) -> (Self::Feedback, Self);
fn distinct(self, factories: &Self::Factories) -> Self;
fn connect(&self, vars: Self::Feedback);
fn export(self, factories: &Self::Factories) -> Self::Export;
fn consolidate(exports: Self::Export, factories: &Self::Factories) -> Self::Output;
}
impl<C, B> RecursiveStreams<C> for Stream<C, B>
where
C: Circuit,
C::Parent: Circuit,
B: Checkpoint + IndexedZSet + Send + Sync,
Spine<B>: SizeOf,
{
type Feedback = DelayedFeedback<C, B>;
type Export = Stream<C::Parent, Spine<B>>;
type Output = Stream<C::Parent, B>;
type Factories = DistinctFactories<B, C::Time>;
fn new(circuit: &C, factories: &Self::Factories) -> (Self::Feedback, Self) {
let feedback =
DelayedFeedback::with_default(circuit, B::dyn_empty(&factories.input_factories));
let stream = feedback.stream().clone();
(feedback, stream)
}
fn distinct(self, factories: &Self::Factories) -> Self {
Stream::dyn_distinct(&self, factories).set_persistent_id(
self.get_persistent_id()
.map(|name| format!("{name}.distinct"))
.as_deref(),
)
}
fn connect(&self, vars: Self::Feedback) {
vars.connect(self)
}
fn export(self, factories: &Self::Factories) -> Self::Export {
Stream::export(&self.dyn_integrate_trace(&factories.input_factories))
}
fn consolidate(exports: Self::Export, factories: &Self::Factories) -> Self::Output {
Stream::dyn_consolidate(&exports, &factories.input_factories)
}
}
#[allow(clippy::unused_unit)]
#[impl_for_tuples(14)]
#[tuple_types_custom_trait_bound(Clone + RecursiveStreams<C>)]
impl<C> RecursiveStreams<C> for Tuple {
for_tuples!( type Feedback = ( #( Tuple::Feedback ),* ); );
for_tuples!( type Export = ( #( Tuple::Export ),* ); );
for_tuples!( type Output = ( #( Tuple::Output ),* ); );
for_tuples!( type Factories = ( #( Tuple::Factories ),* ); );
fn new(circuit: &C, factories: &Self::Factories) -> (Self::Feedback, Self) {
let res = (for_tuples!( #( Tuple::new(circuit, &factories.Tuple) ),* ));
let streams = (for_tuples!( #( { let stream = &res.Tuple; stream.1.clone() } ),* ));
let feedback = (for_tuples!( #( { let stream = res.Tuple; stream.0 } ),* ));
(feedback, streams)
}
fn distinct(self, factories: &Self::Factories) -> Self {
(for_tuples!( #( self.Tuple.distinct(&factories.Tuple) ),* ))
}
fn connect(&self, vars: Self::Feedback) {
for_tuples!( #( self.Tuple.connect(vars.Tuple); )* );
}
fn export(self, factories: &Self::Factories) -> Self::Export {
(for_tuples!( #( self.Tuple.export(&factories.Tuple) ),* ))
}
fn consolidate(exports: Self::Export, factories: &Self::Factories) -> Self::Output {
(for_tuples!( #( Tuple::consolidate(exports.Tuple, &factories.Tuple) ),* ))
}
}
#[rustfmt::skip]
impl<P, T> ChildCircuit<P, T>
where
P: 'static,
T: Timestamp,
Self: Circuit,
{
pub fn dyn_recursive<F, S>(&self, factories: &S::Factories, f: F) -> Result<S::Output, SchedulerError>
where
S: RecursiveStreams<IterativeCircuit<Self>>,
F: FnOnce(&IterativeCircuit<Self>, S) -> Result<S, SchedulerError>,
{
let traces = self.fixedpoint(|child| {
let (vars, input_streams) = S::new(child, factories);
let output_streams = f(child, input_streams)?;
let output_streams = S::distinct(output_streams, factories);
S::connect(&output_streams, vars);
Ok(S::export(output_streams, factories))
})?;
Ok(S::consolidate(traces, factories))
}
}
#[cfg(test)]
mod test {
use crate::{
Circuit, FallbackZSet, Runtime, Stream, operator::Generator, typed_batch::OrdZSet,
utils::Tup2, zset,
};
use std::{
thread,
time::{Duration, Instant},
vec,
};
#[test]
fn reachability() {
let mut root = Runtime::init_circuit(1, move |circuit| {
let mut edges = vec![
zset! { Tup2(1, 2) => 1 },
zset! { Tup2(2, 3) => 1},
zset! { Tup2(1, 3) => 1},
zset! { Tup2(3, 1) => 1},
zset! { Tup2(3, 1) => -1},
zset! { Tup2(1, 2) => -1},
zset! { Tup2(2, 4) => 1, Tup2(4, 1) => 1 },
zset! { Tup2(2, 3) => -1, Tup2(3, 2) => 1 },
]
.into_iter();
let mut outputs = vec![
zset! { Tup2(1, 2) => 1 },
zset! { Tup2(1, 2) => 1, Tup2(2, 3) => 1, Tup2(1, 3) => 1 },
zset! { Tup2(1, 2) => 1, Tup2(2, 3) => 1, Tup2(1, 3) => 1 },
zset! { Tup2(1, 1) => 1, Tup2(2, 2) => 1, Tup2(3, 3) => 1, Tup2(1, 2) => 1, Tup2(1, 3) => 1, Tup2(2, 3) => 1, Tup2(2, 1) => 1, Tup2(3, 1) => 1, Tup2(3, 2) => 1},
zset! { Tup2(1, 2) => 1, Tup2(2, 3) => 1, Tup2(1, 3) => 1 },
zset! { Tup2(2, 3) => 1, Tup2(1, 3) => 1 },
zset! { Tup2(1, 3) => 1, Tup2(2, 3) => 1, Tup2(2, 4) => 1, Tup2(2, 1) => 1, Tup2(4, 1) => 1, Tup2(4, 3) => 1 },
zset! { Tup2(1, 1) => 1, Tup2(2, 2) => 1, Tup2(3, 3) => 1, Tup2(4, 4) => 1,
Tup2(1, 2) => 1, Tup2(1, 3) => 1, Tup2(1, 4) => 1,
Tup2(2, 1) => 1, Tup2(2, 3) => 1, Tup2(2, 4) => 1,
Tup2(3, 1) => 1, Tup2(3, 2) => 1, Tup2(3, 4) => 1,
Tup2(4, 1) => 1, Tup2(4, 2) => 1, Tup2(4, 3) => 1 },
]
.into_iter();
let edges = circuit
.add_source(Generator::new(move || edges.next().unwrap()));
let paths = circuit.recursive(|child, paths: Stream<_, OrdZSet<Tup2<u64, u64>>>| {
let edges = edges.delta0(child);
let paths_indexed = paths.map_index(|&Tup2(x, y)| (y, x));
let edges_indexed = edges.map_index(|Tup2(x, y)| (*x, *y));
Ok(edges.plus(&paths_indexed.join(&edges_indexed, |_via, from, to| Tup2(*from, *to))))
})
.unwrap();
paths.integrate().stream_distinct().inspect(move |ps| {
assert_eq!(*ps, outputs.next().unwrap());
});
Ok(())
})
.unwrap().0;
for _ in 0..8 {
root.transaction().unwrap();
}
}
#[test]
fn issue4168() {
let (mut circuit, edges_handle) = Runtime::init_circuit(8, move |circuit| {
let (edges_stream, edges_handle) = circuit.add_input_zset::<Tup2<u64, u64>>();
let _ = circuit
.recursive(|child, paths: Stream<_, OrdZSet<Tup2<u64, u64>>>| {
let edges = edges_stream.delta0(child);
let paths_indexed = paths.map_index(|&Tup2(x, y)| (y, x));
let edges_indexed = edges.map_index(|Tup2(x, y)| (*x, *y));
Ok(edges.plus(
&paths_indexed.join(&edges_indexed, |_via, from, to| Tup2(*from, *to)),
))
})
.unwrap();
let _ = circuit
.recursive(|child, paths: Stream<_, OrdZSet<Tup2<u64, u64>>>| {
let edges = edges_stream.delta0(child);
let paths_indexed = paths.map_index(|&Tup2(x, y)| (y, x));
let edges_indexed = edges.map_index(|Tup2(x, y)| (*x, *y));
Ok(edges.plus(
&paths_indexed.join(&edges_indexed, |_via, from, to| Tup2(*from, *to)),
))
})
.unwrap();
Ok(edges_handle)
})
.unwrap();
let handle = thread::spawn(move || {
for i in 0..100 {
edges_handle.append(&mut vec![Tup2(Tup2(i, i + 1), 1)]);
circuit.transaction().unwrap();
}
});
let start = Instant::now();
while start.elapsed() < Duration::from_secs(100) {
if handle.is_finished() {
handle.join().unwrap();
return;
}
thread::sleep(Duration::from_millis(100));
}
panic!("Deadlock in test 'issue4168'");
}
#[test]
fn issue4028() {
let insert_edges = (0..100)
.map(|i| Tup2(Tup2(i, i + 1), 1))
.collect::<Vec<_>>();
let delete_edges = (0..100)
.map(|i| Tup2(Tup2(i, i + 1), -1))
.collect::<Vec<_>>();
let (mut root, (edges_handle, paths_handle)) = Runtime::init_circuit(1, move |circuit| {
let (edges, edges_handle) = circuit.add_input_zset::<Tup2<u64, u64>>();
let paths = circuit
.recursive(|child, paths: Stream<_, OrdZSet<Tup2<u64, u64>>>| {
let edges = edges.delta0(child);
let paths_indexed = paths.map_index(|&Tup2(x, y)| (y, x));
let edges_indexed = edges.map_index(|Tup2(x, y)| (*x, *y));
Ok(edges.plus(
&paths_indexed.join(&edges_indexed, |_via, from, to| Tup2(*from, *to)),
))
})
.unwrap();
let paths_handle = paths.integrate().output();
Ok((edges_handle, paths_handle))
})
.unwrap();
for _ in 0..10 {
edges_handle.append(&mut insert_edges.clone());
root.transaction().unwrap();
edges_handle.append(&mut delete_edges.clone());
root.transaction().unwrap();
let paths = paths_handle.consolidate();
assert!(paths.is_empty());
}
}
#[test]
fn reachability2() {
type Edges<S> = Stream<S, OrdZSet<Tup2<u64, u64>>>;
let mut root = Runtime::init_circuit(1, move |circuit| {
let mut edges = vec![
zset! { Tup2(1, 2) => 1 },
zset! { Tup2(2, 3) => 1},
zset! { Tup2(1, 3) => 1},
zset! { Tup2(3, 1) => 1},
zset! { Tup2(3, 1) => -1},
zset! { Tup2(1, 2) => -1},
zset! { Tup2(2, 4) => 1, Tup2(4, 1) => 1 },
zset! { Tup2(2, 3) => -1, Tup2(3, 2) => 1 },
]
.into_iter();
let output_vec = vec![
zset! { Tup2(1, 2) => 1 },
zset! { Tup2(1, 2) => 1, Tup2(2, 3) => 1, Tup2(1, 3) => 1 },
zset! { Tup2(1, 2) => 1, Tup2(2, 3) => 1, Tup2(1, 3) => 1 },
zset! { Tup2(1, 1) => 1, Tup2(2, 2) => 1, Tup2(3, 3) => 1, Tup2(1, 2) => 1, Tup2(1, 3) => 1, Tup2(2, 3) => 1, Tup2(2, 1) => 1, Tup2(3, 1) => 1, Tup2(3, 2) => 1},
zset! { Tup2(1, 2) => 1, Tup2(2, 3) => 1, Tup2(1, 3) => 1 },
zset! { Tup2(2, 3) => 1, Tup2(1, 3) => 1 },
zset! { Tup2(1, 3) => 1, Tup2(2, 3) => 1, Tup2(2, 4) => 1, Tup2(2, 1) => 1, Tup2(4, 1) => 1, Tup2(4, 3) => 1 },
zset! { Tup2(1, 1) => 1, Tup2(2, 2) => 1, Tup2(3, 3) => 1, Tup2(4, 4) => 1,
Tup2(1, 2) => 1, Tup2(1, 3) => 1, Tup2(1, 4) => 1,
Tup2(2, 1) => 1, Tup2(2, 3) => 1, Tup2(2, 4) => 1,
Tup2(3, 1) => 1, Tup2(3, 2) => 1, Tup2(3, 4) => 1,
Tup2(4, 1) => 1, Tup2(4, 2) => 1, Tup2(4, 3) => 1 },
];
let mut outputs = output_vec.clone().into_iter();
let mut outputs2 = output_vec.into_iter();
let edges = circuit
.add_source(Generator::new(move || edges.next().unwrap()));
let (paths, reverse_paths): (Stream<_, FallbackZSet<Tup2<u64, u64>>>, Stream<_, FallbackZSet<Tup2<u64, u64>>>) =
circuit.recursive(|child, (paths, reverse_paths): (Edges<_>, Edges<_>)| {
let edges = edges.delta0(child);
let paths_indexed = paths.map_index(|&Tup2(x, y)| (y, x));
let reverse_paths_indexed = reverse_paths.map_index(|&Tup2(x, y)| (y, x));
let edges_indexed = edges.map_index(|Tup2(x,y)| (*x, *y));
let reverse_edges = edges.map(|&Tup2(x, y)| Tup2(y, x));
let reverse_edges_indexed = reverse_edges.map_index(|Tup2(x,y)| (*x, *y));
Ok((edges.plus(&paths_indexed.join(&edges_indexed, |_via, from, to| Tup2(*from, *to))),
reverse_edges.plus(&reverse_paths_indexed.join(&reverse_edges_indexed, |_via, from, to| Tup2(*from, *to)))
))
})
.unwrap();
paths.integrate().stream_distinct().inspect(move |ps| {
assert_eq!(*ps, outputs.next().unwrap());
});
reverse_paths.map(|Tup2(x, y)| Tup2(*y, *x)).integrate().stream_distinct().inspect(move |ps: &OrdZSet<_>| {
assert_eq!(*ps, outputs2.next().unwrap());
});
Ok(())
})
.unwrap().0;
for _ in 0..8 {
root.transaction().unwrap();
}
}
}