1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
use super::framing::{Acknowledgement, BaseMsg, Message, MessageCodec, Request};
use super::{DataGram, Error, Generation, PublisherDesc, Result, MAX_DATA_SIZE};
use bytes::{Bytes, BytesMut};
use futures::{
    sink::SinkExt,
    stream::{Stream, StreamExt},
    FutureExt,
};
use std::collections::HashSet;
use std::{
    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
    pin::Pin,
    task::{Context, Poll},
};
use tokio::net::UdpSocket;
use tokio::sync::mpsc::{self, Sender};
use tokio::time as timer;
use tokio_util::udp::UdpFramed;

/// Future that completes when the subscription has been acknowledge to the subscriber.
pub struct SubscriptionComplete<'a> {
    inner: &'a mut Subscription,
}

impl<'a> std::future::Future for SubscriptionComplete<'a> {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        debug!("Poll for subscription complete");
        let pin = self.get_mut();
        loop {
            futures::ready!(pin.inner.handle_message(cx));
            debug!("subscribed: {}", pin.inner.subscribed);
            if pin.inner.subscribed {
                debug!("SubscriptionComplete completed");
                break Poll::Ready(());
            }
            debug!("SubscriptionComplete Retry");
        }
    }
}

/// Stream of messages from [Publishers](super::publisher::Publisher)
///
/// The new function starts the process of subscribing to a publisher. A message could arrive at
/// any point after the call completes. [Subscription::wait_for_subscription_complete] can be used to listen
/// for the subscription acknowledgement.
///
/// All inbound messages are handled during the stream poll. This can mean that a subscriber not
/// handling messages can be deemed in active and it's subscription ended.
///
/// The primary means of interacting with a [Subscription] is through the [futures::stream::Stream]
/// impl.
pub struct Subscription {
    desc: PublisherDesc,
    addr: SocketAddr,
    inner_stream: Pin<Box<dyn Send + Stream<Item = Result<DataGram>>>>,
    sink: Sender<DataGram>,
    generation: Generation,
    current: std::option::Option<BytesMut>,
    chunks: usize,
    received_chunks: HashSet<usize>,
    subscribed: bool,
}

impl Subscription {
    pub async fn new(desc: PublisherDesc) -> Result<Subscription> {
        let addr = match desc.to_socket_addrs()?.next() {
            Some(a) => a,
            None => return Err(Error::AddrParseError),
        };

        let bind_addr = SocketAddr::new(
            match addr {
                SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
                SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
            },
            0,
        );
        let (udp_sink, udp_stream) =
            UdpFramed::new(UdpSocket::bind(&bind_addr).await?, MessageCodec {}).split();

        let (sender, receiver) = mpsc::channel(10);

        {
            let mut subscribe_sender = sender.clone();
            tokio::spawn(async move {
                subscribe_sender
                    .send((Message::Request(Request::Subscribe(BaseMsg {})), addr))
                    .await
            });
        }

        {
            let mut pinned_sink = Box::pin(udp_sink);
            let addr_moveable = addr;
            tokio::spawn(async move {
                let ret = receiver
                    .map(|m| {
                        debug!("Sending Message");
                        Ok(m)
                    })
                    .forward(pinned_sink.as_mut())
                    .await;

                if let Err(err) = pinned_sink
                    .send((
                        Message::Request(Request::Unsubscribe(BaseMsg {})),
                        addr_moveable,
                    ))
                    .await
                {
                    error!("Unable to send final unsubscribe: {}", err);
                }

                ret
            });
        }
        Ok(Subscription {
            desc,
            addr,
            inner_stream: Box::pin(udp_stream),
            sink: sender,
            generation: 0,
            current: None,
            chunks: 0,
            received_chunks: HashSet::new(),
            subscribed: false,
        })
    }

    /// Returns the description used to create the Subscription
    pub fn description(&self) -> &PublisherDesc {
        &self.desc
    }

    /// Returns a future that completes when the subscription has been acknowledged by the
    /// publisher.
    pub fn wait_for_subscription_complete(&mut self) -> SubscriptionComplete {
        SubscriptionComplete { inner: self }
    }

    /// Centralized message handling function. This works with the Stream impl and Completion
    /// future.
    fn handle_message(&mut self, cx: &mut Context) -> Poll<()> {
        loop {
            debug!("Receive one message");
            let message = match futures::ready!(self.inner_stream.as_mut().poll_next(cx)) {
                Some(Ok(m)) => m,
                Some(Err(err)) => {
                    error!("Error parsing message {}", err);
                    continue;
                }
                None => continue,
            };

            debug!("message {:?}", message);
            if message.1 == self.addr {
                match message.0 {
                    Message::Data(data) => {
                        if data.generation > self.generation {
                            debug!("New data, clearing old");
                            self.generation = data.generation;
                            let completed = data.complete_size;
                            let mut new_current = BytesMut::new();
                            new_current.resize(completed, 0);
                            self.current.replace(new_current);
                            self.chunks = completed / MAX_DATA_SIZE
                                + if completed % MAX_DATA_SIZE == 0 { 0 } else { 1 };
                            self.received_chunks.clear();
                        }
                        if data.generation == self.generation {
                            if let Some(current) = &mut self.current {
                                if self.received_chunks.insert(data.chunk) {
                                    let offset = data.chunk * MAX_DATA_SIZE;
                                    debug!("current length is: {}", current.len());
                                    current[offset..data.data.len()].copy_from_slice(&data.data);
                                }
                            }
                        }

                        if self.received_chunks.len() == self.chunks {
                            return Poll::Ready(());
                        }
                    }
                    Message::Acknowledgement(ack) => {
                        debug!("Ack: {:?}", ack);
                        match ack {
                            Acknowledgement::Subscription(sub) => {
                                debug!("Subscription Ack: {}", sub);
                                self.subscribed = true;
                                let timeout = sub.timeout_interval / 2;
                                let mut sink = self.sink.clone();
                                let addr = self.addr;
                                let resub = timer::delay_for(timeout).then(move |_| async move {
                                    debug!("Sending Resubscription");
                                    match sink
                                        .send((
                                            Message::Request(Request::Subscribe(BaseMsg {})),
                                            addr,
                                        ))
                                        .await
                                    {
                                        Ok(()) => debug!("Sent Resubscription"),
                                        Err(_) => error!("Out put pipe shut unexpectedly"),
                                    };
                                });

                                tokio::spawn(resub);
                                return Poll::Ready(());
                            }
                        }
                    }
                    _ => {
                        //Skip unknown
                        debug!("Unknown Message: {}", message.0);
                    }
                }
            }
        }
    }
}

impl Stream for Subscription {
    type Item = Bytes;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let pin = self.get_mut();
        loop {
            futures::ready!(pin.handle_message(cx));
            if pin.received_chunks.len() == pin.chunks {
                if let Some(current) = pin.current.take() {
                    return Poll::Ready(Some(current.freeze()));
                }
            }
        }
    }
}