1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
extern crate redis;
use log::{trace, warn};
use redis::Commands;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use unmp::link::{Driver, Link, Recver};
use unmp::net;
struct Client {
tx: Mutex<redis::Connection>,
topic: String,
}
impl Driver for Client {
fn send(&self, buf: &[u8]) {
trace!("redis send: {:02X?}.", buf);
let mut tx = self.tx.lock().unwrap();
let _: u8 = tx.publish(&self.topic, buf).expect("redis send error");
}
}
pub fn start(url: &str, topic: &str) -> Link {
let client = redis::Client::open(url).unwrap();
let tx = client.get_connection().unwrap();
tx.set_write_timeout(Some(Duration::new(0, 500_000)))
.expect("redis set write timeout");
let tx = Mutex::new(tx);
let driver = Arc::new(Client {
tx: tx,
topic: String::from(topic),
});
let link = Link::new(format!("Redis({}:{})", url, topic), driver.clone());
let link_tmp = link.clone();
let topic_tmp = String::from(topic);
thread::spawn(move || {
let mut recver: Recver = Recver::new();
let mut rx = client.get_connection().unwrap();
let mut pubsub = rx.as_pubsub();
pubsub.subscribe(topic_tmp).unwrap();
loop {
match pubsub.get_message() {
Ok(msg) => {
let mut buf = msg.get_payload_bytes();
trace!("redis recv: {:02X?}.", buf);
while let Some(load) = {
let load = recver.recv(buf);
buf = &[];
load
} {
net::recv_handle(link_tmp.clone(), &load);
}
}
Err(e) => {
warn!("redis recv error: {:?}", e.kind());
}
}
}
});
return link;
}