dgraph_tonic/client/
default.rs1use std::convert::TryInto;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use http::Uri;
6use std::fmt::Debug;
7use std::sync::Arc;
8use tonic::transport::Channel;
9use tracing::trace;
10use tracing_attributes::instrument;
11
12use crate::client::lazy::{ILazyChannel, LazyClient};
13use crate::client::{balance_list, rnd_item, ClientState, ClientVariant, IClient};
14use crate::{
15 Endpoint, EndpointConfig, Endpoints, TxnBestEffortType, TxnMutatedType, TxnReadOnlyType,
16 TxnType,
17};
18
19#[derive(Clone, Debug)]
23pub struct LazyChannel {
24 uri: Uri,
25 channel: Option<Channel>,
26 endpoint_config: Option<Arc<dyn EndpointConfig>>,
27}
28
29impl LazyChannel {
30 fn new(uri: Uri) -> Self {
31 Self {
32 uri,
33 channel: None,
34 endpoint_config: None,
35 }
36 }
37
38 fn with_endpoint_config(mut self, endpoint_config: Option<Arc<dyn EndpointConfig>>) -> Self {
39 self.endpoint_config = endpoint_config;
40 self
41 }
42}
43
44#[async_trait]
45impl ILazyChannel for LazyChannel {
46 async fn channel(&mut self) -> Result<Channel> {
47 if let Some(channel) = &self.channel {
48 Ok(channel.to_owned())
49 } else {
50 let mut endpoint: Endpoint = self.uri.to_owned().into();
51 if let Some(endpoint_config) = &self.endpoint_config {
52 endpoint = endpoint_config.configure_endpoint(endpoint);
53 }
54 let channel = endpoint.connect().await?;
55 self.channel.replace(channel.to_owned());
56 Ok(channel)
57 }
58 }
59}
60
61#[derive(Debug)]
65#[doc(hidden)]
66pub struct Http {
67 clients: Vec<LazyClient<LazyChannel>>,
68}
69
70#[async_trait]
71impl IClient for Http {
72 type Client = LazyClient<Self::Channel>;
73 type Channel = LazyChannel;
74
75 fn client(&self) -> Self::Client {
76 rnd_item(&self.clients)
77 }
78
79 fn clients(self) -> Vec<Self::Client> {
80 self.clients
81 }
82}
83
84pub type Client = ClientVariant<Http>;
88
89pub type Txn = TxnType<LazyClient<LazyChannel>>;
93
94pub type TxnReadOnly = TxnReadOnlyType<LazyClient<LazyChannel>>;
98
99pub type TxnBestEffort = TxnBestEffortType<LazyClient<LazyChannel>>;
103
104pub type TxnMutated = TxnMutatedType<LazyClient<LazyChannel>>;
108
109impl Client {
110 fn init_clients<S: TryInto<Uri>, E: Into<Endpoints<S>> + Debug>(
111 endpoints: E,
112 endpoint_config: Option<Arc<dyn EndpointConfig>>,
113 ) -> Result<Vec<LazyClient<LazyChannel>>> {
114 Ok(balance_list(endpoints)?
115 .into_iter()
116 .map(|uri| {
117 LazyClient::new(LazyChannel::new(uri).with_endpoint_config(endpoint_config.clone()))
118 })
119 .collect())
120 }
121
122 #[instrument]
148 pub fn new<S: TryInto<Uri>, E: Into<Endpoints<S>> + Debug>(endpoints: E) -> Result<Self> {
149 let extra = Http {
150 clients: Self::init_clients(endpoints, None)?,
151 };
152 let state = Box::new(ClientState::new());
153 trace!("New http client");
154 Ok(Self { state, extra })
155 }
156
157 #[instrument]
197 pub fn new_with_endpoint_config<
198 S: TryInto<Uri>,
199 E: Into<Endpoints<S>> + Debug,
200 C: EndpointConfig + 'static,
201 >(
202 endpoints: E,
203 endpoint_config: C,
204 ) -> Result<Self> {
205 let extra = Http {
206 clients: Self::init_clients(endpoints, Some(Arc::new(endpoint_config)))?,
207 };
208 let state = Box::new(ClientState::new());
209 trace!("New http client");
210 Ok(Self { state, extra })
211 }
212}