summer_ipfs_client/
pubsub.rs1use std::io::{BufReader, BufRead};
2
3use crate::IpfsApi;
4
5use reqwest;
6use serde_json;
7use base64;
8use failure::Error;
9
10#[derive(Deserialize)]
11struct JsonPubSubMessage {
12 data: String,
13 from: String,
14 seqno: String
15}
16
/// A decoded message received from a pubsub subscription.
///
/// Every field holds raw bytes. A field is `None` when the corresponding
/// value could not be decoded while the message was being built.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubSubMessage {
    data: Option<Vec<u8>>,
    from: Option<Vec<u8>>,
    seqno: Option<Vec<u8>>,
}

impl PubSubMessage {
    /// Returns a copy of the message payload, if one is present.
    pub fn data(&self) -> Option<Vec<u8>> {
        self.data.clone()
    }

    /// Returns a copy of the sender identifier bytes, if present.
    pub fn from(&self) -> Option<Vec<u8>> {
        self.from.clone()
    }

    /// Returns a copy of the sequence number bytes, if present.
    pub fn seqno(&self) -> Option<Vec<u8>> {
        self.seqno.clone()
    }
}
37
38impl IpfsApi {
39 pub fn pubsub_subscribe(&self, channel: &str) -> Result<impl Iterator<Item=PubSubMessage>, Error> {
40 let mut url = self.get_url()?;
41 url.set_path("api/v0/pubsub/sub");
42 url.query_pairs_mut()
43 .append_pair("arg", channel)
44 .append_pair("discover", "true");
45 let resp = reqwest::get(url)?;
46
47 let messages = BufReader::new(resp).lines()
48 .filter(|x|x.is_ok())
49 .map(|x|x.unwrap())
50 .map(|x|serde_json::from_str::<JsonPubSubMessage>(&x))
51 .filter(|x|x.is_ok())
52 .map(|x|x.unwrap())
53 .map(|x| {
54 PubSubMessage {
55 from: base64::decode(&x.from).ok(),
56 seqno: base64::decode(&x.seqno).ok(),
57 data: base64::decode(&x.data).ok()
58 }
59 });
60
61 Ok(messages)
62 }
63
64 pub fn pubsub_publish(&self, channel: &str, data: &str) -> Result<(), Error> {
65 let mut url = self.get_url()?;
66 url.set_path("api/v0/pubsub/pub");
67 url.query_pairs_mut()
68 .append_pair("arg", channel)
69 .append_pair("arg", data);
70 let _resp = reqwest::get(url)?;
71 Ok(())
72 }
73}