1use crate::{
2 binary_message::BinaryMessage,
3 constant::{PAYLOAD_TYPE_KEY_EXCHANGE, PAYLOAD_TYPE_PAYLOAD},
4 crypto_utils::{
5 self, aes_decrypt, aes_encrypt, derive_shared_key, verify_signature, IdentityToken,
6 },
7 log,
8};
9use base64ct::{Base64, Base64UrlUnpadded, Encoding};
10use serde_json::{json, Value};
11use std::{collections::VecDeque, vec};
12
13pub struct Rpc {
14 method_id: u64,
15 method_name: String,
16 metadata: IdentityToken,
17 token: String,
18 response_count: u32,
19 client_end_pending: bool,
20 client_ended: bool,
21 server_ended: bool,
22 target_did: Vec<String>,
23 target_public_key: String,
24 service_id: String,
25 call_started: bool,
26 key_exchange_pending: bool,
27 encryption_key: Option<[u8; 32]>,
28 pending_payloads: VecDeque<String>,
29}
30
31impl Rpc {
32 pub fn new(
33 id: &u64,
34 method_name: &str,
35 target_did: Vec<String>,
36 metadata: IdentityToken,
37 target_public_key: String,
38 service_id: String,
39 ) -> Rpc {
40 Rpc {
41 method_id: id.clone(),
42 method_name: method_name.to_string(),
43 token: metadata.token.clone(),
44 metadata: metadata,
45 response_count: 0,
46 client_end_pending: false,
47 client_ended: false,
48 server_ended: false,
49 target_did: target_did,
50 target_public_key: target_public_key,
51 service_id: service_id,
52 call_started: false,
53 key_exchange_pending: false,
54 encryption_key: None,
55 pending_payloads: VecDeque::new(),
56 }
57 }
58
59 pub fn encode_payload(
60 &self,
61 payload_type: &u8,
62 payload_json: &str,
63 meta_json: Option<&str>,
64 ) -> Vec<u8> {
65 let iv_generate = crypto_utils::random_bytes(16);
67 let iv: [u8; 16] = iv_generate.try_into().unwrap();
68
69 let encrypted_payload = aes_encrypt(&payload_json, &self.encryption_key.unwrap(), &iv);
71
72 let bin_message = BinaryMessage::new_payload(
74 payload_type,
75 self.method_id,
76 meta_json.unwrap_or("").as_bytes().to_vec(),
77 iv.to_vec(),
78 encrypted_payload,
79 );
80
81 bin_message.to_bytes()
82 }
83
84 pub fn send(&mut self, payload: String) -> Vec<u8> {
85 if self.server_ended {
86 log("server has ended the stream, not sending payload");
88
89 self.client_ended = true;
91
92 return vec![];
94 }
95
96 if self.call_started {
97 if !payload.is_empty() {
99 log("sending payload");
100
101 let payload_json = json!({
103 "method_payload": {
104 "json_payload": payload
105 }
106 })
107 .to_string();
108
109 self.encode_payload(&PAYLOAD_TYPE_PAYLOAD, &payload_json, None)
110 } else {
111 log("ending call");
112
113 self.client_ended = true;
115
116 let payload_json = json!({
117 "method_end": {
118 "ended": true
119 },
120 })
121 .to_string();
122
123 let meta_json = json!({
124 "client_end": true
125 })
126 .to_string();
127
128 self.encode_payload(&PAYLOAD_TYPE_PAYLOAD, &payload_json, Some(&meta_json))
129 }
130 } else if let Some(_key) = &self.encryption_key {
131 log("encryption key is present");
134
135 self.call_started = true;
136
137 let mut method_invoke = json!({
138 "method_name": self.method_name,
139 "json_request": payload
140 });
141
142 if !self.service_id.is_empty() {
143 method_invoke["service_id"] = Value::String(self.service_id.clone());
144 }
145
146 let payload_json = json!({
147 "method_invoke": method_invoke
148 })
149 .to_string();
150
151 self.encode_payload(&PAYLOAD_TYPE_PAYLOAD, &payload_json, None)
152 } else {
153 log("encryption key is not present");
156
157 self.pending_payloads.push_back(payload);
159
160 if !self.key_exchange_pending {
162 log("sending key exchange payload");
163
164 self.key_exchange_pending = true;
165
166 let mut key_exchange_json = json!({
167 "i": self.method_id,
168 "d": self.target_did,
169 });
170
171 if !self.service_id.is_empty() {
172 key_exchange_json["s"] = Value::String(self.service_id.clone());
173 }
174
175 log(&format!("key exchange json: {}", key_exchange_json));
176
177 let token_parts: Vec<&str> = self.token.split('.').collect();
179 let token_payload = Base64UrlUnpadded::decode_vec(token_parts[1]);
180 let token_signature = Base64UrlUnpadded::decode_vec(token_parts[2]);
181
182 let bin_message = BinaryMessage::new_key_exchange(
183 self.method_id,
184 key_exchange_json.to_string().as_bytes().to_vec(),
185 token_signature.unwrap(),
186 token_payload.unwrap(),
187 );
188
189 bin_message.to_bytes()
190 } else {
191 vec![]
193 }
194 }
195 }
196
197 pub fn receive(&mut self, bin_message: BinaryMessage) -> Result<String, String> {
198 self.response_count += 1;
199
200 if &bin_message.payload_type == PAYLOAD_TYPE_KEY_EXCHANGE {
201 log("received key exchange message");
202
203 let mut verify_message = bin_message.public_key.clone();
205 verify_message.extend_from_slice(&bin_message.nonce);
206
207 let verified = verify_signature(
209 &self.target_public_key,
210 &verify_message,
211 &bin_message.public_key_signature,
212 );
213
214 if !verified {
215 log("signature verification failed");
216 return Err("signature verification failed".to_string());
217 }
218
219 let public_key_pem = format!(
221 "-----BEGIN PUBLIC KEY-----\n{}\n-----END PUBLIC KEY-----",
222 Base64::encode_string(&bin_message.public_key)
223 );
224
225 let derived_key = derive_shared_key(
226 &self.metadata.encryption_key.as_ref().unwrap().0,
227 &public_key_pem,
228 );
229
230 match derived_key {
231 Ok(k) => {
232 self.encryption_key = Some(k);
233
234 Ok(json!({
235 "method_id": self.method_id,
236 "key_exchanged": true
237 })
238 .to_string())
239 }
240 Err(e) => {
241 log(&format!("error deriving key: {}", e));
242 self.encryption_key = None;
243
244 Err(e)
245 }
246 }
247 } else if &bin_message.payload_type == PAYLOAD_TYPE_PAYLOAD {
248 log("received payload message");
249
250 let decrypted_json: String;
251
252 if bin_message.iv.len() > 0 {
253 let iv: [u8; 16] = bin_message.iv[..16].try_into().unwrap();
254 let mut encrypted_payload = bin_message.payload.clone();
255
256 decrypted_json = aes_decrypt(
257 encrypted_payload.as_mut_slice(),
258 &self.encryption_key.unwrap(),
259 &iv,
260 );
261 } else {
262 decrypted_json = "{}".to_string();
263 }
264
265 let decrypted_payload: Value = serde_json::from_str(&decrypted_json).unwrap();
266
267 let meta_json = match bin_message.meta.len() {
268 0 => "{}".to_string(),
269 _ => String::from_utf8_lossy(&bin_message.meta).to_string(),
270 };
271
272 let mut response: Value = serde_json::from_str(&meta_json).unwrap();
273 response["method_id"] = self.method_id.into();
274 response["payload"] = decrypted_payload;
275
276 let response_json = response.to_string();
277
278 Ok(response_json)
279 } else {
280 log("received non-key exchange message");
281 Err("unsupported message type".to_string())
282 }
283 }
284
285 pub fn end(&mut self) -> Vec<u8> {
286 match self.client_end_pending {
287 true => {
288 vec![]
290 }
291 false => {
292 log(&format!("client side ended: {}", self.method_id));
294 self.client_end_pending = true;
295 self.send(String::new())
296 }
297 }
298 }
299
300 pub fn pending_payload(&mut self) -> Vec<u8> {
306 match self.pending_payloads.pop_front() {
307 Some(p) => self.send(p),
308 None => vec![],
309 }
310 }
311
312 pub fn drop(self) {
313 }
315}
316
317impl Drop for Rpc {
318 fn drop(&mut self) {
319 log(&format!("dropping RPC: {}", self.method_id));
320 }
321}