1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
use std::io::{Error, ErrorKind, Result};
use std::ffi::CString;
use message::Message;
use std::cmp::Ordering;
use std::ptr;
use std::boxed::Box;
use std::rc::Rc;
use std::ops::Deref;
use std::slice;
use std::time::Duration;
use ffi::*;

/// An LCM instance that handles publishing and subscribing,
/// as well as encoding and decoding messages.
///
/// Wraps a raw `lcm_t` handle together with the list of subscriptions that
/// were registered through `subscribe` and have not been unsubscribed.
pub struct Lcm {
    // Raw handle returned by `lcm_create`; passed to `lcm_destroy` on drop.
    lcm: *mut lcm_t,
    // Shared handles to the registered subscriptions. Holding an `Rc` here keeps
    // each handler closure alive while the C library has a pointer to it.
    subscriptions: Vec<Rc<LcmSubscription>>,
}


/// A handle to a single subscription, as returned by `Lcm::subscribe`.
///
/// Pass it back to `Lcm::unsubscribe` or
/// `Lcm::subscription_set_queue_capacity` to manage the subscription.
pub struct LcmSubscription {
    // Raw subscription pointer returned by `lcm_subscribe`.
    subscription: *mut lcm_subscription_t,
    // Type-erased closure that `Lcm::handler_callback` runs with each receive buffer.
    handler: Box<FnMut(*const lcm_recv_buf_t)>,
}


impl Lcm {
    /// Creates a new `Lcm` instance.
    ///
    /// Calls `lcm_create` with a null pointer argument and starts with an
    /// empty subscription list.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error when `lcm_create` hands back a
    /// null handle.
    ///
    /// ```
    /// use lcm::Lcm;
    /// let mut lcm = Lcm::new().unwrap();
    /// ```
    pub fn new() -> Result<Lcm> {
        trace!("Creating LCM instance");

        let raw = unsafe { lcm_create(ptr::null()) };
        if raw.is_null() {
            return Err(Error::new(ErrorKind::Other, "Failed to initialize LCM."));
        }

        Ok(Lcm {
            lcm: raw,
            subscriptions: Vec::new(),
        })
    }

    /// Returns whatever `lcm_get_fileno` reports for the underlying handle.
    ///
    /// NOTE(review): per the LCM C API this is expected to be a file
    /// descriptor usable with `select`/`poll` to wait for incoming
    /// messages — confirm against the LCM documentation.
    pub fn get_fileno(&self) -> ::std::os::raw::c_int {
        let fileno = unsafe { lcm_get_fileno(self.lcm) };
        fileno
    }

    /// Subscribes a callback to a particular topic.
    ///
    /// Every buffer delivered for `channel` is decoded as an `M` via
    /// `Message::decode_with_hash`. On success the decoded message is passed
    /// to `callback`; on failure the received bytes are logged with `error!`
    /// and the callback is not run.
    ///
    /// The returned handle is also stored inside this `Lcm`, which keeps the
    /// callback alive until it is unsubscribed or the `Lcm` is dropped.
    ///
    /// # Panics
    ///
    /// Panics if `channel` contains an interior NUL byte, because it cannot
    /// be converted to a C string.
    ///
    /// ```
    /// # use lcm::Lcm;
    /// let mut lcm = Lcm::new().unwrap();
    /// lcm.subscribe("GREETINGS", |name: String| println!("Hello, {}!", name) );
    /// ```
    pub fn subscribe<M, F>(&mut self, channel: &str, mut callback: F) -> Rc<LcmSubscription>
        where M: Message,
              F: FnMut(M) + 'static
    {
        trace!("Subscribing handler to channel {}", channel);

        let channel = CString::new(channel)
            .expect("LCM channel names must not contain interior NUL bytes");

        let handler = Box::new(move |rbuf: *const lcm_recv_buf_t| {
            trace!("Running handler");
            let buf: &[u8] = unsafe {
                let rbuf = &*rbuf;
                let data = rbuf.data as *const u8;
                // `slice::from_raw_parts` requires a non-null pointer even for
                // a zero-length slice, so treat a null payload as empty.
                if data.is_null() {
                    &[]
                } else {
                    slice::from_raw_parts(data, rbuf.data_size as usize)
                }
            };
            trace!("Decoding buffer: {:?}", buf);

            // Decode from a copy of the slice reference so that `buf` still
            // refers to the whole payload if the failure has to be logged.
            let mut unread = buf;
            match M::decode_with_hash(&mut unread) {
                Ok(msg) => callback(msg),
                Err(_) => error!("Failed to decode buffer: {:?}", buf),
            }
        });

        let mut subscription = Rc::new(LcmSubscription {
            subscription: ptr::null_mut(),
            handler: handler,
        });

        // The C library stores this pointer and hands it back to
        // `handler_callback`; it points at the `Rc`'s heap allocation, which
        // does not move when the `Rc` itself is cloned or moved.
        let user_data = (subscription.deref() as *const _) as *mut _;

        let c_subscription = unsafe {
            lcm_subscribe(self.lcm,
                          channel.as_ptr(),
                          Some(Lcm::handler_callback::<M>),
                          user_data)
        };

        // No clone of the `Rc` exists yet, so unique access is guaranteed.
        Rc::get_mut(&mut subscription)
            .expect("subscription handle has not been shared yet")
            .subscription = c_subscription;
        self.subscriptions.push(subscription.clone());

        subscription
    }

