1extern crate futures;
2
3extern crate grpc;
4extern crate httpbis;
5extern crate protobuf;
6extern crate serde;
7extern crate serde_json;
8
9use crate::protos::{api, api_grpc::{self, Dgraph}};
10use futures::compat::Future01CompatExt;
13
14use errors::DgraphError;
15use std::collections::HashMap;
16use rand::{Rng, SeedableRng};
17use rand::seq::SliceRandom;
18
19pub mod errors;
20pub mod protos;
21
22
23pub struct DgraphClient
24{
25    dc: Vec<api_grpc::DgraphClient>,
27    rng_seed: [u8; 16],
28}
29
30impl DgraphClient
31{
32    pub fn new(dc: Vec<api_grpc::DgraphClient>) -> Self {
33        assert!(!dc.is_empty());
34        Self {
35rng_seed: rand::thread_rng().gen(),
37            dc,
38        }
39    }
40
41    pub fn new_txn(&self) -> Txn {
42        Txn {
43            context: Default::default(),
44            finished: false,
45            read_only: false,
46            best_effort: false,
47            mutated: false,
48            dc: self.any_client(),
49        }
50    }
51
52
53    pub fn new_read_only(&self) -> Txn {
54        Txn {
55            context: Default::default(),
56            finished: false,
57            read_only: true,
58            best_effort: false,
59            mutated: false,
60            dc: self.any_client(),
61        }
62    }
63
64
65    pub fn new_best_effort(&self) -> Txn {
66        Txn {
67            context: Default::default(),
68            finished: false,
69            read_only: true,
70            best_effort: true,
71            mutated: false,
72            dc: self.any_client(),
73        }
74    }
75
76    fn any_client(&self) -> &api_grpc::DgraphClient {
77        let mut rng = rand_xoshiro::Xoroshiro128Plus::from_seed(self.rng_seed);
78
79        self.dc.choose(&mut rng).unwrap()
80    }
81}
82
83pub struct Txn<'a> {
84    context: api::TxnContext,
85    finished: bool,
86    read_only: bool,
87    best_effort: bool,
88    mutated: bool,
89    dc: &'a api_grpc::DgraphClient,
90}
91
92impl<'a> Txn<'a> {
93    pub async fn query(&mut self, q: impl Into<String>) -> Result<api::Response, DgraphError> {
94        self.query_with_vars(q, HashMap::new()).await
95    }
96
97    pub async fn query_with_vars(
98        &mut self,
99        q: impl Into<String>,
100        vars: HashMap<String, String>,
101    ) -> Result<api::Response, DgraphError> {
102        self._do(
103            api::Request {
104                query: q.into(),
105                start_ts: self.context.start_ts,
106                read_only: self.read_only,
107                best_effort: self.best_effort,
108                vars,
109                ..Default::default()
110            }
111        ).await
112    }
113
114    pub async fn mutate(&mut self, mu: api::Mutation) -> Result<api::Response, DgraphError> {
115        self._do(
116            api::Request {
117                start_ts: self.context.start_ts,
118                commit_now: mu.commit_now,
119                mutations: vec![mu].into(),
120                ..Default::default()
121            }
122        ).await
123    }
124
125    pub async fn upsert(&mut self, q: impl Into<String>, mut mu: api::Mutation) -> Result<api::Response, DgraphError> {
126        mu.commit_now = true;
127        self._do(
128            api::Request {
129                query: q.into(),
130                mutations: vec![mu].into(),
131                commit_now: true,
132                ..Default::default()
133            }
134        ).await
135    }
136
137
138    pub async fn commit(&mut self) -> Result<(), DgraphError> {
139        match (self.read_only, self.finished) {
140            (true, _) => return Err(DgraphError::ReadOnly),
141            (_, true) => return Err(DgraphError::Finished),
142            _ => self.commit_or_abort().await,
143        }
144    }
145
146    pub async fn commit_or_abort(&mut self) -> Result<(), DgraphError> {
147        if self.finished {
148            return Ok(());
149        }
150        self.finished = true;
151
152        if !self.mutated {
153            return Ok(());
154        }
155
156        self.dc.commit_or_abort(
157            Default::default(),
158            self.context.clone(),
159        ).join_metadata_result().compat().await?;
160
161        Ok(())
162    }
163
164    pub async fn discard(&mut self) -> Result<(), DgraphError> {
165        self.context.aborted = true;
166        self.commit_or_abort().await
167    }
168
169    async fn _do(&mut self, mut req: api::Request) -> Result<api::Response, DgraphError> {
170        if self.finished {
171            return Err(DgraphError::Finished);
172        }
173
174        if !req.mutations.is_empty() {
175            if self.read_only {
176                return Err(DgraphError::ReadOnly);
177            }
178            self.mutated = true;
179        }
180
181        req.start_ts = self.context.start_ts;
182
183        let commit_now = req.commit_now;
184
185        let query_res = self.dc.query(
186            Default::default(),
187            req,
188        ).join_metadata_result().compat().await;
189
190        if let Err(_) = query_res {
192            let _ = self.discard().await;
193        }
194        let query_res = query_res?;
195
196        if commit_now {
197            self.finished = true;
198        }
199
200        let txn = match query_res.1.txn.as_ref() {
201            Some(txn) => txn,
202            None => return Err(DgraphError::EmptyTransaction)
203        };
204
205        self.merge_context(txn)?;
206        Ok(query_res.1)
207    }
208
209    fn merge_context(&mut self, src: &api::TxnContext) -> Result<(), DgraphError> {
210        if self.context.start_ts == 0 {
211            self.context.start_ts = src.start_ts;
212        }
213
214        if self.context.start_ts != src.start_ts {
215            return Err(DgraphError::StartTsMismatch);
216        }
217
218        for key in src.keys.iter() {
219            self.context.keys.push(key.clone());
220        }
221
222        for pred in src.preds.iter() {
223            self.context.preds.push(pred.clone());
224        }
225
226        Ok(())
227    }
228}
229
/// Integration tests; they talk to a Dgraph instance reachable at
/// `localhost:9080` (see `local_dgraph_client`).
#[cfg(test)]
mod tests {
    use super::*;

