v_common/module/
remote_indv_r_storage.rs1use crate::module::common::get_queue_status;
2use crate::module::veda_backend::get_storage_use_prop;
3use v_individual_model::onto::individual::{Individual, RawObj};
4use v_individual_model::onto::individual2msgpack::to_msgpack;
5use v_storage::{StorageId, StorageMode, VStorage, StorageROClient};
6use nng::{Message, Protocol, Socket};
7use std::cell::RefCell;
8use std::str;
9use std::sync::Mutex;
10use uuid::Uuid;
11use lazy_static::lazy_static;
12use log::error;
13
14lazy_static! {
15 pub static ref STORAGE: Mutex<RefCell<StorageROClient>> = Mutex::new(RefCell::new(StorageROClient::default()));
16}
17
18pub fn inproc_storage_manager() -> std::io::Result<()> {
21 let ro_storage_url = "inproc://nng/".to_owned() + &Uuid::new_v4().to_hyphenated().to_string();
22 STORAGE.lock().unwrap().get_mut().addr = ro_storage_url.to_owned();
23
24 let mut storage = get_storage_use_prop(StorageMode::ReadOnly);
25
26 let server = Socket::new(Protocol::Rep0)?;
27 if let Err(e) = server.listen(&ro_storage_url) {
28 error!("fail listen, {:?}", e);
29 return Ok(());
30 }
31
32 loop {
33 if let Ok(recv_msg) = server.recv() {
34 let res = req_prepare(&recv_msg, &mut storage);
35 if let Err(e) = server.send(res) {
36 error!("fail send {:?}", e);
37 }
38 }
39 }
40}
41
42fn req_prepare(request: &Message, storage: &mut VStorage) -> Message {
43 if let Ok(id) = str::from_utf8(request.as_slice()) {
44 if id.starts_with("srv:queue-state-") {
45 let indv = get_queue_status(id);
46
47 let mut binobj: Vec<u8> = Vec::new();
48 if let Err(e) = to_msgpack(&indv, &mut binobj) {
49 error!("failed to serialize, err = {:?}", e);
50 return Message::from("[]".as_bytes());
51 }
52
53 return Message::from(binobj.as_slice());
54 }
55
56 let binobj = storage.get_raw_value(StorageId::Individuals, id);
57 match binobj {
58 v_storage::StorageResult::Ok(data) => {
59 if data.is_empty() {
60 return Message::from("[]".as_bytes());
61 }
62 return Message::from(data.as_slice());
63 },
64 _ => {
65 return Message::from("[]".as_bytes());
66 }
67 }
68 }
69
70 Message::default()
71}
72
73pub fn get_individual(id: &str) -> Option<Individual> {
74 if id.starts_with("srv:queue-state-") {
75 return Some(get_queue_status(id));
76 }
77
78 let req = Message::from(id.to_string().as_bytes());
79
80 let mut sh_client = STORAGE.lock().unwrap();
81 let client = sh_client.get_mut();
82
83 if !client.is_ready {
84 client.connect();
85 }
86
87 if !client.is_ready {
88 return None;
89 }
90
91 if let Err(e) = client.soc.send(req) {
92 error!("fail send to storage_manager, err={:?}", e);
93 return None;
94 }
95
96 let wmsg = client.soc.recv();
98 if let Err(e) = wmsg {
99 error!("fail recv from main module, err={:?}", e);
100 return None;
101 }
102
103 drop(sh_client);
104
105 if let Ok(msg) = wmsg {
106 let data = msg.as_slice();
107 if data == b"[]" {
108 return None;
109 }
110 return Some(Individual::new_raw(RawObj::new(data.to_vec())));
111 }
112
113 None
114}