summer_ipfs_client/
pubsub.rs

1use std::io::{BufReader, BufRead};
2
3use crate::IpfsApi;
4
5use reqwest;
6use serde_json;
7use base64;
8use failure::Error;
9
10#[derive(Deserialize)]
11struct JsonPubSubMessage {
12    data: String,
13    from: String,
14    seqno: String
15}
16
17#[derive(Debug)]
18pub struct PubSubMessage {
19    data: Option<Vec<u8>>,
20    from: Option<Vec<u8>>,
21    seqno: Option<Vec<u8>>
22}
23
24impl PubSubMessage {
25    pub fn data(&self) -> Option<Vec<u8>> {
26        self.data.clone()
27    }
28
29    pub fn from(&self) -> Option<Vec<u8>> {
30        self.from.clone()
31    }
32
33    pub fn seqno(&self) -> Option<Vec<u8>> {
34        self.seqno.clone()
35    }
36}
37
38impl IpfsApi {
39    pub fn pubsub_subscribe(&self, channel: &str) -> Result<impl Iterator<Item=PubSubMessage>, Error> {
40        let mut url = self.get_url()?;
41        url.set_path("api/v0/pubsub/sub");
42        url.query_pairs_mut()
43            .append_pair("arg", channel)
44            .append_pair("discover", "true");
45        let resp = reqwest::get(url)?;
46
47        let messages = BufReader::new(resp).lines()
48            .filter(|x|x.is_ok())
49            .map(|x|x.unwrap())
50            .map(|x|serde_json::from_str::<JsonPubSubMessage>(&x))
51            .filter(|x|x.is_ok())
52            .map(|x|x.unwrap())
53            .map(|x| {
54                PubSubMessage {
55                    from: base64::decode(&x.from).ok(),
56                    seqno: base64::decode(&x.seqno).ok(),
57                    data: base64::decode(&x.data).ok()
58                }
59            });
60
61        Ok(messages)
62    }
63
64    pub fn pubsub_publish(&self, channel: &str, data: &str) -> Result<(), Error> {
65        let mut url = self.get_url()?;
66        url.set_path("api/v0/pubsub/pub");
67        url.query_pairs_mut()
68            .append_pair("arg", channel)
69            .append_pair("arg", data);
70        let _resp = reqwest::get(url)?;
71        Ok(())
72    }
73}