rix 0.3.0

Experimental Matrix client using futures
Documentation
use hyper;
use tokio_core;
use futures;
use hyper_tls;
use std;
use error;
use serde_json;

use std::str::FromStr;
use futures::{Future, Stream};
use error::Error;

#[derive(Deserialize, Debug)]
pub struct Event {
    pub content: serde_json::Value,
    #[serde(rename = "type")]
    pub event_type: String,
    pub sender: Option<String>,
    pub room: Option<String>
}

#[derive(Deserialize, Debug)]
pub struct EventContainer {
    pub events: Vec<Event>
}

#[derive(Deserialize, Debug)]
pub struct GroupSyncResult {
    pub timeline: Option<EventContainer>,
    pub ephemeral: Option<EventContainer>
}

type GroupSyncContainer = std::collections::HashMap<String, GroupSyncResult>;

#[derive(Deserialize, Debug)]
pub struct GroupsSyncResult {
    pub leave: GroupSyncContainer,
    pub join: GroupSyncContainer,
    pub invite: GroupSyncContainer
}

#[derive(Deserialize, Debug)]
pub struct SyncResult {
    pub next_batch: String,
    pub presence: EventContainer,
    pub rooms: GroupsSyncResult
}

fn group_events<'a>(container: &'a GroupSyncContainer) -> Box<std::iter::Iterator<Item=&'a Event> + 'a> {
    Box::new(container.iter().flat_map(|(_, group)| {
        vec![&group.timeline, &group.ephemeral]
            .into_iter()
            .filter_map(|x| match x {
                &Some(ref x) => Some(x),
                &None => None
            })
            .flat_map(|x| x.events.iter())
    }))
}

impl SyncResult {
    pub fn events<'a>(&'a self) -> Box<std::iter::Iterator<Item=&'a Event> + 'a> {
        Box::new(self.presence.events.iter()
            .chain(group_events(&self.rooms.leave))
            .chain(group_events(&self.rooms.join))
            .chain(group_events(&self.rooms.invite)))
    }
}

fn fill_group_result(id: &str, container: &mut EventContainer) {
    for mut evt in &mut container.events {
        evt.room = Some(id.to_owned());
    }
}

fn fill_group_container(container: &mut GroupSyncContainer) {
    for (id, room) in container {
        if let Some(ref mut timeline) = room.timeline {
            fill_group_result(id, timeline);
        }
        if let Some(ref mut ephemeral) = room.ephemeral {
            fill_group_result(id, ephemeral);
        }
    }
}

fn fill_groups_result(result: &mut GroupsSyncResult) {
    fill_group_container(&mut result.leave);
    fill_group_container(&mut result.join);
    fill_group_container(&mut result.invite);
}

pub fn sync(host: &str, access_token: &str, handle: &tokio_core::reactor::Handle, since: Option<String>) -> Box<Future<Item=SyncResult,Error=Error>> {
    let http = hyper::Client::configure()
        .connector(box_fut_try!(hyper_tls::HttpsConnector::new(1, &handle)
                                                 .map_err(|e| e.into())))
        .build(&handle);
    let params = if let Some(since) = since {
        format!("&since={}&timeout=30000", since)
    } else {
        "".to_owned()
    };
    let mut request = hyper::Request::new(hyper::Method::Get,
                                      box_fut_try!(hyper::Uri::from_str(&format!("{}/_matrix/client/r0/sync?access_token={}{}", host, access_token, params)).map_err(|e| Error::HTTP(e.into()))));
    Box::new(http.request(request)
        .and_then(|response| {
            response.body().concat2()
        })
        .map_err(|e| e.into())
        .and_then(|body| {
            String::from_utf8(body.to_vec()).map_err(|e| e.into())
        })
        .and_then(|body| {
            serde_json::from_str(&body).map_err(|e| e.into())
        })
        .and_then(|mut body: SyncResult| {
            fill_groups_result(&mut body.rooms);
            Ok(body)
        })
        )
}

pub struct SyncStream {
    host: String,
    token: String,
    handle: tokio_core::reactor::Handle,
    current_future: Box<Future<Item=SyncResult,Error=Error>>
}

impl Stream for SyncStream {
    type Item = SyncResult;
    type Error = Error;
    fn poll(&mut self) -> futures::Poll<Option<Self::Item>,Self::Error> {
        let poll_result = self.current_future.poll();
        match poll_result {
            Ok(futures::Async::Ready(item)) => {
                self.current_future = sync(&self.host, &self.token, &self.handle, Some(item.next_batch.clone()));
                Ok(futures::Async::Ready(Some(item)))
            },
            Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady),
            Err(e) => Err(e)
        }
    }
}

pub fn sync_stream(host: &str, access_token: &str, handle: &tokio_core::reactor::Handle) -> SyncStream {
    SyncStream {
        host: host.to_owned(),
        token: access_token.to_owned(),
        handle: handle.clone(),
        current_future: sync(host, access_token, handle, None)
    }
}