1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//! Redis链路驱动层

extern crate redis;

use log::{trace, warn};
use redis::Commands;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use unmp::link::{Driver, Link, Recver};
use unmp::net;

/// Redis链路驱动接口
struct Client {
    tx: Mutex<redis::Connection>,
    topic: String,
}
impl Driver for Client {
    fn send(&self, buf: &[u8]) {
        trace!("redis send: {:02X?}.", buf);
        let mut tx = self.tx.lock().unwrap();
        let _: u8 = tx.publish(&self.topic, buf).expect("redis send error");
    }
}

/// 创建一个UDP链路实例,并连接到指定地址端口
pub fn start(url: &str, topic: &str) -> Link {
    let client = redis::Client::open(url).unwrap();
    let tx = client.get_connection().unwrap();
    tx.set_write_timeout(Some(Duration::new(0, 500_000)))
        .expect("redis set write timeout");
    let tx = Mutex::new(tx);
    let driver = Arc::new(Client {
        tx: tx,
        topic: String::from(topic),
    });
    let link = Link::new(format!("Redis({}:{})", url, topic), driver.clone());

    //设置数据接收接口
    let link_tmp = link.clone();
    let topic_tmp = String::from(topic);
    thread::spawn(move || {
        let mut recver: Recver = Recver::new();
        let mut rx = client.get_connection().unwrap();
        let mut pubsub = rx.as_pubsub();
        pubsub.subscribe(topic_tmp).unwrap();
        loop {
            match pubsub.get_message() {
                Ok(msg) => {
                    let mut buf = msg.get_payload_bytes();
                    trace!("redis recv: {:02X?}.", buf);

                    // 接收数据帧
                    while let Some(load) = {
                        let load = recver.recv(buf);
                        buf = &[];
                        load
                    } {
                        net::recv_handle(link_tmp.clone(), &load);
                    }
                }
                Err(e) => {
                    warn!("redis recv error: {:?}", e.kind());
                }
            }
        }
    });

    return link;
}