dgraph_tonic/client/mod.rs
1use std::convert::TryInto;
2use std::fmt::Debug;
3use std::ops::{Deref, DerefMut};
4
5use anyhow::Result;
6use http::Uri;
7use rand::Rng;
8#[cfg(any(feature = "acl", feature = "slash-ql"))]
9use tonic::codegen::InterceptedService;
10use tonic::transport::{Channel, Endpoint};
11
12use crate::api::dgraph_client::DgraphClient as DClient;
13use crate::api::Version;
14#[cfg(feature = "acl")]
15pub use crate::client::acl::{
16 AclClient, AclClientType, DgraphAclClient, TxnAcl, TxnAclBestEffort, TxnAclMutated,
17 TxnAclReadOnly,
18};
19#[cfg(all(feature = "acl", feature = "tls"))]
20pub use crate::client::acl::{
21 AclTlsClient, TxnAclTls, TxnAclTlsBestEffort, TxnAclTlsMutated, TxnAclTlsReadOnly,
22};
23pub use crate::client::default::{
24 Client, Http, LazyChannel, Txn, TxnBestEffort, TxnMutated, TxnReadOnly,
25};
26pub use crate::client::endpoints::Endpoints;
27use crate::client::lazy::ILazyChannel;
28pub(crate) use crate::client::lazy::ILazyClient;
29#[cfg(feature = "slash-ql")]
30pub use crate::client::slash_ql::{
31 DgraphSlashQlClient, SlashQl, SlashQlClient, TxnSlashQl, TxnSlashQlBestEffort,
32 TxnSlashQlMutated, TxnSlashQlReadOnly,
33};
34#[cfg(feature = "tls")]
35pub use crate::client::tls::{
36 Tls, TlsClient, TxnTls, TxnTlsBestEffort, TxnTlsMutated, TxnTlsReadOnly,
37};
38use crate::errors::ClientError;
39use crate::stub::Stub;
40use crate::{
41 IDgraphClient, Operation, Payload, TxnBestEffortType, TxnMutatedType, TxnReadOnlyType, TxnType,
42};
43
44#[cfg(feature = "acl")]
45pub(crate) mod acl;
46pub(crate) mod default;
47pub(crate) mod endpoints;
48pub(crate) mod lazy;
49#[cfg(feature = "slash-ql")]
50pub(crate) mod slash_ql;
51#[cfg(feature = "tls")]
52pub(crate) mod tls;
53
54///
55/// return random cloned item from vector
56///
57pub(crate) fn rnd_item<T: Clone>(items: &[T]) -> T {
58 let mut rng = rand::thread_rng();
59 let i = rng.gen_range(0..items.len());
60 if let Some(item) = items.get(i) {
61 item.to_owned()
62 } else {
63 unreachable!()
64 }
65}
66
67///
68/// Check if every endpoint is valid uri and also check if at least one endpoint is given
69///
70pub(crate) fn balance_list<U: TryInto<Uri>, E: Into<Endpoints<U>>>(
71 endpoints: E,
72) -> Result<Vec<Uri>> {
73 let endpoints: Endpoints<U> = endpoints.into();
74 let mut balance_list: Vec<Uri> = Vec::new();
75 for maybe_endpoint in endpoints.endpoints {
76 let endpoint = match maybe_endpoint.try_into() {
77 Ok(endpoint) => endpoint,
78 Err(_err) => {
79 return Err(ClientError::InvalidEndpoint.into());
80 }
81 };
82 balance_list.push(endpoint);
83 }
84 if balance_list.is_empty() {
85 return Err(ClientError::NoEndpointsDefined.into());
86 };
87 Ok(balance_list)
88}
89
90///
91/// Available types of DgraphClient
92///
93#[derive(Debug, Clone)]
94pub enum DgraphClient {
95 Default {
96 client: DClient<Channel>,
97 },
98 #[cfg(feature = "acl")]
99 Acl {
100 client: DgraphAclClient,
101 },
102 #[cfg(feature = "slash-ql")]
103 SlashQl {
104 client: DgraphSlashQlClient,
105 },
106}
107
108///
109/// Dgraph client with interceptor
110///
111#[cfg(any(feature = "acl", feature = "slash-ql"))]
112pub type DgraphInterceptorClient<T> = DClient<InterceptedService<Channel, T>>;
113
114///
115/// Allow custom configuration of endpoint
116///
117pub trait EndpointConfig: Send + Sync + Debug {
118 fn configure_endpoint(&self, endpoint: Endpoint) -> Endpoint;
119}
120
121///
122/// Marker for client variant implementation
123///
124pub trait IClient: Debug + Send + Sync {
125 type Client: ILazyClient<Channel = Self::Channel>;
126 type Channel: ILazyChannel;
127 ///
128 /// Return lazy Dgraph gRPC client
129 ///
130 fn client(&self) -> Self::Client;
131
132 ///
133 /// consume self and return all lazy clients
134 ///
135 fn clients(self) -> Vec<Self::Client>;
136}
137
138///
139/// Client state.
140///
141#[derive(Debug, Default)]
142pub struct ClientState;
143
144impl ClientState {
145 ///
146 /// Create new client state
147 ///
148 pub fn new() -> Self {
149 Self::default()
150 }
151}
152
153///
154/// Dgraph client has several variants which offer different behavior.
155///
156#[derive(Debug)]
157pub struct ClientVariant<S: IClient> {
158 state: Box<ClientState>,
159 pub(crate) extra: S,
160}
161
162impl<S: IClient> Deref for ClientVariant<S> {
163 type Target = Box<ClientState>;
164
165 fn deref(&self) -> &Self::Target {
166 &self.state
167 }
168}
169
170impl<S: IClient> DerefMut for ClientVariant<S> {
171 fn deref_mut(&mut self) -> &mut Self::Target {
172 &mut self.state
173 }
174}
175
176impl<C: IClient> ClientVariant<C> {
177 ///
178 /// Return new stub with grpc client implemented according to actual variant.
179 ///
180 fn any_stub(&self) -> Stub<C::Client> {
181 Stub::new(self.extra.client())
182 }
183
184 ///
185 /// Return transaction in default state, which can be specialized into ReadOnly or Mutated
186 ///
187 pub fn new_txn(&self) -> TxnType<C::Client> {
188 TxnType::new(self.any_stub())
189 }
190
191 ///
192 /// Create new transaction which can only do queries.
193 ///
194 /// Read-only transactions are useful to increase read speed because they can circumvent the
195 /// usual consensus protocol.
196 ///
197 pub fn new_read_only_txn(&self) -> TxnReadOnlyType<C::Client> {
198 self.new_txn().read_only()
199 }
200
201 ///
202 /// Create new transaction which can only do queries in best effort mode.
203 ///
204 /// Read-only queries can optionally be set as best-effort. Using this flag will ask the
205 /// Dgraph Alpha to try to get timestamps from memory on a best-effort basis to reduce the number
206 /// of outbound requests to Zero. This may yield improved latencies in read-bound workloads where
207 /// linearizable reads are not strictly needed.
208 ///
209 pub fn new_best_effort_txn(&self) -> TxnBestEffortType<C::Client> {
210 self.new_read_only_txn().best_effort()
211 }
212
213 ///
214 /// Create new transaction which can do mutate, commit and discard operations
215 ///
216 pub fn new_mutated_txn(&self) -> TxnMutatedType<C::Client> {
217 self.new_txn().mutated()
218 }
219
220 ///
221 /// The /alter endpoint is used to create or change the schema.
222 ///
223 /// # Arguments
224 ///
225 /// - `op`: Alter operation
226 ///
227 /// # Errors
228 ///
229 /// * gRPC error
230 /// * DB reject alter command
231 ///
232 /// # Example
233 ///
234 /// Install a schema into dgraph. A `name` predicate is string type and has exact index.
235 ///
236 /// ```
237 /// use dgraph_tonic::{Client, Operation};
238 /// #[cfg(feature = "acl")]
239 /// use dgraph_tonic::{AclClientType, LazyChannel};
240 ///
241 /// #[cfg(not(feature = "acl"))]
242 /// async fn client() -> Client {
243 /// Client::new("http://127.0.0.1:19080").expect("Dgraph client")
244 /// }
245 ///
246 /// #[cfg(feature = "acl")]
247 /// async fn client() -> AclClientType<LazyChannel> {
248 /// let default = Client::new("http://127.0.0.1:19080").unwrap();
249 /// default.login("groot", "password").await.expect("Acl client")
250 /// }
251 ///
252 /// #[tokio::main]
253 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
254 /// let client = client().await;
255 /// let op = Operation {
256 /// schema: "name: string @index(exact) .".into(),
257 /// ..Default::default()
258 /// };
259 /// client.alter(op).await.expect("Schema is not updated");
260 /// Ok(())
261 /// }
262 /// ```
263 ///
264 pub async fn alter(&self, op: Operation) -> Result<Payload> {
265 let mut stub = self.any_stub();
266 stub.alter(op).await
267 }
268
269 ///
270 /// Create or change the schema.
271 ///
272 /// # Arguments
273 ///
274 /// - `schema`: Schema modification
275 ///
276 /// # Errors
277 ///
278 /// * gRPC error
279 /// * DB reject alter command
280 ///
281 /// # Example
282 ///
283 /// Install a schema into dgraph. A `name` predicate is string type and has exact index.
284 ///
285 /// ```
286 /// use dgraph_tonic::{Client, Operation};
287 /// #[cfg(feature = "acl")]
288 /// use dgraph_tonic::{AclClientType, LazyChannel};
289 ///
290 /// #[cfg(not(feature = "acl"))]
291 /// async fn client() -> Client {
292 /// Client::new("http://127.0.0.1:19080").expect("Dgraph client")
293 /// }
294 ///
295 /// #[cfg(feature = "acl")]
296 /// async fn client() -> AclClientType<LazyChannel> {
297 /// let default = Client::new("http://127.0.0.1:19080").unwrap();
298 /// default.login("groot", "password").await.expect("Acl client")
299 /// }
300 ///
301 /// #[tokio::main]
302 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
303 /// let client = client().await;
304 /// client.set_schema("name: string @index(exact) .").await.expect("Schema is not updated");
305 /// Ok(())
306 /// }
307 /// ```
308 ///
309 pub async fn set_schema<S: Into<String>>(&self, schema: S) -> Result<Payload> {
310 let op = Operation {
311 schema: schema.into(),
312 ..Default::default()
313 };
314 self.alter(op).await
315 }
316
317 ///
318 /// Create or change the schema in background.
319 ///
320 /// # Arguments
321 ///
322 /// - `schema`: Schema modification
323 ///
324 /// # Errors
325 ///
326 /// * gRPC error
327 /// * DB reject alter command
328 ///
329 /// # Example
330 ///
331 /// Install a schema into dgraph. A `name` predicate is string type and has exact index.
332 ///
333 /// ```
334 /// use dgraph_tonic::{Client, Operation};
335 /// #[cfg(feature = "acl")]
336 /// use dgraph_tonic::{AclClientType, LazyChannel};
337 ///
338 /// #[cfg(not(feature = "acl"))]
339 /// async fn client() -> Client {
340 /// Client::new("http://127.0.0.1:19080").expect("Dgraph client")
341 /// }
342 ///
343 /// #[cfg(feature = "acl")]
344 /// async fn client() -> AclClientType<LazyChannel> {
345 /// let default = Client::new("http://127.0.0.1:19080").unwrap();
346 /// default.login("groot", "password").await.expect("Acl client")
347 /// }
348 ///
349 /// #[tokio::main]
350 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
351 /// let client = client().await;
352 /// client.set_schema_in_background("name: string @index(exact) .").await.expect("Schema is not updated");
353 /// Ok(())
354 /// }
355 /// ```
356 ///
357 #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
358 pub async fn set_schema_in_background<S: Into<String>>(&self, schema: S) -> Result<Payload> {
359 let op = Operation {
360 schema: schema.into(),
361 run_in_background: true,
362 ..Default::default()
363 };
364 self.alter(op).await
365 }
366
367 ///
368 /// Drop all data in DB
369 ///
370 ///
371 /// # Errors
372 ///
373 /// * gRPC error
374 /// * DB reject alter command
375 ///
376 /// # Example
377 ///
378 ///
379 /// ```
380 /// use dgraph_tonic::{Client, Operation};
381 /// #[cfg(feature = "acl")]
382 /// use dgraph_tonic::{AclClientType, LazyChannel};
383 ///
384 /// #[cfg(not(feature = "acl"))]
385 /// async fn client() -> Client {
386 /// Client::new("http://127.0.0.1:19080").expect("Dgraph client")
387 /// }
388 ///
389 /// #[cfg(feature = "acl")]
390 /// async fn client() -> AclClientType<LazyChannel> {
391 /// let default = Client::new("http://127.0.0.1:19080").unwrap();
392 /// default.login("groot", "password").await.expect("Acl client")
393 /// }
394 ///
395 /// #[tokio::main]
396 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
397 /// let client = client().await;
398 /// client.drop_all().await.expect("Data not dropped");
399 /// Ok(())
400 /// }
401 /// ```
402 ///
403 pub async fn drop_all(&self) -> Result<Payload> {
404 let op = Operation {
405 drop_all: true,
406 ..Default::default()
407 };
408 self.alter(op).await
409 }
410
411 ///
412 /// Check DB version
413 ///
414 /// # Errors
415 ///
416 /// * gRPC error
417 ///
418 /// # Example
419 ///
420 /// ```
421 /// use dgraph_tonic::{Client, Operation};
422 ///
423 /// #[tokio::main]
424 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
425 /// let client = Client::new(vec!["http://127.0.0.1:19080"]).expect("Dgraph client");
426 /// let version = client.check_version().await.expect("Version");
427 /// println!("{:#?}", version);
428 /// Ok(())
429 /// }
430 /// ```
431 ///
432 pub async fn check_version(&self) -> Result<Version> {
433 let mut stub = self.any_stub();
434 stub.check_version().await
435 }
436}
437
438#[cfg(test)]
439mod tests {
440 #[cfg(feature = "acl")]
441 use crate::client::{Client, LazyChannel};
442
443 use super::*;
444
445 #[cfg(not(feature = "acl"))]
446 async fn client() -> Client {
447 Client::new("http://127.0.0.1:19080").unwrap()
448 }
449
450 #[cfg(feature = "acl")]
451 async fn client() -> AclClientType<LazyChannel> {
452 let default = Client::new("http://127.0.0.1:19080").unwrap();
453 default.login("groot", "password").await.unwrap()
454 }
455
456 #[tokio::test]
457 async fn alter() {
458 let client = client().await;
459 let op = Operation {
460 schema: "name: string @index(exact) .".into(),
461 ..Default::default()
462 };
463 let response = client.alter(op).await;
464 assert!(response.is_ok());
465 }
466
467 #[tokio::test]
468 async fn drop_all() {
469 let client = client().await;
470 let response = client.drop_all().await;
471 assert!(response.is_ok());
472 }
473
474 #[tokio::test]
475 async fn check_version() {
476 let client = client().await;
477 let response = client.check_version().await;
478 assert!(response.is_ok());
479 }
480}