1extern crate futures;
2
3extern crate grpc;
4extern crate httpbis;
5extern crate protobuf;
6extern crate serde;
7extern crate serde_json;
8
9use crate::protos::{api, api_grpc::{self, Dgraph}};
10use futures::compat::Future01CompatExt;
13
14use errors::DgraphError;
15use std::collections::HashMap;
16use rand::{Rng, SeedableRng};
17use rand::seq::SliceRandom;
18
19pub mod errors;
20pub mod protos;
21
22
23pub struct DgraphClient
24{
25 dc: Vec<api_grpc::DgraphClient>,
27 rng_seed: [u8; 16],
28}
29
30impl DgraphClient
31{
32 pub fn new(dc: Vec<api_grpc::DgraphClient>) -> Self {
33 assert!(!dc.is_empty());
34 Self {
35rng_seed: rand::thread_rng().gen(),
37 dc,
38 }
39 }
40
41 pub fn new_txn(&self) -> Txn {
42 Txn {
43 context: Default::default(),
44 finished: false,
45 read_only: false,
46 best_effort: false,
47 mutated: false,
48 dc: self.any_client(),
49 }
50 }
51
52
53 pub fn new_read_only(&self) -> Txn {
54 Txn {
55 context: Default::default(),
56 finished: false,
57 read_only: true,
58 best_effort: false,
59 mutated: false,
60 dc: self.any_client(),
61 }
62 }
63
64
65 pub fn new_best_effort(&self) -> Txn {
66 Txn {
67 context: Default::default(),
68 finished: false,
69 read_only: true,
70 best_effort: true,
71 mutated: false,
72 dc: self.any_client(),
73 }
74 }
75
76 fn any_client(&self) -> &api_grpc::DgraphClient {
77 let mut rng = rand_xoshiro::Xoroshiro128Plus::from_seed(self.rng_seed);
78
79 self.dc.choose(&mut rng).unwrap()
80 }
81}
82
83pub struct Txn<'a> {
84 context: api::TxnContext,
85 finished: bool,
86 read_only: bool,
87 best_effort: bool,
88 mutated: bool,
89 dc: &'a api_grpc::DgraphClient,
90}
91
92impl<'a> Txn<'a> {
93 pub async fn query(&mut self, q: impl Into<String>) -> Result<api::Response, DgraphError> {
94 self.query_with_vars(q, HashMap::new()).await
95 }
96
97 pub async fn query_with_vars(
98 &mut self,
99 q: impl Into<String>,
100 vars: HashMap<String, String>,
101 ) -> Result<api::Response, DgraphError> {
102 self._do(
103 api::Request {
104 query: q.into(),
105 start_ts: self.context.start_ts,
106 read_only: self.read_only,
107 best_effort: self.best_effort,
108 vars,
109 ..Default::default()
110 }
111 ).await
112 }
113
114 pub async fn mutate(&mut self, mu: api::Mutation) -> Result<api::Response, DgraphError> {
115 self._do(
116 api::Request {
117 start_ts: self.context.start_ts,
118 commit_now: mu.commit_now,
119 mutations: vec![mu].into(),
120 ..Default::default()
121 }
122 ).await
123 }
124
125 pub async fn upsert(&mut self, q: impl Into<String>, mut mu: api::Mutation) -> Result<api::Response, DgraphError> {
126 mu.commit_now = true;
127 self._do(
128 api::Request {
129 query: q.into(),
130 mutations: vec![mu].into(),
131 commit_now: true,
132 ..Default::default()
133 }
134 ).await
135 }
136
137
138 pub async fn commit(&mut self) -> Result<(), DgraphError> {
139 match (self.read_only, self.finished) {
140 (true, _) => return Err(DgraphError::ReadOnly),
141 (_, true) => return Err(DgraphError::Finished),
142 _ => self.commit_or_abort().await,
143 }
144 }
145
146 pub async fn commit_or_abort(&mut self) -> Result<(), DgraphError> {
147 if self.finished {
148 return Ok(());
149 }
150 self.finished = true;
151
152 if !self.mutated {
153 return Ok(());
154 }
155
156 self.dc.commit_or_abort(
157 Default::default(),
158 self.context.clone(),
159 ).join_metadata_result().compat().await?;
160
161 Ok(())
162 }
163
164 pub async fn discard(&mut self) -> Result<(), DgraphError> {
165 self.context.aborted = true;
166 self.commit_or_abort().await
167 }
168
169 async fn _do(&mut self, mut req: api::Request) -> Result<api::Response, DgraphError> {
170 if self.finished {
171 return Err(DgraphError::Finished);
172 }
173
174 if !req.mutations.is_empty() {
175 if self.read_only {
176 return Err(DgraphError::ReadOnly);
177 }
178 self.mutated = true;
179 }
180
181 req.start_ts = self.context.start_ts;
182
183 let commit_now = req.commit_now;
184
185 let query_res = self.dc.query(
186 Default::default(),
187 req,
188 ).join_metadata_result().compat().await;
189
190 if let Err(_) = query_res {
192 let _ = self.discard().await;
193 }
194 let query_res = query_res?;
195
196 if commit_now {
197 self.finished = true;
198 }
199
200 let txn = match query_res.1.txn.as_ref() {
201 Some(txn) => txn,
202 None => return Err(DgraphError::EmptyTransaction)
203 };
204
205 self.merge_context(txn)?;
206 Ok(query_res.1)
207 }
208
209 fn merge_context(&mut self, src: &api::TxnContext) -> Result<(), DgraphError> {
210 if self.context.start_ts == 0 {
211 self.context.start_ts = src.start_ts;
212 }
213
214 if self.context.start_ts != src.start_ts {
215 return Err(DgraphError::StartTsMismatch);
216 }
217
218 for key in src.keys.iter() {
219 self.context.keys.push(key.clone());
220 }
221
222 for pred in src.preds.iter() {
223 self.context.preds.push(pred.clone());
224 }
225
226 Ok(())
227 }
228}
229
// Integration tests: every test talks to a Dgraph instance over plaintext
// gRPC at localhost:9080 and fails if none is reachable there.
#[cfg(test)]
mod tests {
    use super::*;

