1use std::sync::{Arc, Mutex};
2use std::collections::HashMap;
3
4use futures_channel::mpsc::UnboundedSender;
5use futures_channel::oneshot::{Sender, Canceled};
6use futures_util::StreamExt;
7use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
8use serde_json::{from_str, json, value::Value};
9use uuid::Uuid;
10use log::*;
11
12type SharedChannelsMap = Arc<Mutex<HashMap<String, Sender<Value>>>>;
13
14#[derive(Debug, Clone)]
16pub struct Client {
17 request_tx: Arc<Mutex<UnboundedSender<Message>>>,
18 map_rx_channels: SharedChannelsMap,
19 staled: Arc<Mutex<bool>>,
20}
21
22impl Client {
23
24 pub async fn new(address: &str) -> Self {
25 let (request_tx, request_rx) = futures_channel::mpsc::unbounded();
26 let map: SharedChannelsMap = Arc::new(Mutex::new(HashMap::new()));
27 let staled = Arc::new(Mutex::new(false));
28 if let Ok((ws_stream, _)) = connect_async(address).await {
29 let (write, read) = ws_stream.split();
30 tokio::spawn(request_rx.map(Ok).forward(write));
31 let clone = Arc::clone(&map);
32 let staled_cloned = Arc::clone(&staled);
33 tokio::spawn(async move {
34 read.for_each(|message| receive(Arc::clone(&staled_cloned), message, Arc::clone(&clone))).await
35 });
36 } else {
37 *staled.lock().unwrap() = true;
38 }
39 Client{request_tx: Arc::new(Mutex::new(request_tx)), map_rx_channels: Arc::clone(&map), staled}
40 }
41
42 pub fn is_staled(&self) -> bool {
43 *self.staled.lock().unwrap()
44 }
45
46 pub async fn execute_cypher_request_with_parameters(&mut self, db: &str, query: &str, params: Value) -> Result<Value, Canceled> {
48 debug!("execute cypher request {} - {} - {}", db, query, params);
49 if !*self.staled.lock().unwrap() {
50 let uuid = Uuid::new_v4();
51 let (tx, rx) = futures_channel::oneshot::channel::<Value>();
52 self.map_rx_channels.lock().unwrap().insert(uuid.to_string(), tx);
53 let res = tokio::spawn(send_request(Arc::clone(&self.request_tx), db.to_string(), uuid.to_string(), query.to_string(), params));
54 if res.await.unwrap().is_none() {
55 *self.staled.lock().unwrap() = true;
56 }
57 rx.await
58 } else {
59 Err(Canceled)
60 }
61 }
62
63 pub async fn execute_cypher_request(&mut self, db: &str, query: &str) -> Result<Value, Canceled> {
65 self.execute_cypher_request_with_parameters(db, query, json!({})).await
66 }
67
68
69 pub async fn create_database(&mut self, db_name: &str) {
71 let create = json!({
72 "create": db_name,
73 "request_id": Uuid::new_v4().to_string()
74 });
75 send(self.request_tx.clone(), create).await;
76 }
77}
78
79async fn send_request(tx: Arc<Mutex<UnboundedSender<Message>>>, db: String, id: String, query: String, params: Value) -> Option<()> {
80 let doc = json!({
81 "database": db,
82 "request_id": id,
83 "query" : query,
84 "parameters": params,
85 });
86 send(tx, doc).await
87}
88
89async fn send(tx: Arc<Mutex<UnboundedSender<Message>>>, msg: Value) -> Option<()> {
90 let mut header = "!application/openCypher".to_string();
91 header.push_str(&msg.to_string());
92 tx.lock().unwrap().unbounded_send(Message::text(header.to_string())).ok()
93}
94
95async fn receive(staled: Arc<Mutex<bool>>, message: Result<Message, tokio_tungstenite::tungstenite::Error>, map: SharedChannelsMap) {
96 match message {
97 Ok(msg) => {
98 let doc: Value = from_str(&msg.into_text().expect("json message")).expect("response");
99 let request_id = doc["request_id"].as_str().unwrap();
100 if let Some(tx) = map.lock().unwrap().remove(request_id) {
101 let res = tx.send(doc);
102 if let Err(d) = res {
103 error!("parsing document {}", d.to_string())
104 }
105 }
106 },
107 Err(er) => {
108 *staled.lock().unwrap() = true;
109 debug!("error occured {}", er);
110 },
111 }
112}