dgraph_tonic/client/
mod.rs

1use std::convert::TryInto;
2use std::fmt::Debug;
3use std::ops::{Deref, DerefMut};
4
5use anyhow::Result;
6use http::Uri;
7use rand::Rng;
8#[cfg(any(feature = "acl", feature = "slash-ql"))]
9use tonic::codegen::InterceptedService;
10use tonic::transport::{Channel, Endpoint};
11
12use crate::api::dgraph_client::DgraphClient as DClient;
13use crate::api::Version;
14#[cfg(feature = "acl")]
15pub use crate::client::acl::{
16    AclClient, AclClientType, DgraphAclClient, TxnAcl, TxnAclBestEffort, TxnAclMutated,
17    TxnAclReadOnly,
18};
19#[cfg(all(feature = "acl", feature = "tls"))]
20pub use crate::client::acl::{
21    AclTlsClient, TxnAclTls, TxnAclTlsBestEffort, TxnAclTlsMutated, TxnAclTlsReadOnly,
22};
23pub use crate::client::default::{
24    Client, Http, LazyChannel, Txn, TxnBestEffort, TxnMutated, TxnReadOnly,
25};
26pub use crate::client::endpoints::Endpoints;
27use crate::client::lazy::ILazyChannel;
28pub(crate) use crate::client::lazy::ILazyClient;
29#[cfg(feature = "slash-ql")]
30pub use crate::client::slash_ql::{
31    DgraphSlashQlClient, SlashQl, SlashQlClient, TxnSlashQl, TxnSlashQlBestEffort,
32    TxnSlashQlMutated, TxnSlashQlReadOnly,
33};
34#[cfg(feature = "tls")]
35pub use crate::client::tls::{
36    Tls, TlsClient, TxnTls, TxnTlsBestEffort, TxnTlsMutated, TxnTlsReadOnly,
37};
38use crate::errors::ClientError;
39use crate::stub::Stub;
40use crate::{
41    IDgraphClient, Operation, Payload, TxnBestEffortType, TxnMutatedType, TxnReadOnlyType, TxnType,
42};
43
44#[cfg(feature = "acl")]
45pub(crate) mod acl;
46pub(crate) mod default;
47pub(crate) mod endpoints;
48pub(crate) mod lazy;
49#[cfg(feature = "slash-ql")]
50pub(crate) mod slash_ql;
51#[cfg(feature = "tls")]
52pub(crate) mod tls;
53
54///
55/// return random cloned item from vector
56///
57pub(crate) fn rnd_item<T: Clone>(items: &[T]) -> T {
58    let mut rng = rand::thread_rng();
59    let i = rng.gen_range(0..items.len());
60    if let Some(item) = items.get(i) {
61        item.to_owned()
62    } else {
63        unreachable!()
64    }
65}
66
67///
68/// Check if every endpoint is valid uri and also check if at least one endpoint is given
69///
70pub(crate) fn balance_list<U: TryInto<Uri>, E: Into<Endpoints<U>>>(
71    endpoints: E,
72) -> Result<Vec<Uri>> {
73    let endpoints: Endpoints<U> = endpoints.into();
74    let mut balance_list: Vec<Uri> = Vec::new();
75    for maybe_endpoint in endpoints.endpoints {
76        let endpoint = match maybe_endpoint.try_into() {
77            Ok(endpoint) => endpoint,
78            Err(_err) => {
79                return Err(ClientError::InvalidEndpoint.into());
80            }
81        };
82        balance_list.push(endpoint);
83    }
84    if balance_list.is_empty() {
85        return Err(ClientError::NoEndpointsDefined.into());
86    };
87    Ok(balance_list)
88}
89
90///
91/// Available types of DgraphClient
92///
93#[derive(Debug, Clone)]
94pub enum DgraphClient {
95    Default {
96        client: DClient<Channel>,
97    },
98    #[cfg(feature = "acl")]
99    Acl {
100        client: DgraphAclClient,
101    },
102    #[cfg(feature = "slash-ql")]
103    SlashQl {
104        client: DgraphSlashQlClient,
105    },
106}
107
108///
109/// Dgraph client with interceptor
110///
111#[cfg(any(feature = "acl", feature = "slash-ql"))]
112pub type DgraphInterceptorClient<T> = DClient<InterceptedService<Channel, T>>;
113
114///
115/// Allow custom configuration of endpoint
116///
117pub trait EndpointConfig: Send + Sync + Debug {
118    fn configure_endpoint(&self, endpoint: Endpoint) -> Endpoint;
119}
120
121///
122/// Marker for client variant implementation
123///
124pub trait IClient: Debug + Send + Sync {
125    type Client: ILazyClient<Channel = Self::Channel>;
126    type Channel: ILazyChannel;
127    ///
128    /// Return lazy Dgraph gRPC client
129    ///
130    fn client(&self) -> Self::Client;
131
132    ///
133    /// consume self and return all lazy clients
134    ///
135    fn clients(self) -> Vec<Self::Client>;
136}
137
138///
139/// Client state.
140///
141#[derive(Debug, Default)]
142pub struct ClientState;
143
144impl ClientState {
145    ///
146    /// Create new client state
147    ///
148    pub fn new() -> Self {
149        Self::default()
150    }
151}
152
153///
154/// Dgraph client has several variants which offer different behavior.
155///
156#[derive(Debug)]
157pub struct ClientVariant<S: IClient> {
158    state: Box<ClientState>,
159    pub(crate) extra: S,
160}
161
162impl<S: IClient> Deref for ClientVariant<S> {
163    type Target = Box<ClientState>;
164
165    fn deref(&self) -> &Self::Target {
166        &self.state
167    }
168}
169
170impl<S: IClient> DerefMut for ClientVariant<S> {
171    fn deref_mut(&mut self) -> &mut Self::Target {
172        &mut self.state
173    }
174}
175
176impl<C: IClient> ClientVariant<C> {
177    ///
178    /// Return new stub with grpc client implemented according to actual variant.
179    ///
180    fn any_stub(&self) -> Stub<C::Client> {
181        Stub::new(self.extra.client())
182    }
183
184    ///
185    /// Return transaction in default state, which can be specialized into ReadOnly or Mutated
186    ///
187    pub fn new_txn(&self) -> TxnType<C::Client> {
188        TxnType::new(self.any_stub())
189    }
190
191    ///
192    /// Create new transaction which can only do queries.
193    ///
194    /// Read-only transactions are useful to increase read speed because they can circumvent the
195    /// usual consensus protocol.
196    ///
197    pub fn new_read_only_txn(&self) -> TxnReadOnlyType<C::Client> {
198        self.new_txn().read_only()
199    }
200
201    ///
202    /// Create new transaction which can only do queries in best effort mode.
203    ///
204    /// Read-only queries can optionally be set as best-effort. Using this flag will ask the
205    /// Dgraph Alpha to try to get timestamps from memory on a best-effort basis to reduce the number
206    /// of outbound requests to Zero. This may yield improved latencies in read-bound workloads where
207    /// linearizable reads are not strictly needed.
208    ///
209    pub fn new_best_effort_txn(&self) -> TxnBestEffortType<C::Client> {
210        self.new_read_only_txn().best_effort()
211    }
212
213    ///
214    /// Create new transaction which can do mutate, commit and discard operations
215    ///
216    pub fn new_mutated_txn(&self) -> TxnMutatedType<C::Client> {
217        self.new_txn().mutated()
218    }
219
220    ///
221    /// The /alter endpoint is used to create or change the schema.
222    ///
223    /// # Arguments
224    ///
225    /// - `op`: Alter operation
226    ///
227    /// # Errors
228    ///
229    /// * gRPC error
230    /// * DB reject alter command
231    ///
232    /// # Example
233    ///
234    /// Install a schema into dgraph. A `name` predicate is string type and has exact index.
235    ///
236    /// ```
237    /// use dgraph_tonic::{Client, Operation};
238    /// #[cfg(feature = "acl")]
239    /// use dgraph_tonic::{AclClientType, LazyChannel};
240    ///
241    /// #[cfg(not(feature = "acl"))]
242    /// async fn client() -> Client {
243    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
244    /// }
245    ///
246    /// #[cfg(feature = "acl")]
247    /// async fn client() -> AclClientType<LazyChannel> {
248    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
249    ///     default.login("groot", "password").await.expect("Acl client")
250    /// }
251    ///
252    /// #[tokio::main]
253    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
254    ///     let client = client().await;
255    ///     let op = Operation {
256    ///         schema: "name: string @index(exact) .".into(),
257    ///         ..Default::default()
258    ///     };
259    ///     client.alter(op).await.expect("Schema is not updated");
260    ///     Ok(())
261    /// }
262    /// ```
263    ///
264    pub async fn alter(&self, op: Operation) -> Result<Payload> {
265        let mut stub = self.any_stub();
266        stub.alter(op).await
267    }
268
269    ///
270    /// Create or change the schema.
271    ///
272    /// # Arguments
273    ///
274    /// - `schema`: Schema modification
275    ///
276    /// # Errors
277    ///
278    /// * gRPC error
279    /// * DB reject alter command
280    ///
281    /// # Example
282    ///
283    /// Install a schema into dgraph. A `name` predicate is string type and has exact index.
284    ///
285    /// ```
286    /// use dgraph_tonic::{Client, Operation};
287    /// #[cfg(feature = "acl")]
288    /// use dgraph_tonic::{AclClientType, LazyChannel};
289    ///
290    /// #[cfg(not(feature = "acl"))]
291    /// async fn client() -> Client {
292    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
293    /// }
294    ///
295    /// #[cfg(feature = "acl")]
296    /// async fn client() -> AclClientType<LazyChannel> {
297    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
298    ///     default.login("groot", "password").await.expect("Acl client")
299    /// }
300    ///
301    /// #[tokio::main]
302    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
303    ///     let client = client().await;
304    ///     client.set_schema("name: string @index(exact) .").await.expect("Schema is not updated");
305    ///     Ok(())
306    /// }
307    /// ```
308    ///
309    pub async fn set_schema<S: Into<String>>(&self, schema: S) -> Result<Payload> {
310        let op = Operation {
311            schema: schema.into(),
312            ..Default::default()
313        };
314        self.alter(op).await
315    }
316
317    ///
318    /// Create or change the schema in background.
319    ///
320    /// # Arguments
321    ///
322    /// - `schema`: Schema modification
323    ///
324    /// # Errors
325    ///
326    /// * gRPC error
327    /// * DB reject alter command
328    ///
329    /// # Example
330    ///
331    /// Install a schema into dgraph. A `name` predicate is string type and has exact index.
332    ///
333    /// ```
334    /// use dgraph_tonic::{Client, Operation};
335    /// #[cfg(feature = "acl")]
336    /// use dgraph_tonic::{AclClientType, LazyChannel};
337    ///
338    /// #[cfg(not(feature = "acl"))]
339    /// async fn client() -> Client {
340    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
341    /// }
342    ///
343    /// #[cfg(feature = "acl")]
344    /// async fn client() -> AclClientType<LazyChannel> {
345    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
346    ///     default.login("groot", "password").await.expect("Acl client")
347    /// }
348    ///
349    /// #[tokio::main]
350    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
351    ///     let client = client().await;
352    ///     client.set_schema_in_background("name: string @index(exact) .").await.expect("Schema is not updated");
353    ///     Ok(())
354    /// }
355    /// ```
356    ///
357    #[cfg(any(feature = "dgraph-1-1", feature = "dgraph-21-03"))]
358    pub async fn set_schema_in_background<S: Into<String>>(&self, schema: S) -> Result<Payload> {
359        let op = Operation {
360            schema: schema.into(),
361            run_in_background: true,
362            ..Default::default()
363        };
364        self.alter(op).await
365    }
366
367    ///
368    /// Drop all data in DB
369    ///
370    ///
371    /// # Errors
372    ///
373    /// * gRPC error
374    /// * DB reject alter command
375    ///
376    /// # Example
377    ///
378    ///
379    /// ```
380    /// use dgraph_tonic::{Client, Operation};
381    /// #[cfg(feature = "acl")]
382    /// use dgraph_tonic::{AclClientType, LazyChannel};
383    ///
384    /// #[cfg(not(feature = "acl"))]
385    /// async fn client() -> Client {
386    ///     Client::new("http://127.0.0.1:19080").expect("Dgraph client")
387    /// }
388    ///
389    /// #[cfg(feature = "acl")]
390    /// async fn client() -> AclClientType<LazyChannel> {
391    ///     let default = Client::new("http://127.0.0.1:19080").unwrap();
392    ///     default.login("groot", "password").await.expect("Acl client")
393    /// }
394    ///
395    /// #[tokio::main]
396    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
397    ///     let client = client().await;
398    ///     client.drop_all().await.expect("Data not dropped");
399    ///     Ok(())
400    /// }
401    /// ```
402    ///
403    pub async fn drop_all(&self) -> Result<Payload> {
404        let op = Operation {
405            drop_all: true,
406            ..Default::default()
407        };
408        self.alter(op).await
409    }
410
411    ///
412    /// Check DB version
413    ///
414    /// # Errors
415    ///
416    /// * gRPC error
417    ///
418    /// # Example
419    ///
420    /// ```
421    /// use dgraph_tonic::{Client, Operation};
422    ///
423    /// #[tokio::main]
424    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
425    ///     let client = Client::new(vec!["http://127.0.0.1:19080"]).expect("Dgraph client");
426    ///     let version = client.check_version().await.expect("Version");
427    ///     println!("{:#?}", version);
428    ///     Ok(())
429    /// }
430    /// ```
431    ///
432    pub async fn check_version(&self) -> Result<Version> {
433        let mut stub = self.any_stub();
434        stub.check_version().await
435    }
436}
437
438#[cfg(test)]
439mod tests {
440    #[cfg(feature = "acl")]
441    use crate::client::{Client, LazyChannel};
442
443    use super::*;
444
445    #[cfg(not(feature = "acl"))]
446    async fn client() -> Client {
447        Client::new("http://127.0.0.1:19080").unwrap()
448    }
449
450    #[cfg(feature = "acl")]
451    async fn client() -> AclClientType<LazyChannel> {
452        let default = Client::new("http://127.0.0.1:19080").unwrap();
453        default.login("groot", "password").await.unwrap()
454    }
455
456    #[tokio::test]
457    async fn alter() {
458        let client = client().await;
459        let op = Operation {
460            schema: "name: string @index(exact) .".into(),
461            ..Default::default()
462        };
463        let response = client.alter(op).await;
464        assert!(response.is_ok());
465    }
466
467    #[tokio::test]
468    async fn drop_all() {
469        let client = client().await;
470        let response = client.drop_all().await;
471        assert!(response.is_ok());
472    }
473
474    #[tokio::test]
475    async fn check_version() {
476        let client = client().await;
477        let response = client.check_version().await;
478        assert!(response.is_ok());
479    }
480}