fisco_bcos_service/
channel.rs1use uuid::Uuid;
2use thiserror::Error;
3use std::convert::TryInto;
4use serde_json::{Value as JSONValue, json};
5
6use crate::config::Config;
7use crate::tassl::{TASSL, TASSLError};
8
9
10#[derive(Error, Debug)]
11pub enum ChannelError {
12 #[error("tassl error")]
13 TASSLError(#[from] TASSLError),
14
15 #[error("std::str::Utf8Error")]
16 StrFromUtf8Error(#[from] std::str::Utf8Error),
17
18 #[error("serde_json::Error")]
19 SerdeJsonError(#[from] serde_json::Error),
20
21 #[error("std::array::TryFromSliceError")]
22 TryFromSliceError(#[from] std::array::TryFromSliceError),
23}
24
/// Kinds of channel messages this module knows how to tag or recognise.
#[derive(Debug)]
pub enum MessageType {
    /// Wire code `0x12`.
    RpcRequest,
    /// Wire code `0x15`.
    ClientRegisterEventLog,
    /// Wire code `0x32`.
    AMOPClientTopics,
    /// Wire code `0x1001`.
    BlockNotify,
    /// Wire code `0x1002`.
    EventLogPush,
}

impl MessageType {
    /// Returns the 16-bit code written into (and compared against) the
    /// message-type field of a channel frame.
    fn value(&self) -> i16 {
        match self {
            Self::RpcRequest => 0x12,
            Self::ClientRegisterEventLog => 0x15,
            Self::AMOPClientTopics => 0x32,
            Self::BlockNotify => 0x1001,
            Self::EventLogPush => 0x1002,
        }
    }
}
47
48pub fn pack_channel_message(data: &Vec<u8>, message_type: MessageType) -> Vec<u8> {
49 let mut buffer:Vec<u8> = Vec::new();
50 buffer.append(&mut Vec::from(((data.len() + 42) as u32).to_be_bytes()));
51 buffer.append(&mut Vec::from(message_type.value().to_be_bytes()));
52 buffer.append(&mut Uuid::new_v4().to_string().replace("-", "").into_bytes());
53 buffer.append(&mut Vec::from(0_i32.to_be_bytes()));
54 buffer.append(&mut data.clone());
55 buffer
56}
57
/// Builds an AMOP payload: one length byte, then `topic`, then `data`.
///
/// The length byte holds `topic.len() + 1`, i.e. it counts itself together
/// with the topic bytes.
///
/// # Panics
///
/// Panics if `topic` is longer than 254 bytes, because `topic.len() + 1`
/// would no longer fit in the single length byte. Previously such topics were
/// silently truncated to a wrong length byte, producing a corrupt message.
pub fn pack_amop_message(topic: &Vec<u8>, data: &Vec<u8>) -> Vec<u8> {
    // Checked conversion: the addition is done in `usize` first so neither the
    // cast nor the `+ 1` can wrap around.
    let length_byte: u8 = (topic.len() + 1)
        .try_into()
        .expect("AMOP topic must be at most 254 bytes long");

    let mut buffer: Vec<u8> = Vec::with_capacity(1 + topic.len() + data.len());
    buffer.push(length_byte);
    buffer.extend_from_slice(topic);
    buffer.extend_from_slice(data);
    buffer
}
65
66pub fn open_tassl(config: &Config) -> Result<TASSL, TASSLError> {
67 let tassl = TASSL::new(config.timeout_seconds);
68 tassl.init();
69 tassl.load_auth_files(
70 &config.authentication.ca_cert,
71 &config.authentication.sign_key,
72 &config.authentication.sign_cert,
73 &config.authentication.enc_key,
74 &config.authentication.enc_cert,
75 )?;
76 tassl.connect(&config.node.host, config.node.port)?;
77 Ok(tassl)
78}
79
80pub fn channel_read(tassl: &TASSL) -> Result<JSONValue, ChannelError> {
81 let mut buffer:Vec<u8> = vec![0; 4];
82 tassl.read(&mut buffer[0..])?;
83 let buffer_size = u32::from_be_bytes(buffer.clone().as_slice().try_into()?) as usize;
84 buffer.append(&mut vec![0; buffer_size - 4]);
85 tassl.read(&mut buffer[4..])?;
86 let message_type = i16::from_be_bytes((&buffer[4..6]).try_into()?);
87 let data = Vec::from(&buffer[42..buffer_size]);
88 if message_type == MessageType::BlockNotify.value() {
89 Ok(parse_block_notify(data))
90 } else if message_type == MessageType::ClientRegisterEventLog.value() {
91 parse_client_register_event_log(data)
92 } else {
93 Ok(serde_json::from_str(std::str::from_utf8(&data)?.trim_end_matches("\n"))?)
94 }
95}
96
97fn parse_block_notify(buffer: Vec<u8>) -> JSONValue {
98 let topic_len = u8::from_be_bytes(buffer[0].to_be_bytes()) as usize;
99 let values: Vec<String> = std::str::from_utf8(&buffer[topic_len..])
100 .unwrap_or("")
101 .to_string()
102 .split(",")
103 .into_iter()
104 .map(|v| v.to_string())
105 .collect();
106 json!({
107 "group_id": String::from(&values[0]).parse::<i32>().unwrap_or(-1),
108 "block_height": String::from(&values[1]).parse::<i32>().unwrap_or(-1),
109 })
110}
111
112fn parse_client_register_event_log(buffer: Vec<u8>) -> Result<JSONValue, ChannelError> {
113 let topic_len = u8::from_be_bytes(buffer[0].to_be_bytes()) as usize;
114 Ok(serde_json::from_str(std::str::from_utf8(&buffer[topic_len..])?.trim_end_matches("\n"))?)
115}