    /// Unsubscribes a message handler.
    ///
    /// The handle must have been returned by `subscribe` on this same `Lcm`
    /// and must not have been unsubscribed already; otherwise an error is
    /// returned without calling into the C library.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error if `handler` is not currently
    /// registered with this instance, or if `lcm_unsubscribe` returns a
    /// non-zero status. In both cases the subscription list is left as it was.
    ///
    /// ```
    /// # use lcm::Lcm;
    /// # let handler_function = |name: String| println!("Hello, {}!", name);
    /// # let mut lcm = Lcm::new().unwrap();
    /// let handler = lcm.subscribe("GREETINGS", handler_function);
    /// // ...
    /// lcm.unsubscribe(handler).unwrap();
    /// ```
    pub fn unsubscribe(&mut self, handler: Rc<LcmSubscription>) -> Result<()> {
        trace!("Unsubscribing handler {:?}", handler.subscription);

        // Identify the subscription by the address of its shared allocation.
        // A handle that is not in our list (already unsubscribed through a
        // clone, or created by another `Lcm`) may carry a raw pointer the C
        // library no longer considers valid, so it must not be passed on.
        let target = &*handler as *const LcmSubscription;
        let registered = self.subscriptions
            .iter()
            .any(|sub| &**sub as *const LcmSubscription == target);
        if !registered {
            return Err(Error::new(ErrorKind::Other, "LCM: Failed to unsubscribe"));
        }

        let result = unsafe { lcm_unsubscribe(self.lcm, handler.subscription) };
        if result != 0 {
            // Keep the handler alive: after a failed call the C library may
            // still hold the pointer to it that `subscribe` registered.
            return Err(Error::new(ErrorKind::Other, "LCM: Failed to unsubscribe"));
        }

        self.subscriptions.retain(|sub| &**sub as *const LcmSubscription != target);
        Ok(())
    }

    /// Publishes a message on the specified channel.
    ///
    /// The message is serialized with `Message::encode_with_hash` and the
    /// resulting bytes are handed to `lcm_publish`.
    ///
    /// # Errors
    ///
    /// Returns an error if `channel` contains an interior NUL byte
    /// (`ErrorKind::InvalidInput`), if encoding the message fails, or if
    /// `lcm_publish` returns a non-zero status (`ErrorKind::Other`).
    ///
    /// ```
    /// # use lcm::Lcm;
    /// let mut lcm = Lcm::new().unwrap();
    /// lcm.publish("GREETINGS", &"Charles".to_string()).unwrap();
    /// ```
    pub fn publish<M>(&mut self, channel: &str, message: &M) -> Result<()>
        where M: Message
    {
        // `NulError` converts into an `io::Error`, so a malformed channel
        // name is reported to the caller instead of panicking.
        let channel = CString::new(channel)?;
        let buffer = message.encode_with_hash()?;
        let result = unsafe {
            lcm_publish(self.lcm,
                        channel.as_ptr(),
                        buffer.as_ptr() as *mut _,
                        buffer.len() as _)
        };
        match result {
            0 => Ok(()),
            _ => Err(Error::new(ErrorKind::Other, "LCM Error")),
        }
    }

    /// Waits for and dispatches the next incoming message.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error when `lcm_handle` reports a
    /// non-zero status.
    ///
    /// ```
    /// # use lcm::Lcm;
    /// # let handler_function = |name: String| println!("Hello, {}!", name);
    /// let mut lcm = Lcm::new().unwrap();
    /// lcm.subscribe("POSITION", handler_function);
    /// loop {
    /// # break;
    ///     lcm.handle().unwrap();
    /// }
    /// ```
    pub fn handle(&mut self) -> Result<()> {
        let status = unsafe { lcm_handle(self.lcm) };
        if status == 0 {
            Ok(())
        } else {
            Err(Error::new(ErrorKind::Other, "LCM Error"))
        }
    }

