1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use super::publisher::*;
use super::subscriber::*;
use super::message_publisher::*;

use futures::*;
use futures::future::{BoxFuture};
use futures::channel::oneshot;

///
/// A blocking publisher is a publisher that blocks messages until it has enough subscribers
/// 
/// This is useful for cases where a publisher is being used asynchronously and wants to ensure that
/// its messages are sent to at least one subscriber. Once the required number of subscribers is
/// reached, this will not block again even if some subscribers are dropped.
/// 
pub struct BlockingPublisher<Message> {
    /// True if there are not currently enouogh subscribers in this publisher
    insufficient_subscribers: bool,

    /// The number of required subscribers
    required_subscribers: usize,

    /// The publisher where messages will be relayed
    publisher: Publisher<Message>,

    /// Futures to be notified when there are enough subscribers for this publisher
    notify_futures: Vec<oneshot::Sender<()>>
}

impl<Message: Clone> BlockingPublisher<Message> {
    ///
    /// Creates a new blocking publisher
    /// 
    /// This publisher will refuse to receive any items until at least required_subscribers are connected.
    /// The buffer size indicates the number of queued items permitted per buffer.
    /// 
    pub fn new(required_subscribers: usize, buffer_size: usize) -> BlockingPublisher<Message> {
        BlockingPublisher {
            insufficient_subscribers:   required_subscribers != 0,
            required_subscribers:       required_subscribers,
            publisher:                  Publisher::new(buffer_size),
            notify_futures:             vec![]
        }
    }

    ///
    /// Returns a future that will complete when this publisher has enough subscribers
    /// 
    /// This is useful as a way to avoid blocking with `wait_send` when setting up the publisher
    /// 
    pub fn when_fully_subscribed(&mut self) -> impl Future<Output=Result<(), oneshot::Canceled>>+Send {
        let receiver =  if self.insufficient_subscribers {
            // Return a future that will be notified when we have enough subscribers
            let (sender, receiver) = oneshot::channel();

            // Notify when there are enough subscribers
            self.notify_futures.push(sender);

            Some(receiver)
        } else {
            None
        };

        async {
            if let Some(receiver) = receiver {
                receiver.await
            } else {
                Ok(())
            }
        }
    }
}

impl<Message: 'static+Send+Clone> MessagePublisher for BlockingPublisher<Message> {
    type Message = Message;

    fn subscribe(&mut self) -> Subscriber<Message> {
        // Create the subscription
        let subscription = self.publisher.subscribe();

        // Wake the sink if we get enough subscribers
        if self.insufficient_subscribers && self.publisher.count_subscribers() >= self.required_subscribers {
            // We now have enough subscribers
            self.insufficient_subscribers = false;
            
            // Notify any futures that are waiting on this publisher
            self.notify_futures.drain(..)
                .for_each(|sender| { sender.send(()).ok(); });
        }

        // Result is our new subscription
        subscription
    }

    ///
    /// Reserves a space for a message with the subscribers, returning when it's ready
    ///
    fn when_ready(&mut self) -> BoxFuture<'static, MessageSender<Message>> {
        // If there are not enough subscribers, wait for there to be enough subscribers
        let when_subscribed = if self.insufficient_subscribers {
            Some(self.when_fully_subscribed())
        } else {
            None
        };
        let when_ready = self.publisher.when_ready();

        // Wait for there to be enough subscribers before waiting for the publisher to become ready
        Box::pin(async move {
            if let Some(when_subscribed) = when_subscribed {
                when_subscribed.await.ok();
            }

            when_ready.await
        })
    }

    ///
    /// Waits until all subscribers have consumed all pending messages
    ///
    fn when_empty(&mut self) -> BoxFuture<'static, ()> {
        let when_subscribed = if self.insufficient_subscribers {
            Some(self.when_fully_subscribed())
        } else {
            None
        };
        let when_empty = self.publisher.when_empty();

        Box::pin(async move {
            if let Some(when_subscribed) = when_subscribed {
                when_subscribed.await.ok();
            }

            when_empty.await
        })
    }

    ///
    /// Returns true if this publisher is closed (will not publish any further messages to its subscribers)
    ///
    fn is_closed(&self) -> bool { self.publisher.is_closed() }

    ///
    /// Future that returns when this publisher is closed
    ///
    fn when_closed(&self) -> BoxFuture<'static, ()> {
        self.publisher.when_closed()
    }
}