    use grpc::{Client, ClientConf, ClientStub};
    use serde_json::Value;
    use std::sync::Arc;

    /// Builds a `DgraphClient` backed by a single plaintext gRPC connection
    /// to `localhost:9080`.
    fn local_dgraph_client() -> DgraphClient {
        let host = "localhost";
        let port = 9080;

        let grpc_client = Client::new_plain(host, port, ClientConf::default())
            .expect("Failed to initialize client stub");

        let stub = api_grpc::DgraphClient::with_client(Arc::new(grpc_client));

        DgraphClient::new(vec![stub])
    }

    /// Extracts `q0[0].uid` from a parsed query response, if present.
    fn first_uid(response: &Value) -> Option<&str> {
        response.get("q0")?.get(0)?.get("uid")?.as_str()
    }

    /// A plain query must succeed and its JSON must contain the `q0` block.
    #[test]
    fn test_query() {
        async_std::task::block_on(async {
            println!("Connecting to local dg");
            let dg = local_dgraph_client();
            let mut txn = dg.new_txn();

            println!("Querying local dg");
            let response = txn
                .query(
                    r#"
                    {
                        q0(func: has(node_key)) {
                            uid
                        }
                    }
                    "#,
                )
                .await
                .expect("Dgraph query failed");

            let parsed: Value =
                serde_json::from_slice(&response.json).expect("Json deserialize failed");

            assert!(parsed.as_object().unwrap().contains_key("q0"));
        });
    }

    /// An upsert request must succeed, and finishing the already committed
    /// transaction afterwards must be a no-op that returns `Ok`.
    #[test]
    fn test_upsert() {
        async_std::task::block_on(async {
            let dg = local_dgraph_client();

            let query = r#"
                {
                    p as var(func: eq(node_key, "{453120d4-5c9f-43f6-b7af-28b376b3a993}"))
                }
            "#;

            let set = serde_json::json!({
                "uid": "uid(p)",
                "node_key": "{453120d4-5c9f-43f6-b7af-28b376b3a993}",
                "process_name": "bar.exe",
            });

            let mu = api::Mutation {
                set_json: set.to_string().into_bytes(),
                ..Default::default()
            };

            let mut txn = dg.new_txn();
            txn.upsert(query, mu)
                .await
                .expect("Request to dgraph failed");

            txn.commit_or_abort().await.unwrap();
        });
    }

    /// Looks up the uid of the first node whose `node_key` equals `node_key`,
    /// using a read-only transaction and a query variable.
    async fn node_key_to_uid(dg: &DgraphClient, node_key: &str) -> Result<Option<String>, DgraphError> {
        const QUERY: &str = r"
            query q0($a: string)
            {
                q0(func: eq(node_key, $a), first: 1) {
                    uid
                }
            }
        ";

        let mut vars = HashMap::new();
        vars.insert("$a".to_string(), node_key.into());

        let mut txn = dg.new_read_only();
        let response = txn.query_with_vars(QUERY, vars).await?;
        let parsed: Value = serde_json::from_slice(&response.json).expect("json");

        Ok(first_uid(&parsed).map(String::from))
    }

    /// Queries for an existing node, writes it (re-using its uid when found)
    /// in a separate transaction, commits, and checks the node can be read
    /// back by its key.
    #[test]
    fn test_txn_query_mutate() {
        async_std::task::block_on(async {
            let dg = local_dgraph_client();

            let query = r#"
                {
                    q0(func: eq(node_key, "{453120d4-5c9f-43f6-b7af-28b376b3a993}")) {
                        uid
                    }
                }
            "#;

            let mut read_txn = dg.new_read_only();
            let response = read_txn.query(query).await.expect("query");
            let parsed: Value = serde_json::from_slice(&response.json).expect("json");
            let existing_uid = first_uid(&parsed);

            let mut set = serde_json::json!({
                "node_key": "{453120d4-5c9f-43f6-b7af-28b376b3a993}",
                "process_name": "bar.exe",
            });

            // Target the existing node when there is one; otherwise the
            // mutation is sent without a uid.
            if let Some(uid) = existing_uid {
                set["uid"] = uid.into();
            }

            let mu = api::Mutation {
                set_json: set.to_string().into_bytes(),
                ..Default::default()
            };

            let mut txn = dg.new_txn();
            txn.mutate(mu)
                .await
                .expect("Request to dgraph failed");

            txn.commit_or_abort().await.unwrap();

            let uid = node_key_to_uid(&dg, "{453120d4-5c9f-43f6-b7af-28b376b3a993}")
                .await
                .expect("nktu");

            // Without this check the test would pass even if the committed
            // node could not be found again.
            assert!(
                uid.is_some(),
                "node should be retrievable by node_key after commit"
            );
        });
    }
}