dgraph_tonic/client/
default.rs

1use std::convert::TryInto;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use http::Uri;
6use std::fmt::Debug;
7use std::sync::Arc;
8use tonic::transport::Channel;
9use tracing::trace;
10use tracing_attributes::instrument;
11
12use crate::client::lazy::{ILazyChannel, LazyClient};
13use crate::client::{balance_list, rnd_item, ClientState, ClientVariant, IClient};
14use crate::{
15    Endpoint, EndpointConfig, Endpoints, TxnBestEffortType, TxnMutatedType, TxnReadOnlyType,
16    TxnType,
17};
18
19///
20/// Lazy initialization of gRPC channel
21///
22#[derive(Clone, Debug)]
23pub struct LazyChannel {
24    uri: Uri,
25    channel: Option<Channel>,
26    endpoint_config: Option<Arc<dyn EndpointConfig>>,
27}
28
29impl LazyChannel {
30    fn new(uri: Uri) -> Self {
31        Self {
32            uri,
33            channel: None,
34            endpoint_config: None,
35        }
36    }
37
38    fn with_endpoint_config(mut self, endpoint_config: Option<Arc<dyn EndpointConfig>>) -> Self {
39        self.endpoint_config = endpoint_config;
40        self
41    }
42}
43
44#[async_trait]
45impl ILazyChannel for LazyChannel {
46    async fn channel(&mut self) -> Result<Channel> {
47        if let Some(channel) = &self.channel {
48            Ok(channel.to_owned())
49        } else {
50            let mut endpoint: Endpoint = self.uri.to_owned().into();
51            if let Some(endpoint_config) = &self.endpoint_config {
52                endpoint = endpoint_config.configure_endpoint(endpoint);
53            }
54            let channel = endpoint.connect().await?;
55            self.channel.replace(channel.to_owned());
56            Ok(channel)
57        }
58    }
59}
60
61///
62/// Inner state for default Client
63///
64#[derive(Debug)]
65#[doc(hidden)]
66pub struct Http {
67    clients: Vec<LazyClient<LazyChannel>>,
68}
69
70#[async_trait]
71impl IClient for Http {
72    type Client = LazyClient<Self::Channel>;
73    type Channel = LazyChannel;
74
75    fn client(&self) -> Self::Client {
76        rnd_item(&self.clients)
77    }
78
79    fn clients(self) -> Vec<Self::Client> {
80        self.clients
81    }
82}
83
84///
85/// Default client.
86///
87pub type Client = ClientVariant<Http>;
88
89///
90/// Txn over http
91///
92pub type Txn = TxnType<LazyClient<LazyChannel>>;
93
94///
95/// Readonly txn over http
96///
97pub type TxnReadOnly = TxnReadOnlyType<LazyClient<LazyChannel>>;
98
99///
100/// Best effort txn over http
101///
102pub type TxnBestEffort = TxnBestEffortType<LazyClient<LazyChannel>>;
103
104///
105/// Mutated txn over http
106///
107pub type TxnMutated = TxnMutatedType<LazyClient<LazyChannel>>;
108
109impl Client {
110    fn init_clients<S: TryInto<Uri>, E: Into<Endpoints<S>> + Debug>(
111        endpoints: E,
112        endpoint_config: Option<Arc<dyn EndpointConfig>>,
113    ) -> Result<Vec<LazyClient<LazyChannel>>> {
114        Ok(balance_list(endpoints)?
115            .into_iter()
116            .map(|uri| {
117                LazyClient::new(LazyChannel::new(uri).with_endpoint_config(endpoint_config.clone()))
118            })
119            .collect())
120    }
121
122    ///
123    /// Create new Dgraph client for interacting v DB.
124    ///
125    /// The client can be backed by multiple endpoints (to the same server, or multiple servers in a cluster).
126    ///
127    /// # Arguments
128    ///
129    /// * `endpoints` - one endpoint or vector of endpoints
130    ///
131    /// # Errors
132    ///
133    /// * endpoints vector is empty
134    /// * item in vector cannot by converted into Uri
135    ///
136    /// # Example
137    ///
138    /// ```
139    /// use dgraph_tonic::Client;
140    ///
141    /// // vector of endpoints
142    /// let client = Client::new(vec!["http://127.0.0.1:19080", "http://127.0.0.1:19080"]).expect("Dgraph client");
143    /// // one endpoint
144    /// let client = Client::new("http://127.0.0.1:19080").expect("Dgraph client");
145    /// ```
146    ///
147    #[instrument]
148    pub fn new<S: TryInto<Uri>, E: Into<Endpoints<S>> + Debug>(endpoints: E) -> Result<Self> {
149        let extra = Http {
150            clients: Self::init_clients(endpoints, None)?,
151        };
152        let state = Box::new(ClientState::new());
153        trace!("New http client");
154        Ok(Self { state, extra })
155    }
156
157    ///
158    /// Create new Dgraph client with custom endpoint configuration for interacting with DB.
159    ///
160    /// The client can be backed by multiple endpoints (to the same server, or multiple servers in a cluster).
161    ///
162    /// # Arguments
163    ///
164    /// * `endpoints` - one endpoint or vector of endpoints
165    /// * `endpoint_config` - custom endpoint configuration
166    ///
167    /// # Errors
168    ///
169    /// * endpoints vector is empty
170    /// * item in vector cannot by converted into Uri
171    ///
172    /// # Example
173    ///
174    /// ```
175    /// use dgraph_tonic::{Endpoint, EndpointConfig, Client};
176    ///
177    /// use std::time::Duration;
178    ///
179    /// #[derive(Debug, Default, Clone)]
180    /// struct EndpointWithTimeout {}
181    ///
182    /// impl EndpointConfig for EndpointWithTimeout {
183    ///     fn configure_endpoint(&self, endpoint: Endpoint) -> Endpoint {
184    ///         endpoint.timeout(Duration::from_secs(5))
185    ///     }
186    /// }
187    ///
188    /// // custom configuration
189    /// let endpoint_config = EndpointWithTimeout::default();
190    /// // vector of endpoints
191    /// let client = Client::new_with_endpoint_config(vec!["http://127.0.0.1:19080", "http://127.0.0.1:19080"], endpoint_config.clone()).expect("Dgraph client");
192    /// // one endpoint
193    /// let client = Client::new_with_endpoint_config("http://127.0.0.1:19080", endpoint_config).expect("Dgraph client");
194    /// ```
195    ///
196    #[instrument]
197    pub fn new_with_endpoint_config<
198        S: TryInto<Uri>,
199        E: Into<Endpoints<S>> + Debug,
200        C: EndpointConfig + 'static,
201    >(
202        endpoints: E,
203        endpoint_config: C,
204    ) -> Result<Self> {
205        let extra = Http {
206            clients: Self::init_clients(endpoints, Some(Arc::new(endpoint_config)))?,
207        };
208        let state = Box::new(ClientState::new());
209        trace!("New http client");
210        Ok(Self { state, extra })
211    }
212}