1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use futures::sync::mpsc;
use tokio::prelude::*;

use super::{CancelReason, Streamer, Consumer, ConsumerEvent, Conduit};

/// Receiving half of an unbounded channel carrying [`ProducerEvent`] values.
pub type ProducerEventRx<T> = mpsc::UnboundedReceiver<ProducerEvent<T>>;
/// Sending half of an unbounded channel carrying [`ProducerEvent`] values.
pub type ProducerEventTx<T> = mpsc::UnboundedSender<ProducerEvent<T>>;

/// Receiving half of an unbounded channel carrying [`ProducerMessage`] values.
pub type ProducerMessageRx = mpsc::UnboundedReceiver<ProducerMessage>;
/// Sending half of an unbounded channel carrying [`ProducerMessage`] values.
pub type ProducerMessageTx = mpsc::UnboundedSender<ProducerMessage>;


/// Control message addressed to a producer.
///
/// NOTE(review): nothing in this part of the file constructs or matches on
/// this enum; the variants presumably mirror `Producer::request` and the
/// `cancel` call made in `pipe_into` — confirm against the code that uses
/// `ProducerMessageTx` / `ProducerMessageRx`.
#[derive(Debug)]
pub enum ProducerMessage {
    /// Carries a `usize`; presumably the number of items being requested.
    Request(usize),
    /// Carries the [`CancelReason`] attached to a cancellation.
    Cancel(CancelReason),
}

/// Event delivered on a producer's event stream.
///
/// `Clone`, `PartialEq` and `Eq` are derived alongside `Debug` so events can
/// be duplicated and compared (for example in assertions); each impl is only
/// available when `T` itself implements the corresponding trait.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProducerEvent<T> {
    /// An item of type `T`; `pipe_into` forwards the payload to `consumer.write`.
    Data(T),
    /// End-of-stream marker; `pipe_into` reacts to it by calling `consumer.end()`.
    End,
}

/// A source of `T` items that can be connected to a [`Consumer`].
///
/// Implementors provide `request`, `event_stream` and `set_event_stream`;
/// `pipe_into`, `pipe_through` and `events` are supplied on top of those.
pub trait Producer<T>: Streamer
where
    T: Send + 'static,
{
    /// Requests `num_items` items from the producer.
    ///
    /// `pipe_into` calls this for every `ConsumerEvent::Request` it receives.
    fn request(&mut self, num_items: usize);

    /// Returns the producer's event receiver, or `None` if there is none to
    /// hand out. `events` panics when this returns `None`.
    fn event_stream(&mut self) -> Option<ProducerEventRx<T>>;

    /// Gives the producer an event receiver.
    ///
    /// NOTE(review): presumably this is what a later `event_stream` call
    /// returns — implementor-defined, confirm against the implementations.
    fn set_event_stream(&mut self, event_stream: ProducerEventRx<T>);

    /// Consumes both values and connects them by delegating to the free
    /// function `pipe_into`.
    fn pipe_into<C>(self, consumer: C)
    where
        Self: Sized + Send + 'static,
        C: Consumer<T> + Sized + Send + 'static,
        T: Send + 'static,
    {
        pipe_into(self, consumer)
    }

    /// Splits `conduit` into its consumer and producer halves, pipes this
    /// producer into the consumer half and returns the producer half.
    fn pipe_through<C, U>(self, conduit: C) -> C::ConcreteProducer
    where
        Self: Sized + Send + 'static,
        C: Conduit<T, U> + Sized + Send + 'static,
        T: Send + 'static,
        U: Send + 'static,
        C::ConcreteConsumer: Send,
    {
        let (sink, source) = conduit.split();
        pipe_into(self, sink);
        source
    }

    /// Wraps the producer's event receiver in a [`ProducerEventEmitter`].
    ///
    /// # Panics
    ///
    /// Panics with `"event stream"` when `event_stream` returns `None`.
    fn events(&mut self) -> ProducerEventEmitter<T> {
        ProducerEventEmitter::new(self.event_stream().expect("event stream"))
    }
}

/// One-shot handle over a producer's event receiver.
///
/// The receiver sits in an `Option` so that `for_each` can move it out
/// through `&mut self`; once that has happened the slot is `None`.
pub struct ProducerEventEmitter<T: Send + 'static> {
    event_rx: Option<ProducerEventRx<T>>,
}

impl<T: Send + 'static> ProducerEventEmitter<T> {
    /// Wraps `event_rx` so its events can later be drained with `for_each`.
    pub fn new(event_rx: ProducerEventRx<T>) -> Self {
        let event_rx = Some(event_rx);
        Self { event_rx }
    }

    /// Moves the receiver out of this emitter and hands a future to
    /// `tokio::spawn` that invokes `callback` once per received event.
    ///
    /// # Panics
    ///
    /// Panics with `"take event_rx"` if the receiver has already been taken,
    /// i.e. when this method is called a second time on the same emitter.
    pub fn for_each<C: FnMut(ProducerEvent<T>) + Send + 'static>(&mut self, mut callback: C) {
        let events = self.event_rx.take().expect("take event_rx");

        // The closure always yields `Ok(())`, so the callback never aborts the drain.
        let drain = events.for_each(move |event| {
            callback(event);
            Ok(())
        });

        tokio::spawn(drain);
    }
}

/// Connects `producer` to `consumer` in both directions.
///
/// * Producer events flow to the consumer: `ProducerEvent::Data` becomes
///   `consumer.write(..)` and `ProducerEvent::End` becomes `consumer.end()`.
/// * Consumer events flow back to the producer: `ConsumerEvent::Request`
///   becomes `producer.request(..)` and `ConsumerEvent::Cancellation`
///   becomes `producer.cancel(..)`.
///
/// Both directions are handed to `tokio::spawn`, so this function returns
/// without waiting for either stream to finish.
/// NOTE(review): `tokio::spawn` presumably needs an active executor at the
/// call site — confirm for the tokio version in use.
///
/// An error on the consumer event stream is reported on stderr; it is not
/// propagated to the caller.
///
/// # Panics
///
/// * with `"no event stream"` if `consumer.event_stream()` returns `None`;
/// * with whatever `producer.events()` panics with when the producer has no
///   event stream to hand out.
pub fn pipe_into<T, P, C>(mut producer: P, mut consumer: C)
    where T: Send + 'static,
          P: Producer<T> + Send + 'static,
          C: Consumer<T> + Send + 'static
{
    // Must be taken here: `consumer` is moved into the forwarding closure below.
    let consumer_events = consumer.event_stream().expect("no event stream");

    // Downstream direction: producer events drive the consumer.
    producer.events().for_each(move |event| {
        match event {
            ProducerEvent::Data(data) => {
                consumer.write(data);
            },
            ProducerEvent::End => {
                consumer.end();
            },
        }
    });

    // Upstream direction: consumer demand and cancellation drive the producer.
    tokio::spawn(
        consumer_events
            .for_each(move |event| {
                match event {
                    ConsumerEvent::Request(num_items) => {
                        producer.request(num_items);
                    },
                    ConsumerEvent::Cancellation(reason) => {
                        producer.cancel(reason);
                    },
                }

                Ok(())
            })
            .map_err(|e| {
                // Diagnostics go to stderr so they do not mix with program output.
                eprintln!("error {:?}", e);
            }),
    );
}