1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
//! Redis链路驱动层
#![feature(const_btree_new)]

use log::*;
use redis::{Commands, IntoConnectionInfo};
use spin::{Mutex, RwLock};
use std::collections::BTreeMap;
use std::thread;
use std::time::Duration;
use unmp::link::{ErrorKind, Link};
use unmp::net;

static LINKS: Mutex<BTreeMap<String, Link>> = Mutex::new(BTreeMap::new());
static DRIVERS: RwLock<BTreeMap<Link, Driver>> = RwLock::new(BTreeMap::new());

struct Driver {
    tx: Mutex<redis::Connection>,
    topic: String,
}
fn sendto(link: Link, buf: &[u8]) -> Result<(), ErrorKind> {
    trace!("redis send: {:02X?}.", buf);
    if let Some(driver) = DRIVERS.read().get(&link) {
        match driver.tx.lock().publish::<_, _, u8>(&driver.topic, buf) {
            Ok(_) => {
                return Ok(());
            }
            Err(_) => {
                warn!("redis send error.");
                return Err(ErrorKind::TimedOut);
            }
        };
    } else {
        return Err(ErrorKind::NotConnected);
    }
}

/// 创建一个UDP链路实例,并连接到指定地址端口
pub fn start(url: &str, topic: &str) -> Link {
    let url = url.into_connection_info();
    if let Err(_) = url {
        return Link::INVALID;
    }
    let url = url.unwrap();
    let identifier = format!("{}/{}", url.addr, topic);
    if let Some(link) = LINKS.lock().get(&identifier) {
        return *link;
    }
    let client = redis::Client::open(url).unwrap();
    let tx = client.get_connection().unwrap();
    tx.set_write_timeout(Some(Duration::new(0, 500_000)))
        .expect("redis set write timeout");
    let driver = Driver {
        tx: Mutex::new(tx),
        topic: String::from(topic),
    };
    let link = Link::new(sendto);
    info!("redis new {}({}).", link, identifier);
    LINKS.lock().insert(identifier.clone(), link);
    DRIVERS.write().insert(link, driver);

    //设置数据接收接口
    let topic_tmp = String::from(topic);
    thread::spawn(move || {
        let mut rx = client.get_connection().unwrap();
        let mut pubsub = rx.as_pubsub();
        pubsub.subscribe(topic_tmp).unwrap();
        loop {
            match pubsub.get_message() {
                Ok(msg) => {
                    let buf = msg.get_payload_bytes();
                    trace!("redis recv: {:02X?}.", buf);

                    // 接收数据帧
                    thread::spawn(move || {
                        let buf = msg.get_payload_bytes();
                        net::when_recv(link, &buf);
                    });
                }
                Err(e) => {
                    warn!("redis recv error: {:?}", e.kind());
                    LINKS.lock().remove(&identifier);
                    DRIVERS.write().remove(&link);
                    info!("redis rm {}.", link);
                    link.destroy();
                    return;
                }
            }
        }
    });

    return link;
}