zawgl_client/
lib.rs

1use std::sync::{Arc, Mutex};
2use std::collections::HashMap;
3
4use futures_channel::mpsc::UnboundedSender;
5use futures_channel::oneshot::{Sender, Canceled};
6use futures_util::StreamExt;
7use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
8use serde_json::{from_str, json, value::Value};
9use uuid::Uuid;
10use log::*;
11
12type SharedChannelsMap = Arc<Mutex<HashMap<String, Sender<Value>>>>;
13
14/// Zawgl graph database client
15#[derive(Debug, Clone)]
16pub struct Client {
17    request_tx: Arc<Mutex<UnboundedSender<Message>>>,
18    map_rx_channels: SharedChannelsMap,
19    staled: Arc<Mutex<bool>>,
20}
21
22impl Client {
23
24    pub async fn new(address: &str) -> Self {
25        let (request_tx, request_rx) = futures_channel::mpsc::unbounded();
26        let map: SharedChannelsMap = Arc::new(Mutex::new(HashMap::new()));
27        let staled = Arc::new(Mutex::new(false));
28        if let Ok((ws_stream, _)) = connect_async(address).await {
29            let (write, read) = ws_stream.split();
30            tokio::spawn(request_rx.map(Ok).forward(write));
31            let clone = Arc::clone(&map);
32            let staled_cloned = Arc::clone(&staled);
33            tokio::spawn(async move {
34                read.for_each(|message| receive(Arc::clone(&staled_cloned), message, Arc::clone(&clone))).await
35            });
36        } else {
37            *staled.lock().unwrap() = true;
38        }
39        Client{request_tx: Arc::new(Mutex::new(request_tx)), map_rx_channels: Arc::clone(&map), staled}
40    }
41
42    pub fn is_staled(&self) -> bool {
43        *self.staled.lock().unwrap()
44    }
45
46    /// Executes a cypher request with parameters
47    pub async fn execute_cypher_request_with_parameters(&mut self, db: &str, query: &str, params: Value) -> Result<Value, Canceled> {
48        debug!("execute cypher request {} - {} - {}", db, query, params);
49        if !*self.staled.lock().unwrap() {
50            let uuid =  Uuid::new_v4();
51            let (tx, rx) = futures_channel::oneshot::channel::<Value>();
52            self.map_rx_channels.lock().unwrap().insert(uuid.to_string(), tx);
53            let res = tokio::spawn(send_request(Arc::clone(&self.request_tx), db.to_string(), uuid.to_string(), query.to_string(), params));
54            if res.await.unwrap().is_none() {
55                *self.staled.lock().unwrap() = true;
56            }
57            rx.await
58        } else {
59            Err(Canceled)
60        }
61    }
62
63    /// Executes a cypher request
64    pub async fn execute_cypher_request(&mut self, db: &str, query: &str) -> Result<Value, Canceled> {
65        self.execute_cypher_request_with_parameters(db, query, json!({})).await
66    }
67
68
69    /// Create database request
70    pub async fn create_database(&mut self, db_name: &str) {
71        let create = json!({
72            "create": db_name,
73            "request_id": Uuid::new_v4().to_string()
74        });
75        send(self.request_tx.clone(), create).await;
76    }
77}
78
79async fn send_request(tx: Arc<Mutex<UnboundedSender<Message>>>, db: String, id: String, query: String, params: Value) -> Option<()> {
80    let doc = json!({
81        "database": db,
82        "request_id": id,
83        "query" : query,
84        "parameters": params,
85    });
86    send(tx, doc).await
87}
88
89async fn send(tx: Arc<Mutex<UnboundedSender<Message>>>, msg: Value) -> Option<()> {
90    let mut header = "!application/openCypher".to_string();
91    header.push_str(&msg.to_string());
92    tx.lock().unwrap().unbounded_send(Message::text(header.to_string())).ok()
93}
94
95async fn receive(staled: Arc<Mutex<bool>>, message: Result<Message, tokio_tungstenite::tungstenite::Error>, map: SharedChannelsMap) {
96    match message {
97        Ok(msg) => {
98            let doc: Value = from_str(&msg.into_text().expect("json message")).expect("response");
99            let request_id = doc["request_id"].as_str().unwrap();
100            if let Some(tx) = map.lock().unwrap().remove(request_id) {
101                let res = tx.send(doc);
102                if let Err(d) = res {
103                    error!("parsing document {}", d.to_string())
104                }
105            }
106        },
107        Err(er) => {
108            *staled.lock().unwrap() = true;
109            debug!("error occured {}", er);
110        },
111    }
112}