Skip to main content

v_common/module/
remote_indv_r_storage.rs

1use crate::module::common::get_queue_status;
2use crate::module::veda_backend::get_storage_use_prop;
3use v_individual_model::onto::individual::{Individual, RawObj};
4use v_individual_model::onto::individual2msgpack::to_msgpack;
5use v_storage::{StorageId, StorageMode, VStorage, StorageROClient};
6use nng::{Message, Protocol, Socket};
7use std::cell::RefCell;
8use std::str;
9use std::sync::Mutex;
10use uuid::Uuid;
11use lazy_static::lazy_static;
12use log::error;
13
14lazy_static! {
15    pub static ref STORAGE: Mutex<RefCell<StorageROClient>> = Mutex::new(RefCell::new(StorageROClient::default()));
16}
17
18// inproc storage server
19
20pub fn inproc_storage_manager() -> std::io::Result<()> {
21    let ro_storage_url = "inproc://nng/".to_owned() + &Uuid::new_v4().to_hyphenated().to_string();
22    STORAGE.lock().unwrap().get_mut().addr = ro_storage_url.to_owned();
23
24    let mut storage = get_storage_use_prop(StorageMode::ReadOnly);
25
26    let server = Socket::new(Protocol::Rep0)?;
27    if let Err(e) = server.listen(&ro_storage_url) {
28        error!("fail listen, {:?}", e);
29        return Ok(());
30    }
31
32    loop {
33        if let Ok(recv_msg) = server.recv() {
34            let res = req_prepare(&recv_msg, &mut storage);
35            if let Err(e) = server.send(res) {
36                error!("fail send {:?}", e);
37            }
38        }
39    }
40}
41
42fn req_prepare(request: &Message, storage: &mut VStorage) -> Message {
43    if let Ok(id) = str::from_utf8(request.as_slice()) {
44        if id.starts_with("srv:queue-state-") {
45            let indv = get_queue_status(id);
46
47            let mut binobj: Vec<u8> = Vec::new();
48            if let Err(e) = to_msgpack(&indv, &mut binobj) {
49                error!("failed to serialize, err = {:?}", e);
50                return Message::from("[]".as_bytes());
51            }
52
53            return Message::from(binobj.as_slice());
54        }
55
56        let binobj = storage.get_raw_value(StorageId::Individuals, id);
57        match binobj {
58            v_storage::StorageResult::Ok(data) => {
59                if data.is_empty() {
60                    return Message::from("[]".as_bytes());
61                }
62                return Message::from(data.as_slice());
63            },
64            _ => {
65                return Message::from("[]".as_bytes());
66            }
67        }
68    }
69
70    Message::default()
71}
72
73pub fn get_individual(id: &str) -> Option<Individual> {
74    if id.starts_with("srv:queue-state-") {
75        return Some(get_queue_status(id));
76    }
77
78    let req = Message::from(id.to_string().as_bytes());
79
80    let mut sh_client = STORAGE.lock().unwrap();
81    let client = sh_client.get_mut();
82
83    if !client.is_ready {
84        client.connect();
85    }
86
87    if !client.is_ready {
88        return None;
89    }
90
91    if let Err(e) = client.soc.send(req) {
92        error!("fail send to storage_manager, err={:?}", e);
93        return None;
94    }
95
96    // Wait for the response from the server.
97    let wmsg = client.soc.recv();
98    if let Err(e) = wmsg {
99        error!("fail recv from main module, err={:?}", e);
100        return None;
101    }
102
103    drop(sh_client);
104
105    if let Ok(msg) = wmsg {
106        let data = msg.as_slice();
107        if data == b"[]" {
108            return None;
109        }
110        return Some(Individual::new_raw(RawObj::new(data.to_vec())));
111    }
112
113    None
114}