    use serde_json::Value;
    use grpc::{ClientStub, Client, ClientConf};
    use std::sync::Arc;

    /// Builds a `DgraphClient` holding a single plain-text stub for
    /// `localhost:9080`.
    fn local_dgraph_client() -> DgraphClient {
        let host = "localhost";
        let grpc_port = 9080;

        let transport = Client::new_plain(host.as_ref(), grpc_port, ClientConf::default())
            .expect("Failed to initialize client stub");
        let stub = api_grpc::DgraphClient::with_client(Arc::new(transport));

        DgraphClient::new(vec![stub])
    }

    /// Returns `q0[0].uid` from a decoded query response, if it is present
    /// and is a string.
    fn first_uid(response: &Value) -> Option<&str> {
        let first_hit = response.get("q0")?.get(0)?;
        first_hit.get("uid")?.as_str()
    }

    #[test]
    fn test_query() {
        async_std::task::block_on(async {
            println!("Connecting to local dg");
            let dg = local_dgraph_client();
            let mut txn = dg.new_txn();
            println!("Querying local dg");
            let response = txn.query(r#"
                {
                  q0(func: has(node_key)) {
                    uid
                  }
                }
            "#)
                .await
                .expect("Dgraph query failed");
            let query_res: Value = serde_json::from_slice(&response.json)
                .expect("Json deserialize failed");

            assert!(query_res.as_object().unwrap().contains_key("q0"));
        });
    }

    #[test]
    fn test_upsert() {
        async_std::task::block_on(async {
            let dg = local_dgraph_client();

            let query = r#"
                {
                  p as var(func: eq(node_key, "{453120d4-5c9f-43f6-b7af-28b376b3a993}"))
                }
                "#;

            let payload = serde_json::json!({
                "uid": "uid(p)",
                "node_key": "{453120d4-5c9f-43f6-b7af-28b376b3a993}",
                "process_name": "bar.exe",
            });

            let mu = api::Mutation {
                set_json: payload.to_string().into_bytes(),
                ..Default::default()
            };

            let mut txn = dg.new_txn();
            let txn_res = txn
                .upsert(query, mu)
                .await
                .expect("Request to dgraph failed");
            dbg!(txn_res);

            // The upsert is sent with `commit_now`, so this only checks that
            // finishing an already finished transaction succeeds.
            txn.commit_or_abort().await.unwrap();
        });
    }

    /// Looks up the uid of the first node whose `node_key` equals `node_key`,
    /// using a read-only transaction.
    async fn node_key_to_uid(dg: &DgraphClient, node_key: &str) -> Result<Option<String>, DgraphError> {
        const QUERY: &str = r"
        query q0($a: string)
        {
            q0(func: eq(node_key, $a), first: 1) {
                uid
            }
        }
        ";

        let mut vars = HashMap::new();
        vars.insert(String::from("$a"), node_key.to_owned());

        let mut txn = dg.new_read_only();
        let response = txn.query_with_vars(QUERY, vars).await?;
        let decoded: Value = serde_json::from_slice(&response.json).expect("json");

        Ok(first_uid(&decoded).map(String::from))
    }

    #[test]
    fn test_txn_query_mutate() {
        async_std::task::block_on(async {
            let dg = local_dgraph_client();

            let query = r#"
                {
                  q0(func: eq(node_key, "{453120d4-5c9f-43f6-b7af-28b376b3a993}")) {
                    uid
                  }
                }
                "#;

            // Look for an existing node first, in a read-only transaction.
            let mut reader = dg.new_read_only();
            let response = reader.query(query).await.expect("query");
            let query_res: Value = serde_json::from_slice(&response.json).expect("json");

            let uid = first_uid(&query_res);
            dbg!(&uid);

            let mut set = serde_json::json!({
                "node_key": "{453120d4-5c9f-43f6-b7af-28b376b3a993}",
                "process_name": "bar.exe",
            });
            // Reuse the uid found above, when there is one, by adding it to
            // the mutation payload.
            if let Some(uid) = uid {
                set["uid"] = uid.into();
            }

            let mu = api::Mutation {
                set_json: set.to_string().into_bytes(),
                ..Default::default()
            };

            let mut writer = dg.new_txn();
            let txn_res = writer
                .mutate(mu)
                .await
                .expect("Request to dgraph failed");
            dbg!(txn_res);

            writer.commit_or_abort().await.unwrap();

            let k = node_key_to_uid(&dg, "{453120d4-5c9f-43f6-b7af-28b376b3a993}")
                .await
                .expect("nktu");
            dbg!(k);
        });
    }
}