1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
use config;
use endpoint;
use error::Error;
use headers;
use identity;
use osaka::osaka;


pub struct Endpoint {
    poll:       osaka::Poll,
    config:     config::Config,
    timeout:    u16,
    max_fragments:  Option<u32>,
}

pub fn connect_with_poll(config: config::Config, poll: osaka::Poll) -> Endpoint {
    Endpoint::new(poll, config)
}

pub fn connect(config: config::Config) -> Endpoint {
    Endpoint::new(osaka::Poll::new(), config)
}

impl Endpoint {
    pub fn new(poll: osaka::Poll, config: config::Config) -> Self {
        Self {
            poll,
            config,
            timeout: 5,
            max_fragments:  Some(0xfffff),
        }
    }

    pub fn set_timeout(&mut self, t: u16) {
        self.timeout = t;
    }

    pub fn timeout(mut self, t: u16) -> Self {
        self.timeout = t;
        self
    }

    pub fn set_max_fragments(&mut self, max_fragments: Option<u32>) {
        self.max_fragments = max_fragments;
    }

    pub fn max_fragments(mut self, max_fragments: Option<u32>) -> Self {
        self.max_fragments = max_fragments;
        self
    }

#[osaka]
    pub fn open<F>(
        self,
        target: identity::Identity,
        headers: headers::Headers,
        f: F,
        ) -> Result<(), Error>
        where
        F: 'static + FnOnce(osaka::Poll, endpoint::Handle, endpoint::Stream) -> osaka::Task<()>,
        {
            let mut ep = endpoint::EndpointBuilder::new(&self.config)?;
            ep.move_target(target.clone());
            let mut ep = ep.connect(self.poll.clone());
            let mut ep = osaka::sync!(ep)?;
            ep.connect(target.clone(), self.timeout)?;

            let q = loop {
                match osaka::sync!(ep)? {
                    endpoint::Event::BrokerGone => {
                        return Err(Error::OutgoingConnectFailed {
                            identity: target,
                            reason:   Some("broker lost".to_string()),
                        });
                    }
                    endpoint::Event::OutgoingConnect(q) => {
                        break q;
                    }
                    endpoint::Event::Disconnect { identity, .. } => {
                        return Err(Error::OutgoingConnectFailed {
                            identity,
                            reason: Some("disconnected".to_string()),
                        });
                    }
                    endpoint::Event::IncommingConnect(_) => (),
                }
            };

            let route = ep.accept_outgoing(q, move |_h, _s| None)?;
            let handle = ep.handle();
            ep.open(route, headers.clone(), self.max_fragments, |poll, stream| f(poll, handle, stream) )?;

            loop {
                match osaka::sync!(ep)? {
                    endpoint::Event::BrokerGone => return Ok(()),
                    endpoint::Event::OutgoingConnect(_) => (),
                    endpoint::Event::Disconnect { identity, reason, .. } => {
                        return Err(Error::OutgoingConnectFailed {
                            identity,
                            reason: Some(format!("{:?}", reason)),
                        });
                    }
                    endpoint::Event::IncommingConnect(_) => (),
                };
            }
        }


}