1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use miniz_oxide::inflate;
use serde::Deserialize;
use chrono::prelude::*;
use elite_journal::entry::{Entry, Event};
pub const URL: &'static str = "tcp://eddn.edcd.io:9500";
#[derive(Debug, Deserialize)]
pub struct Envelope {
#[serde(rename = "$schemaRef")]
pub schema_ref: String,
pub header: Header,
pub message: Message,
}
#[derive(Debug, Deserialize)]
pub struct Header {
#[serde(rename = "gatewayTimestamp")]
pub gateway_timestamp: DateTime<Utc>,
#[serde(rename = "softwareName")]
pub software_name: String,
#[serde(rename = "softwareVersion")]
pub software_version: String,
#[serde(rename = "uploaderID")]
pub uploader_id: String,
}
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum Message {
Journal(Entry<Event>),
Other(serde_json::Value),
}
pub fn subscribe(url: &str) -> EnvelopeIterator {
println!("Starting EDDN ZeroMQ consumer...");
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::SUB)
.expect("failed to open socket");
socket.connect(url)
.expect("connection failed");
socket
.set_subscribe(&vec![])
.expect("failed to subscribe");
println!("Connected and subscribed.");
EnvelopeIterator { socket }
}
pub struct EnvelopeIterator {
socket: zmq::Socket,
}
impl Iterator for EnvelopeIterator {
type Item = Result<Envelope, serde_json::Error>;
fn next(&mut self) -> Option<Self::Item> {
let compressed = self.socket.recv_bytes(0)
.expect("failed to receive bytes");
let json = inflate::decompress_to_vec_zlib(&compressed)
.expect("failed to decompress");
Some(serde_json::from_slice(&json))
}
}
const SCHEMA_JOURNAL : &str = "https://eddn.edcd.io/schemas/journal/1";