1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::io::{BufReader, BufRead, Cursor};
use crate::IpfsApi;
use reqwest;
use serde_json;
use base64;
use failure::Error;
/// One pubsub message exactly as it is deserialized from a JSON line of the
/// HTTP response, before any decoding.
///
/// All three fields are kept as raw strings here; `pubsub_subscribe` runs
/// each of them through `base64::decode` when building a `PubSubMessage`.
#[derive(Deserialize)]
struct JsonPubSubMessage {
    /// Message payload, still in its string form.
    data: String,
    /// Sender field, still in its string form.
    from: String,
    /// Sequence-number field, still in its string form.
    seqno: String,
}
/// A pubsub message whose fields have been base64-decoded into raw bytes.
///
/// Each field is `None` when no decoded bytes are available for it (for
/// example because decoding the corresponding string failed).
///
/// The type is `Clone` and comparable so callers can store, duplicate and
/// test messages without going through the individual getters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubSubMessage {
    /// Decoded message payload.
    data: Option<Vec<u8>>,
    /// Decoded sender field.
    from: Option<Vec<u8>>,
    /// Decoded sequence-number field.
    seqno: Option<Vec<u8>>,
}
impl PubSubMessage {
    /// Returns an owned copy of the message payload bytes, or `None` when
    /// the message holds no payload bytes.
    pub fn data(&self) -> Option<Vec<u8>> {
        self.data.as_deref().map(<[u8]>::to_vec)
    }

    /// Returns an owned copy of the sender bytes, or `None` when the
    /// message holds no sender bytes.
    pub fn from(&self) -> Option<Vec<u8>> {
        self.from.as_deref().map(<[u8]>::to_vec)
    }

    /// Returns an owned copy of the sequence-number bytes, or `None` when
    /// the message holds no sequence-number bytes.
    pub fn seqno(&self) -> Option<Vec<u8>> {
        self.seqno.as_deref().map(<[u8]>::to_vec)
    }
}
impl IpfsApi {
    /// Requests the messages of a pubsub channel and returns them as an
    /// iterator.
    ///
    /// Issues a GET request to `api/v0/pubsub/sub` with `arg=<channel>` and
    /// `discover=true`, reads the complete response body, and parses it line
    /// by line as JSON. Lines that cannot be read as UTF-8 or that do not
    /// deserialize into the expected message shape are skipped. Within a
    /// parsed message, any field that is not valid base64 becomes `None`.
    ///
    /// # Errors
    ///
    /// Returns an error if the base URL cannot be obtained, the request or
    /// body download fails, or the server answers with a non-success HTTP
    /// status.
    pub async fn pubsub_subscribe(&self, channel: &str) -> Result<impl Iterator<Item=PubSubMessage>, Error> {
        let mut url = self.get_url()?;
        url.set_path("api/v0/pubsub/sub");
        url.query_pairs_mut()
            .append_pair("arg", channel)
            .append_pair("discover", "true");

        // NOTE(review): the whole body is buffered before any message is
        // yielded; if the endpoint keeps the connection open, this await
        // would not complete until it closes — confirm against the daemon.
        //
        // `error_for_status` turns 4xx/5xx replies into an `Err` instead of
        // letting an error body be parsed into an empty message list.
        let body = reqwest::get(url).await?.error_for_status()?.bytes().await?;

        let messages = BufReader::new(Cursor::new(body))
            .lines()
            .filter_map(|line| {
                // Unreadable or unparsable lines are dropped, not reported.
                let line = line.ok()?;
                let raw: JsonPubSubMessage = serde_json::from_str(&line).ok()?;
                Some(PubSubMessage {
                    from: base64::decode(&raw.from).ok(),
                    seqno: base64::decode(&raw.seqno).ok(),
                    data: base64::decode(&raw.data).ok(),
                })
            });
        Ok(messages)
    }

    /// Publishes `data` to a pubsub channel.
    ///
    /// Issues a GET request to `api/v0/pubsub/pub` with two `arg` query
    /// parameters: first the channel, then the data. The response body is
    /// not inspected.
    ///
    /// # Errors
    ///
    /// Returns an error if the base URL cannot be obtained, the request
    /// fails, or the server answers with a non-success HTTP status.
    pub async fn pubsub_publish(&self, channel: &str, data: &str) -> Result<(), Error> {
        let mut url = self.get_url()?;
        url.set_path("api/v0/pubsub/pub");
        url.query_pairs_mut()
            .append_pair("arg", channel)
            .append_pair("arg", data);

        // A transport-level success with a 4xx/5xx status must not be
        // reported to the caller as a successful publish.
        reqwest::get(url).await?.error_for_status()?;
        Ok(())
    }
}