// fisco_bcos_service/channel.rs

1use uuid::Uuid;
2use thiserror::Error;
3use std::convert::TryInto;
4use serde_json::{Value as JSONValue, json};
5
6use crate::config::Config;
7use crate::tassl::{TASSL, TASSLError};
8
9
10#[derive(Error, Debug)]
11pub enum ChannelError {
12    #[error("tassl error")]
13    TASSLError(#[from] TASSLError),
14
15    #[error("std::str::Utf8Error")]
16    StrFromUtf8Error(#[from] std::str::Utf8Error),
17
18    #[error("serde_json::Error")]
19    SerdeJsonError(#[from] serde_json::Error),
20
21    #[error("std::array::TryFromSliceError")]
22    TryFromSliceError(#[from] std::array::TryFromSliceError),
23}
24
/// Kinds of channel message handled by this module.
///
/// The numeric code that represents each kind inside a message is returned by
/// `MessageType::value`.
///
/// The type is `Copy` and comparable so that a value can still be used after
/// it has been passed by value (for example to `pack_channel_message`).
///
/// For details of the message format, see:
/// <https://fisco-bcos-documentation.readthedocs.io/zh_CN/latest/docs/design/protocol_description.html#channelmessage-v2>
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
    RpcRequest,
    ClientRegisterEventLog,
    AMOPClientTopics,
    BlockNotify,
    EventLogPush,
}
35
36impl MessageType {
37    fn value(&self) -> i16 {
38        match *self {
39            MessageType::RpcRequest => 0x12_i16,
40            MessageType::ClientRegisterEventLog => 0x15_i16,
41            MessageType::AMOPClientTopics => 0x32_i16,
42            MessageType::BlockNotify => 0x1001_i16,
43            MessageType::EventLogPush => 0x1002_i16,
44        }
45    }
46}
47
48pub fn pack_channel_message(data: &Vec<u8>, message_type: MessageType) -> Vec<u8> {
49    let mut buffer:Vec<u8> = Vec::new();
50    buffer.append(&mut Vec::from(((data.len() + 42) as u32).to_be_bytes()));
51    buffer.append(&mut Vec::from(message_type.value().to_be_bytes()));
52    buffer.append(&mut Uuid::new_v4().to_string().replace("-", "").into_bytes());
53    buffer.append(&mut Vec::from(0_i32.to_be_bytes()));
54    buffer.append(&mut data.clone());
55    buffer
56}
57
/// Prefixes `data` with a topic header and returns the combined bytes.
///
/// The result is one length byte, then `topic`, then `data`. The length byte
/// holds `topic.len() + 1`, i.e. it counts itself together with the topic.
///
/// Both arguments are taken as slices, so `&Vec<u8>` arguments keep working.
///
/// # Panics
///
/// Panics if `topic` is longer than 254 bytes, because the header length
/// would not fit into its single byte. (Previously such a topic silently
/// produced a wrong length byte in release builds.)
pub fn pack_amop_message(topic: &[u8], data: &[u8]) -> Vec<u8> {
    let length_byte: u8 = (topic.len() + 1)
        .try_into()
        .expect("AMOP topic must be at most 254 bytes long");

    let mut buffer = Vec::with_capacity(1 + topic.len() + data.len());
    buffer.push(length_byte);
    buffer.extend_from_slice(topic);
    buffer.extend_from_slice(data);
    buffer
}
65
66pub fn open_tassl(config: &Config) -> Result<TASSL, TASSLError> {
67    let tassl = TASSL::new(config.timeout_seconds);
68    tassl.init();
69    tassl.load_auth_files(
70        &config.authentication.ca_cert,
71        &config.authentication.sign_key,
72        &config.authentication.sign_cert,
73        &config.authentication.enc_key,
74        &config.authentication.enc_cert,
75    )?;
76    tassl.connect(&config.node.host, config.node.port)?;
77    Ok(tassl)
78}
79
80pub fn channel_read(tassl: &TASSL) -> Result<JSONValue, ChannelError> {
81    let mut buffer:Vec<u8> = vec![0; 4];
82    tassl.read(&mut buffer[0..])?;
83    let buffer_size = u32::from_be_bytes(buffer.clone().as_slice().try_into()?) as usize;
84    buffer.append(&mut vec![0; buffer_size - 4]);
85    tassl.read(&mut buffer[4..])?;
86    let message_type = i16::from_be_bytes((&buffer[4..6]).try_into()?);
87    let data = Vec::from(&buffer[42..buffer_size]);
88    if message_type == MessageType::BlockNotify.value() {
89        Ok(parse_block_notify(data))
90    } else if message_type == MessageType::ClientRegisterEventLog.value() {
91        parse_client_register_event_log(data)
92    } else {
93        Ok(serde_json::from_str(std::str::from_utf8(&data)?.trim_end_matches("\n"))?)
94    }
95}
96
97fn parse_block_notify(buffer: Vec<u8>) -> JSONValue {
98    let topic_len = u8::from_be_bytes(buffer[0].to_be_bytes()) as usize;
99    let values: Vec<String> = std::str::from_utf8(&buffer[topic_len..])
100        .unwrap_or("")
101        .to_string()
102        .split(",")
103        .into_iter()
104        .map(|v| v.to_string())
105        .collect();
106    json!({
107        "group_id": String::from(&values[0]).parse::<i32>().unwrap_or(-1),
108        "block_height": String::from(&values[1]).parse::<i32>().unwrap_or(-1),
109    })
110}
111
112fn parse_client_register_event_log(buffer: Vec<u8>) -> Result<JSONValue, ChannelError> {
113    let topic_len = u8::from_be_bytes(buffer[0].to_be_bytes()) as usize;
114    Ok(serde_json::from_str(std::str::from_utf8(&buffer[topic_len..])?.trim_end_matches("\n"))?)
115}