1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use config::Config;
use endpoint;
use error::Error;
use headers;
use identity;
use osaka::{osaka, Poll};
use prost::Message;
use proto;
use std::cell::RefCell;
use std::rc::Rc;

pub struct SubscriberBuilder {
    config:       Config,
    on_publish:   Option<Box<FnMut(identity::Identity)>>,
    on_unpublish: Option<Box<FnMut(identity::Identity)>>,
}

pub fn new(config: Config) -> SubscriberBuilder {
    SubscriberBuilder {
        config,
        on_unpublish: None,
        on_publish: None,
    }
}

impl SubscriberBuilder {
    #[osaka]
    fn handler(this: Rc<RefCell<Self>>, _poll: Poll, mut stream: endpoint::Stream) {
        let m = osaka::sync!(stream);
        let headers = headers::Headers::decode(&m).unwrap();
        info!("pubres: {:?}", headers);

        loop {
            let v = match proto::SubscribeChange::decode(osaka::sync!(stream)) {
                Err(e) => {
                    warn!("{}", e);
                    return;
                }
                Ok(v) => v,
            };

            match v.m {
                Some(proto::subscribe_change::M::Publish(proto::Publish { identity, xaddr })) => {
                    if let Some(h) = &mut this.borrow_mut().on_publish {
                        match identity::Identity::from_bytes(&identity) {
                            Ok(v) => {
                                h(v);
                            }
                            Err(e) => {
                                warn!("SubscribeChange::Publish: {}", e);
                            }
                        };
                    }
                }
                Some(proto::subscribe_change::M::Unpublish(proto::Unpublish { identity })) => {
                    if let Some(h) = &mut this.borrow_mut().on_unpublish {
                        match identity::Identity::from_bytes(&identity) {
                            Ok(v) => {
                                h(v);
                            }
                            Err(e) => {
                                warn!("SubscribeChange::Publish: {}", e);
                            }
                        };
                    }
                }
                Some(proto::subscribe_change::M::Supersede(_)) => {
                    warn!("subscriber superseded");
                    return;
                }
                None => (),
            }
        }
    }

    pub fn on_unpublish<F>(mut self, f: F) -> Self
    where
        F: 'static + Fn(identity::Identity),
    {
        self.on_unpublish = Some(Box::new(f));
        self
    }

    pub fn on_publish<F>(mut self, f: F) -> Self
    where
        F: 'static + Fn(identity::Identity),
    {
        self.on_publish = Some(Box::new(f));
        self
    }

    #[osaka]
    pub fn subscribe(self, poll: Poll, shadow: identity::Address) -> Result<(), Error> {
        let mut ep = endpoint::EndpointBuilder::new(&self.config)?.connect(poll.clone());
        let mut ep = osaka::sync!(ep)?;

        let this = Rc::new(RefCell::new(self));

        let broker = ep.broker();
        ep.open(
            broker,
            headers::Headers::with_path("/carrier.broker.v1/broker/subscribe"),
            |poll, mut stream| {
                stream.small_message(proto::SubscribeRequest {
                    shadow: shadow.as_bytes().to_vec(),
                    filter: Vec::new(),
                });
                Self::handler(this.clone(), poll, stream)
            },
        );

        loop {
            match osaka::sync!(ep)? {
                endpoint::Event::Disconnect { .. } => (),
                endpoint::Event::OutgoingConnect(_) => (),
                endpoint::Event::IncommingConnect(q) => {
                    info!("ignoring incomming connect {}", q.identity);
                }
            };
        }
    }
}