timely/dataflow/channels/
mod.rs1use crate::communication::Push;
4use crate::Container;
5
6pub mod pushers;
8pub mod pullers;
10pub mod pact;
12
13pub type BundleCore<T, D> = crate::communication::Message<Message<T, D>>;
15
16pub type Bundle<T, D> = BundleCore<T, Vec<D>>;
18
19#[derive(Clone, Abomonation, Serialize, Deserialize)]
21pub struct Message<T, D> {
22 pub time: T,
24 pub data: D,
26 pub from: usize,
28 pub seq: usize,
30}
31
32impl<T, D> Message<T, D> {
33 #[deprecated = "Use timely::buffer::default_capacity instead"]
35 pub fn default_length() -> usize {
36 crate::container::buffer::default_capacity::<D>()
37 }
38}
39
40impl<T, D: Container> Message<T, D> {
41 pub fn new(time: T, data: D, from: usize, seq: usize) -> Self {
43 Message { time, data, from, seq }
44 }
45
46 #[inline]
49 pub fn push_at<P: Push<BundleCore<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {
50
51 let data = ::std::mem::take(buffer);
52 let message = Message::new(time, data, 0, 0);
53 let mut bundle = Some(BundleCore::from_typed(message));
54
55 pusher.push(&mut bundle);
56
57 if let Some(message) = bundle {
58 if let Some(message) = message.if_typed() {
59 *buffer = message.data;
60 buffer.clear();
61 }
62 }
63 }
64}