// timely/dataflow/channels/mod.rs

1//! Structured communication between timely dataflow operators.
2
3use crate::communication::Push;
4use crate::Container;
5
6/// A collection of types that may be pushed at.
7pub mod pushers;
8/// A collection of types that may be pulled from.
9pub mod pullers;
10/// Parallelization contracts, describing how data must be exchanged between operators.
11pub mod pact;
13/// The input to and output from timely dataflow communication channels.
14pub type BundleCore<T, D> = crate::communication::Message<Message<T, D>>;
16/// The input to and output from timely dataflow communication channels specialized to vectors.
17pub type Bundle<T, D> = BundleCore<T, Vec<D>>;
19/// A serializable representation of timestamped data.
20#[derive(Clone, Abomonation, Serialize, Deserialize)]
21pub struct Message<T, D> {
22    /// The timestamp associated with the message.
23    pub time: T,
24    /// The data in the message.
25    pub data: D,
26    /// The source worker.
27    pub from: usize,
28    /// A sequence number for this worker-to-worker stream.
29    pub seq: usize,
30}
32impl<T, D> Message<T, D> {
33    /// Default buffer size.
34    #[deprecated = "Use timely::buffer::default_capacity instead"]
35    pub fn default_length() -> usize {
36        crate::container::buffer::default_capacity::<D>()
37    }
38}
40impl<T, D: Container> Message<T, D> {
41    /// Creates a new message instance from arguments.
42    pub fn new(time: T, data: D, from: usize, seq: usize) -> Self {
43        Message { time, data, from, seq }
44    }
45
46    /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
47    /// leaves in place, or the container's default element.
48    #[inline]
49    pub fn push_at<P: Push<BundleCore<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {
50
51        let data = ::std::mem::take(buffer);
52        let message = Message::new(time, data, 0, 0);
53        let mut bundle = Some(BundleCore::from_typed(message));
54
55        pusher.push(&mut bundle);
56
57        if let Some(message) = bundle {
58            if let Some(message) = message.if_typed() {
59                *buffer = message.data;
60                buffer.clear();
61            }
62        }
63    }
64}