    /// Waits for and dispatches the next incoming message, up to a time limit.
    ///
    /// `timeout` is converted to whole milliseconds (sub-millisecond
    /// precision is dropped). Durations too long to fit in an `i32`
    /// millisecond count are clamped to `i32::MAX` milliseconds.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error with the message `"LCM Timeout"`
    /// when `lcm_handle_timeout` returns zero, and `"LCM Error"` when it
    /// returns a negative value.
    ///
    /// ```
    /// # use std::time::Duration;
    /// # use lcm::Lcm;
    /// # let handler_function = |name: String| println!("Hello, {}!", name);
    /// let mut lcm = Lcm::new().unwrap();
    /// lcm.subscribe("POSITION", handler_function);
    /// let wait_dur = Duration::from_millis(100);
    /// loop {
    /// # break;
    ///     lcm.handle_timeout(wait_dur).unwrap();
    /// }
    /// ```
    pub fn handle_timeout(&mut self, timeout: Duration) -> Result<()> {
        // Saturate rather than wrap: a wrapped value could come out negative
        // or far shorter than the timeout the caller asked for.
        let max_millis = ::std::i32::MAX as u64;
        let total_millis = timeout.as_secs()
            .saturating_mul(1000)
            .saturating_add((timeout.subsec_nanos() / 1_000_000) as u64);
        let millis = if total_millis > max_millis {
            ::std::i32::MAX
        } else {
            total_millis as i32
        };

        let result = unsafe { lcm_handle_timeout(self.lcm, millis) };
        match result.cmp(&0) {
            Ordering::Less => Err(Error::new(ErrorKind::Other, "LCM Error")),
            Ordering::Equal => Err(Error::new(ErrorKind::Other, "LCM Timeout")),
            Ordering::Greater => Ok(()),
        }
    }

    /// Adjusts the maximum number of received messages that can be queued up for a subscription.
    /// The default is `30`.
    ///
    /// `handler` is taken by value, so clone the `Rc` first if the handle is
    /// still needed afterwards. `num_messages` is cast to the integer type
    /// the C function expects, and the C function's result is not inspected.
    ///
    /// ```
    /// # use lcm::Lcm;
    /// # let handler_function = |name: String| println!("Hello, {}!", name);
    /// # let mut lcm = Lcm::new().unwrap();
    /// let handler = lcm.subscribe("POSITION", handler_function);
    /// lcm.subscription_set_queue_capacity(handler, 30);
    /// ```
    pub fn subscription_set_queue_capacity(&self, handler: Rc<LcmSubscription>, num_messages: usize) {
        unsafe {
            lcm_subscription_set_queue_capacity(handler.subscription, num_messages as _);
        }
    }



    /// C-ABI trampoline registered with `lcm_subscribe`.
    ///
    /// `user_data` is the pointer to the `LcmSubscription` that `subscribe`
    /// passed to the C library; its boxed handler is run with the receive
    /// buffer. The channel name argument is ignored.
    ///
    /// NOTE(review): this forms a `&mut LcmSubscription` to a value that is
    /// also reachable through shared `Rc` handles — worth confirming that no
    /// other reference is in use while a handler runs.
    extern "C" fn handler_callback<M>(rbuf: *const lcm_recv_buf_t,
                                      _channel: *const ::std::os::raw::c_char,
                                      user_data: *mut ::std::os::raw::c_void)
        where M: Message
    {
        trace!("Received data");
        let subscription = unsafe { &mut *(user_data as *mut LcmSubscription) };
        (subscription.handler)(rbuf);
    }
}

/// Destroys the underlying C handle when the `Lcm` goes out of scope.
impl Drop for Lcm {
    fn drop(&mut self) {
        trace!("Destroying Lcm instance");
        // Fields are dropped only after this body returns, so the stored
        // subscriptions (and their closures) outlive the `lcm_destroy` call.
        unsafe {
            lcm_destroy(self.lcm);
        }
    }
}



//
// Tests
//

// Constructing an `Lcm` instance must succeed.
#[test]
fn initialized() {
    assert!(Lcm::new().is_ok());
}

// Subscribing records exactly one subscription on the instance.
#[test]
fn test_subscribe() {
    let mut lcm = Lcm::new().unwrap();
    let _subscription = lcm.subscribe("channel", |_: String| {});
    assert_eq!(1, lcm.subscriptions.len());
}

// Unsubscribing the only subscription leaves the instance with none.
#[test]
fn test_unsubscribe() {
    let mut lcm = Lcm::new().unwrap();
    let subscription = lcm.subscribe("channel", |_: String| {});
    let outcome = lcm.unsubscribe(subscription);
    outcome.unwrap();
    assert!(lcm.subscriptions.is_empty());
}
}