apollo_client/conf/mod.rs
1//! Apollo configuration apis.
2//!
3//! Refs: <https://www.apolloconfig.com/#/zh/usage/other-language-client-user-guide>.
4//!
5//! # Example
6//!
7//! Simple fetch configuration:
8//!
9//! ```
10//! use apollo_client::{
11//! conf::{meta::IpValue, requests::CachedFetchRequest, ApolloConfClientBuilder},
12//! errors::ApolloClientResult,
13//! };
14//! use ini::Properties;
15//! use std::error::Error;
16//! use url::Url;
17//!
18//! #[tokio::main]
19//! async fn main() -> Result<(), Box<dyn Error>> {
20//! env_logger::init();
21//!
22//! // Create configuration client.
23//! let client =
24//! ApolloConfClientBuilder::new_via_config_service(Url::parse("http://localhost:8080")?)?
25//! .build()?;
26//!
27//! // Request apollo cached configuration api.
28//! let configuration: Properties = client
29//! .cached_fetch(CachedFetchRequest {
30//! app_id: "SampleApp".to_string(),
31//! namespace_name: "application.json".to_string(),
32//! ip: Some(IpValue::HostName),
33//! ..Default::default()
34//! })
35//! .await?;
36//!
37//! // Get the content of configuration.
38//! let content = configuration.get("content");
39//! dbg!(content);
40//!
41//! Ok(())
42//! }
43//! ```
44//!
45//! Watch configuration and fetch when changed:
46//!
47//! ```no_run
48//! use apollo_client::conf::{meta::IpValue, requests::WatchRequest, ApolloConfClientBuilder};
49//! use cidr_utils::cidr::IpCidr;
50//! use futures_util::{pin_mut, stream::StreamExt};
51//! use std::{error::Error, str::FromStr};
52//! use url::Url;
53//!
54//! #[tokio::main]
55//! async fn main() -> Result<(), Box<dyn Error>> {
56//! env_logger::init();
57//!
58//! // Create configuration client.
59//! let client =
60//! ApolloConfClientBuilder::new_via_config_service(Url::parse("http://localhost:8080")?)?
61//! .build()?;
62//!
63//! // Request apollo notification api, and fetch configuration when notified.
64//! let stream = client.watch(WatchRequest {
65//! app_id: "SampleApp".to_string(),
66//! namespace_names: vec![
67//! "application.properties".into(),
68//! "application.json".into(),
69//! "application.yml".into(),
70//! ],
71//! ip: Some(IpValue::HostCidr(IpCidr::from_str("172.16.0.0/16")?)),
72//! ..Default::default()
73//! });
74//!
75//! pin_mut!(stream);
76//!
77//! // There is a dead loop, `next()` is returned when configuration has changed.
78//! while let Some(response) = stream.next().await {
79//! let responses = response?;
80//! for response in responses {
81//! let _ = dbg!(response);
82//! }
83//! }
84//!
85//! Ok(())
86//! }
87//! ```
88
89pub mod meta;
90pub mod requests;
91pub mod responses;
92
93use crate::{
94 conf::{
95 meta::Notification,
96 requests::{
97 CachedFetchRequest, FetchRequest, NotifyRequest, PerformConfRequest, WatchRequest,
98 },
99 responses::FetchResponse,
100 },
101 errors::{ApolloClientError::ApolloResponse, ApolloClientResult},
102 meta::{
103 handle_url, validate_response, PerformResponse, DEFAULT_NOTIFY_TIMEOUT, DEFAULT_TIMEOUT,
104 },
105};
106use async_stream::stream;
107use futures_core::Stream;
108use futures_util::{stream, StreamExt};
109use http::status::StatusCode;
110use ini::Properties;
111use reqwest::{Client, ClientBuilder};
112use std::collections::HashMap;
113use url::Url;
114
115#[derive(Clone)]
116enum ServerUrl {
117 ConfigServer(Url),
118 /// Todo implement fetch config via meta server.
119 #[allow(dead_code)]
120 MetaServer(Url),
121}
122
123/// Builder for [ApolloConfClient].
124pub struct ApolloConfClientBuilder {
125 server_url: ServerUrl,
126 client_builder: ClientBuilder,
127}
128
129impl ApolloConfClientBuilder {
130 /// Create a client request api via config service.
131 ///
132 /// # Example
133 ///
134 /// ```
135 /// use apollo_client::conf::ApolloConfClientBuilder;
136 /// use url::Url;
137 ///
138 /// let _builder = ApolloConfClientBuilder::new_via_config_service(
139 /// Url::parse("http://localhost:8080").unwrap(),
140 /// )
141 /// .unwrap();
142 /// ```
143 pub fn new_via_config_service(config_server_url: Url) -> ApolloClientResult<Self> {
144 let mut builder = Self {
145 server_url: ServerUrl::ConfigServer(config_server_url),
146 client_builder: Default::default(),
147 };
148 builder.client_builder = builder.client_builder.timeout(DEFAULT_TIMEOUT);
149 Ok(builder)
150 }
151
152 /// Customize inner http client.
153 ///
154 /// # Example
155 ///
156 /// ```no_run
157 /// use apollo_client::conf::ApolloConfClientBuilder;
158 /// use std::time::Duration;
159 /// use url::Url;
160 ///
161 /// ApolloConfClientBuilder::new_via_config_service(Url::parse("http://localhost:8080").unwrap())
162 /// .unwrap()
163 /// .with_client_builder(|builder| builder.timeout(Duration::from_secs(6)));
164 /// ```
165 pub fn with_client_builder(mut self, f: impl FnOnce(ClientBuilder) -> ClientBuilder) -> Self {
166 self.client_builder = f(self.client_builder);
167 self
168 }
169
170 /// Build the [ApolloConfClient].
171 pub fn build(self) -> ApolloClientResult<ApolloConfClient> {
172 Ok(ApolloConfClient {
173 server_url: self.server_url,
174 client: self.client_builder.build()?,
175 })
176 }
177}
178
179/// Apollo configuration apis client.
180#[derive(Clone)]
181pub struct ApolloConfClient {
182 server_url: ServerUrl,
183 client: Client,
184}
185
186impl ApolloConfClient {
187 /// 通过带缓存的Http接口从Apollo读取配置。
188 /// [Ref](https://www.apolloconfig.com/#/zh/usage/other-language-client-user-guide?id=_12-%e9%80%9a%e8%bf%87%e5%b8%a6%e7%bc%93%e5%ad%98%e7%9a%84http%e6%8e%a5%e5%8f%a3%e4%bb%8eapollo%e8%af%bb%e5%8f%96%e9%85%8d%e7%bd%ae)
189 pub async fn cached_fetch(
190 &self,
191 request: CachedFetchRequest,
192 ) -> ApolloClientResult<Properties> {
193 self.execute(request).await
194 }
195
196 /// 通过不带缓存的Http接口从Apollo读取配置。
197 /// [Ref](https://www.apolloconfig.com/#/zh/usage/other-language-client-user-guide?id=_13-%e9%80%9a%e8%bf%87%e4%b8%8d%e5%b8%a6%e7%bc%93%e5%ad%98%e7%9a%84http%e6%8e%a5%e5%8f%a3%e4%bb%8eapollo%e8%af%bb%e5%8f%96%e9%85%8d%e7%bd%ae)
198 pub async fn fetch(&self, request: FetchRequest) -> ApolloClientResult<FetchResponse> {
199 self.execute(request).await
200 }
201
202 /// 应用感知配置更新。
203 /// [Ref](https://www.apolloconfig.com/#/zh/usage/other-language-client-user-guide?id=_14-%e5%ba%94%e7%94%a8%e6%84%9f%e7%9f%a5%e9%85%8d%e7%bd%ae%e6%9b%b4%e6%96%b0)
204 pub async fn notify(&self, request: NotifyRequest) -> ApolloClientResult<Vec<Notification>> {
205 self.execute(request).await
206 }
207
208 async fn execute<R: PerformResponse>(
209 &self,
210 request: impl PerformConfRequest<Response = R>,
211 ) -> ApolloClientResult<R> {
212 let url = match &self.server_url {
213 ServerUrl::ConfigServer(url) => handle_url(&request, url.clone())?,
214 ServerUrl::MetaServer(_) => todo!("unreachable here now"),
215 };
216 let mut request_builder = self.client.request(request.method(), url);
217 request_builder = request.request_builder(request_builder);
218 let response = request_builder.send().await?;
219 let response = validate_response(response).await?;
220 <R>::from_response(response).await
221 }
222
223 /// Watch the multi namespaces change, and fetch namespaces configuration when changed.
224 ///
225 /// Return the Stream implemented [futures_core::Stream], and the return value of `poll_next`
226 /// will never be None (Dead Loop).
227 ///
228 /// The first `poll_next` will fetch all namespaces, the remained will only fetch changed
229 /// namespaces.
230 ///
231 /// # Panic
232 ///
233 /// panic if `request.namespace_names` is empty.
234 ///
235 /// # Example
236 ///
237 /// ```no_run
238 /// use apollo_client::conf::{meta::IpValue, requests::WatchRequest, ApolloConfClient};
239 /// use cidr_utils::cidr::IpCidr;
240 /// use futures_util::{pin_mut, stream::StreamExt};
241 /// use std::{error::Error, str::FromStr};
242 ///
243 /// #[tokio::main]
244 /// async fn main() -> Result<(), Box<dyn Error>> {
245 /// let client: ApolloConfClient = todo!();
246 ///
247 /// let stream = client.watch(WatchRequest {
248 /// app_id: "SampleApp".to_string(),
249 /// namespace_names: vec![
250 /// "application.properties".into(),
251 /// "application.json".into(),
252 /// "application.yml".into(),
253 /// ],
254 /// ip: Some(IpValue::HostCidr(IpCidr::from_str("172.16.0.0/16")?)),
255 /// ..Default::default()
256 /// });
257 ///
258 /// pin_mut!(stream);
259 ///
260 /// // This is a dead loop, `next()` is returned when configuration has changed.
261 /// while let Some(response) = stream.next().await {
262 /// let _ = response?;
263 /// }
264 ///
265 /// Ok(())
266 /// }
267 /// ```
268 pub fn watch(
269 &self,
270 request: WatchRequest,
271 ) -> impl Stream<Item = ApolloClientResult<HashMap<String, ApolloClientResult<FetchResponse>>>> + '_
272 {
273 let mut watch_notifications = request.create_notifications();
274 let mut fetch_notifications = watch_notifications.clone();
275 assert_ne!(
276 watch_notifications.len(),
277 0,
278 "watch namespaces should not be null"
279 );
280
281 stream! {
282 loop {
283 let requests = Notification::create_fetch_requests(fetch_notifications, &request);
284 yield Ok(self.fetch_multi(requests).await);
285
286 loop {
287 match self
288 .execute(NotifyRequest::from_watch(
289 &request,
290 watch_notifications.clone(),
291 DEFAULT_NOTIFY_TIMEOUT,
292 ))
293 .await
294 {
295 Ok(notifications) => {
296 let is_uninitialized = watch_notifications[0].is_uninitialized();
297 Notification::update_notifications(
298 &mut watch_notifications,
299 ¬ifications,
300 );
301 fetch_notifications = notifications;
302 if !is_uninitialized {
303 break;
304 }
305 },
306 Err(ApolloResponse(e)) if e.status == StatusCode::NOT_MODIFIED => {},
307 Err(e) => yield Err(e),
308 }
309 }
310 }
311 }
312 }
313
314 async fn fetch_multi(
315 &self,
316 requests: Vec<FetchRequest>,
317 ) -> HashMap<String, ApolloClientResult<FetchResponse>> {
318 let executors = requests.into_iter().map(|fetch_request| async move {
319 (
320 fetch_request.namespace_name(),
321 self.execute(fetch_request).await,
322 )
323 });
324
325 let executors_len = executors.len();
326 let executors_stream = stream::iter(executors);
327 let mut buffered = executors_stream.buffer_unordered(executors_len);
328
329 let mut map = HashMap::with_capacity(executors_len);
330 while let Some(item) = buffered.next().await {
331 map.insert(item.0, item.1);
332 }
333 map
334 }
335}