apollo_client/conf/
mod.rs

1//! Apollo configuration apis.
2//!
3//! Refs: <https://www.apolloconfig.com/#/zh/usage/other-language-client-user-guide>.
4//!
5//! # Example
6//!
7//! Simple fetch configuration:
8//!
9//! ```
10//! use apollo_client::{
11//!     conf::{meta::IpValue, requests::CachedFetchRequest, ApolloConfClientBuilder},
12//!     errors::ApolloClientResult,
13//! };
14//! use ini::Properties;
15//! use std::error::Error;
16//! use url::Url;
17//!
18//! #[tokio::main]
19//! async fn main() -> Result<(), Box<dyn Error>> {
20//!     env_logger::init();
21//!
22//!     // Create configuration client.
23//!     let client =
24//!         ApolloConfClientBuilder::new_via_config_service(Url::parse("http://localhost:8080")?)?
25//!             .build()?;
26//!
27//!     // Request apollo cached configuration api.
28//!     let configuration: Properties = client
29//!         .cached_fetch(CachedFetchRequest {
30//!             app_id: "SampleApp".to_string(),
31//!             namespace_name: "application.json".to_string(),
32//!             ip: Some(IpValue::HostName),
33//!             ..Default::default()
34//!         })
35//!         .await?;
36//!
37//!     // Get the content of configuration.
38//!     let content = configuration.get("content");
39//!     dbg!(content);
40//!
41//!     Ok(())
42//! }
43//! ```
44//!
45//! Watch configuration and fetch when changed:
46//!
47//! ```no_run
48//! use apollo_client::conf::{meta::IpValue, requests::WatchRequest, ApolloConfClientBuilder};
49//! use cidr_utils::cidr::IpCidr;
50//! use futures_util::{pin_mut, stream::StreamExt};
51//! use std::{error::Error, str::FromStr};
52//! use url::Url;
53//!
54//! #[tokio::main]
55//! async fn main() -> Result<(), Box<dyn Error>> {
56//!     env_logger::init();
57//!
58//!     // Create configuration client.
59//!     let client =
60//!         ApolloConfClientBuilder::new_via_config_service(Url::parse("http://localhost:8080")?)?
61//!             .build()?;
62//!
63//!     // Request apollo notification api, and fetch configuration when notified.
64//!     let stream = client.watch(WatchRequest {
65//!         app_id: "SampleApp".to_string(),
66//!         namespace_names: vec![
67//!             "application.properties".into(),
68//!             "application.json".into(),
69//!             "application.yml".into(),
70//!         ],
71//!         ip: Some(IpValue::HostCidr(IpCidr::from_str("172.16.0.0/16")?)),
72//!         ..Default::default()
73//!     });
74//!
75//!     pin_mut!(stream);
76//!
77//!     // There is a dead loop, `next()` is returned when configuration has changed.
78//!     while let Some(response) = stream.next().await {
79//!         let responses = response?;
80//!         for response in responses {
81//!             let _ = dbg!(response);
82//!         }
83//!     }
84//!
85//!     Ok(())
86//! }
87//! ```
88
89pub mod meta;
90pub mod requests;
91pub mod responses;
92
93use crate::{
94    conf::{
95        meta::Notification,
96        requests::{
97            CachedFetchRequest, FetchRequest, NotifyRequest, PerformConfRequest, WatchRequest,
98        },
99        responses::FetchResponse,
100    },
101    errors::{ApolloClientError::ApolloResponse, ApolloClientResult},
102    meta::{
103        handle_url, validate_response, PerformResponse, DEFAULT_NOTIFY_TIMEOUT, DEFAULT_TIMEOUT,
104    },
105};
106use async_stream::stream;
107use futures_core::Stream;
108use futures_util::{stream, StreamExt};
109use http::status::StatusCode;
110use ini::Properties;
111use reqwest::{Client, ClientBuilder};
112use std::collections::HashMap;
113use url::Url;
114
115#[derive(Clone)]
116enum ServerUrl {
117    ConfigServer(Url),
118    /// Todo implement fetch config via meta server.
119    #[allow(dead_code)]
120    MetaServer(Url),
121}
122
123/// Builder for [ApolloConfClient].
124pub struct ApolloConfClientBuilder {
125    server_url: ServerUrl,
126    client_builder: ClientBuilder,
127}
128
129impl ApolloConfClientBuilder {
130    /// Create a client request api via config service.
131    ///
132    /// # Example
133    ///
134    /// ```
135    /// use apollo_client::conf::ApolloConfClientBuilder;
136    /// use url::Url;
137    ///
138    /// let _builder = ApolloConfClientBuilder::new_via_config_service(
139    ///     Url::parse("http://localhost:8080").unwrap(),
140    /// )
141    /// .unwrap();
142    /// ```
143    pub fn new_via_config_service(config_server_url: Url) -> ApolloClientResult<Self> {
144        let mut builder = Self {
145            server_url: ServerUrl::ConfigServer(config_server_url),
146            client_builder: Default::default(),
147        };
148        builder.client_builder = builder.client_builder.timeout(DEFAULT_TIMEOUT);
149        Ok(builder)
150    }
151
152    /// Customize inner http client.
153    ///
154    /// # Example
155    ///
156    /// ```no_run
157    /// use apollo_client::conf::ApolloConfClientBuilder;
158    /// use std::time::Duration;
159    /// use url::Url;
160    ///
161    /// ApolloConfClientBuilder::new_via_config_service(Url::parse("http://localhost:8080").unwrap())
162    ///     .unwrap()
163    ///     .with_client_builder(|builder| builder.timeout(Duration::from_secs(6)));
164    /// ```
165    pub fn with_client_builder(mut self, f: impl FnOnce(ClientBuilder) -> ClientBuilder) -> Self {
166        self.client_builder = f(self.client_builder);
167        self
168    }
169
170    /// Build the [ApolloConfClient].
171    pub fn build(self) -> ApolloClientResult<ApolloConfClient> {
172        Ok(ApolloConfClient {
173            server_url: self.server_url,
174            client: self.client_builder.build()?,
175        })
176    }
177}
178
179/// Apollo configuration apis client.
180#[derive(Clone)]
181pub struct ApolloConfClient {
182    server_url: ServerUrl,
183    client: Client,
184}
185
186impl ApolloConfClient {
187    /// 通过带缓存的Http接口从Apollo读取配置。
188    /// [Ref](https://www.apolloconfig.com/#/zh/usage/other-language-client-user-guide?id=_12-%e9%80%9a%e8%bf%87%e5%b8%a6%e7%bc%93%e5%ad%98%e7%9a%84http%e6%8e%a5%e5%8f%a3%e4%bb%8eapollo%e8%af%bb%e5%8f%96%e9%85%8d%e7%bd%ae)
189    pub async fn cached_fetch(
190        &self,
191        request: CachedFetchRequest,
192    ) -> ApolloClientResult<Properties> {
193        self.execute(request).await
194    }
195
196    /// 通过不带缓存的Http接口从Apollo读取配置。
197    /// [Ref](https://www.apolloconfig.com/#/zh/usage/other-language-client-user-guide?id=_13-%e9%80%9a%e8%bf%87%e4%b8%8d%e5%b8%a6%e7%bc%93%e5%ad%98%e7%9a%84http%e6%8e%a5%e5%8f%a3%e4%bb%8eapollo%e8%af%bb%e5%8f%96%e9%85%8d%e7%bd%ae)
198    pub async fn fetch(&self, request: FetchRequest) -> ApolloClientResult<FetchResponse> {
199        self.execute(request).await
200    }
201
202    /// 应用感知配置更新。
203    /// [Ref](https://www.apolloconfig.com/#/zh/usage/other-language-client-user-guide?id=_14-%e5%ba%94%e7%94%a8%e6%84%9f%e7%9f%a5%e9%85%8d%e7%bd%ae%e6%9b%b4%e6%96%b0)
204    pub async fn notify(&self, request: NotifyRequest) -> ApolloClientResult<Vec<Notification>> {
205        self.execute(request).await
206    }
207
208    async fn execute<R: PerformResponse>(
209        &self,
210        request: impl PerformConfRequest<Response = R>,
211    ) -> ApolloClientResult<R> {
212        let url = match &self.server_url {
213            ServerUrl::ConfigServer(url) => handle_url(&request, url.clone())?,
214            ServerUrl::MetaServer(_) => todo!("unreachable here now"),
215        };
216        let mut request_builder = self.client.request(request.method(), url);
217        request_builder = request.request_builder(request_builder);
218        let response = request_builder.send().await?;
219        let response = validate_response(response).await?;
220        <R>::from_response(response).await
221    }
222
223    /// Watch the multi namespaces change, and fetch namespaces configuration when changed.
224    ///
225    /// Return the Stream implemented [futures_core::Stream], and the return value of `poll_next`
226    /// will never be None (Dead Loop).
227    ///
228    /// The first `poll_next` will fetch all namespaces, the remained will only fetch changed
229    /// namespaces.
230    ///
231    /// # Panic
232    ///
233    /// panic if `request.namespace_names` is empty.
234    ///
235    /// # Example
236    ///
237    /// ```no_run
238    /// use apollo_client::conf::{meta::IpValue, requests::WatchRequest, ApolloConfClient};
239    /// use cidr_utils::cidr::IpCidr;
240    /// use futures_util::{pin_mut, stream::StreamExt};
241    /// use std::{error::Error, str::FromStr};
242    ///
243    /// #[tokio::main]
244    /// async fn main() -> Result<(), Box<dyn Error>> {
245    ///     let client: ApolloConfClient = todo!();
246    ///
247    ///     let stream = client.watch(WatchRequest {
248    ///         app_id: "SampleApp".to_string(),
249    ///         namespace_names: vec![
250    ///             "application.properties".into(),
251    ///             "application.json".into(),
252    ///             "application.yml".into(),
253    ///         ],
254    ///         ip: Some(IpValue::HostCidr(IpCidr::from_str("172.16.0.0/16")?)),
255    ///         ..Default::default()
256    ///     });
257    ///
258    ///     pin_mut!(stream);
259    ///
260    ///     // This is a dead loop, `next()` is returned when configuration has changed.
261    ///     while let Some(response) = stream.next().await {
262    ///         let _ = response?;
263    ///     }
264    ///
265    ///     Ok(())
266    /// }
267    /// ```
268    pub fn watch(
269        &self,
270        request: WatchRequest,
271    ) -> impl Stream<Item = ApolloClientResult<HashMap<String, ApolloClientResult<FetchResponse>>>> + '_
272    {
273        let mut watch_notifications = request.create_notifications();
274        let mut fetch_notifications = watch_notifications.clone();
275        assert_ne!(
276            watch_notifications.len(),
277            0,
278            "watch namespaces should not be null"
279        );
280
281        stream! {
282            loop {
283                let requests = Notification::create_fetch_requests(fetch_notifications, &request);
284                yield Ok(self.fetch_multi(requests).await);
285
286                loop {
287                    match self
288                        .execute(NotifyRequest::from_watch(
289                            &request,
290                            watch_notifications.clone(),
291                            DEFAULT_NOTIFY_TIMEOUT,
292                        ))
293                        .await
294                    {
295                        Ok(notifications) => {
296                            let is_uninitialized = watch_notifications[0].is_uninitialized();
297                            Notification::update_notifications(
298                                &mut watch_notifications,
299                                &notifications,
300                            );
301                            fetch_notifications = notifications;
302                            if !is_uninitialized {
303                                break;
304                            }
305                        },
306                        Err(ApolloResponse(e)) if e.status == StatusCode::NOT_MODIFIED => {},
307                        Err(e) => yield Err(e),
308                    }
309                }
310            }
311        }
312    }
313
314    async fn fetch_multi(
315        &self,
316        requests: Vec<FetchRequest>,
317    ) -> HashMap<String, ApolloClientResult<FetchResponse>> {
318        let executors = requests.into_iter().map(|fetch_request| async move {
319            (
320                fetch_request.namespace_name(),
321                self.execute(fetch_request).await,
322            )
323        });
324
325        let executors_len = executors.len();
326        let executors_stream = stream::iter(executors);
327        let mut buffered = executors_stream.buffer_unordered(executors_len);
328
329        let mut map = HashMap::with_capacity(executors_len);
330        while let Some(item) = buffered.next().await {
331            map.insert(item.0, item.1);
332        }
333        map
334    }
335}