mod common;
use apollo_client::{
conf::{
meta::IpValue,
requests::{CachedFetchRequest, FetchRequest, WatchRequest},
ApolloConfClient, ApolloConfClientBuilder,
},
errors::ApolloClientError,
};
use common::{ensure_timeout, setup};
use futures_util::{pin_mut, stream::StreamExt};
use http::status::StatusCode;
use ini::Properties;
use std::time::Duration;
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_cached_fetch_request() {
setup();
let client = new_client_via_config_service();
{
let properties = client
.cached_fetch(CachedFetchRequest {
app_id: "SampleApp".to_string(),
namespace_name: "application".to_string(),
ip: Some(IpValue::HostName),
..Default::default()
})
.await
.unwrap();
assert_eq!(properties, {
let mut props = Properties::new();
props.insert("timeout", "100");
props
});
}
{
let properties = client
.cached_fetch(CachedFetchRequest {
app_id: "SampleApp".to_string(),
namespace_name: "application.json".to_string(),
ip: Some(IpValue::HostName),
..Default::default()
})
.await
.unwrap();
assert_eq!(properties, {
let mut props = Properties::new();
props.insert("content", r#"{"timeout": 100}"#);
props
});
}
{
let result = client
.cached_fetch(CachedFetchRequest {
app_id: "NotExistsApp".to_string(),
namespace_name: "application.json".to_string(),
ip: Some(IpValue::HostName),
..Default::default()
})
.await;
assert!(matches!(
result,
Err(ApolloClientError::ApolloResponse(e)) if e.status == StatusCode::NOT_FOUND
));
}
{
let result = client
.cached_fetch(CachedFetchRequest {
app_id: "SampleApp".to_string(),
namespace_name: "notExistsNamesapce".to_string(),
ip: Some(IpValue::HostName),
..Default::default()
})
.await;
assert!(matches!(
result,
Err(ApolloClientError::ApolloResponse(
e
)) if e.status == StatusCode::NOT_FOUND
));
}
{
let result = client
.cached_fetch(CachedFetchRequest {
app_id: "TestApp1".to_string(),
namespace_name: "application".to_string(),
ip: Some(IpValue::HostName),
..Default::default()
})
.await;
assert_eq!(result.unwrap(), Properties::new());
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_fetch_request() {
setup();
let client = new_client_via_config_service();
{
let response = client
.fetch(FetchRequest {
app_id: "SampleApp".to_string(),
namespace_name: "application.properties".to_string(),
ip: Some(IpValue::HostName),
..Default::default()
})
.await
.unwrap();
assert_eq!(response.app_id, "SampleApp");
assert_eq!(response.cluster, "default");
assert_eq!(response.namespace_name, "application.properties");
assert_eq!(response.configurations["timeout"], "100");
}
{
let response = client
.fetch(FetchRequest {
app_id: "SampleApp".to_string(),
namespace_name: "application.json".to_string(),
ip: Some(IpValue::HostName),
..Default::default()
})
.await
.unwrap();
assert_eq!(response.app_id, "SampleApp");
assert_eq!(response.cluster, "default");
assert_eq!(response.namespace_name, "application.json");
assert_eq!(response.configurations["content"], r#"{"timeout": 100}"#);
}
{
let result = client
.fetch(FetchRequest {
app_id: "NotExistsApp".to_string(),
namespace_name: "application.json".to_string(),
ip: Some(IpValue::HostName),
..Default::default()
})
.await;
assert!(matches!(
result,
Err(ApolloClientError::ApolloResponse(
e
)) if e.status == StatusCode::NOT_FOUND
));
}
{
let result = client
.fetch(FetchRequest {
app_id: "SampleApp".to_string(),
namespace_name: "notExistsNamesapce".to_string(),
ip: Some(IpValue::HostName),
..Default::default()
})
.await;
assert!(matches!(
result,
Err(ApolloClientError::ApolloResponse(
e
)) if e.status == StatusCode::NOT_FOUND
));
}
}
/// Checks the first item yielded by `watch` for three request shapes:
/// all namespaces exist, the app does not exist, and a mix of one existing
/// and one missing namespace.
///
/// Only the first stream item is inspected. Each entry in it is keyed by the
/// requested namespace name and holds that namespace's own `Result`.
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_watch_first() {
    setup();
    // NOTE(review): `ensure_timeout` lives in the shared `common` module;
    // presumably it fails the test if it runs longer than 10 s — confirm there.
    ensure_timeout(Duration::from_secs(10));

    // Both namespaces exist: each entry must be `Ok` with the expected content.
    {
        let client = new_client_via_config_service();
        let stream = client.watch(WatchRequest {
            app_id: "TestApp1".to_string(),
            namespace_names: vec!["foo1".into(), "foo2.properties".into()],
            ip: Some(IpValue::HostName),
            ..Default::default()
        });
        pin_mut!(stream);

        let responses = stream.next().await.unwrap().unwrap();

        let foo1_response = responses["foo1"].as_ref().unwrap();
        assert_eq!(foo1_response.app_id, "TestApp1");
        assert_eq!(foo1_response.namespace_name, "foo1");
        assert_eq!(foo1_response.configurations["foo1"], "bar1");

        let foo2_response = responses["foo2.properties"].as_ref().unwrap();
        assert_eq!(foo2_response.app_id, "TestApp1");
        assert_eq!(foo2_response.namespace_name, "foo2.properties");
        assert_eq!(foo2_response.configurations["foo2"], "bar2");
    }

    // Unknown app: the stream item itself is `Ok`, but every namespace entry
    // carries a 404 response error.
    {
        let client = new_client_via_config_service();
        let stream = client.watch(WatchRequest {
            app_id: "NotExistsApp".to_string(),
            namespace_names: vec!["foo1".into(), "foo2.properties".into()],
            ip: Some(IpValue::HostName),
            ..Default::default()
        });
        pin_mut!(stream);

        let responses = stream.next().await.unwrap().unwrap();
        assert!(matches!(
            responses["foo1"].as_ref(),
            Err(ApolloClientError::ApolloResponse(e)) if e.status == StatusCode::NOT_FOUND
        ));
        assert!(matches!(
            responses["foo2.properties"].as_ref(),
            Err(ApolloClientError::ApolloResponse(e)) if e.status == StatusCode::NOT_FOUND
        ));
    }

    // Mixed request: the existing namespace succeeds while the missing one
    // reports 404 in the same stream item.
    {
        let client = new_client_via_config_service();
        let stream = client.watch(WatchRequest {
            app_id: "TestApp1".to_string(),
            namespace_names: vec!["foo1".into(), "not_exists_namespace".into()],
            ip: Some(IpValue::HostName),
            ..Default::default()
        });
        pin_mut!(stream);

        let responses = stream.next().await.unwrap().unwrap();
        assert_eq!(responses["foo1"].as_ref().unwrap().app_id, "TestApp1");
        assert_eq!(
            responses["foo1"].as_ref().unwrap().configurations["foo1"],
            "bar1"
        );
        assert!(matches!(
            responses["not_exists_namespace"].as_ref(),
            Err(ApolloClientError::ApolloResponse(e)) if e.status == StatusCode::NOT_FOUND
        ));
    }
}
/// Verifies that `watch` reports namespace changes published while the stream
/// is being consumed.
///
/// A background task uses the open-API client to:
/// 1. wait 3 s, set `watcher` key `a` to `2` and publish;
/// 2. wait 3 s, set `watcher2.json` `content` to a 2000 timeout and publish;
/// 3. wait 3 s, write both items back to the values the initial snapshot
///    asserts (`1` and a 1500 timeout) and publish both namespaces.
///
/// The foreground consumes exactly three stream items: the initial snapshot of
/// both namespaces, then one item per change from steps 1 and 2. It then waits
/// for the background task so step 3 completes before the test returns.
///
/// The test fails if the stream ends before all three items have been seen.
#[cfg(feature = "open")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_watch_changed() {
    use apollo_client::open::{
        meta::{OpenRelease, OpenUpdateItem},
        requests::{OpenPublishNamespaceRequest, OpenUpdateItemRequest},
    };
    use tokio::time::sleep;

    setup();
    // NOTE(review): `ensure_timeout` lives in the shared `common` module;
    // presumably it fails the test if it runs longer than 15 s — confirm there.
    ensure_timeout(Duration::from_secs(15));

    // Publisher: mutates and publishes the watched namespaces on a timer.
    let handle = tokio::spawn(async move {
        let client = common::create_open_client();

        // Step 1: change `watcher`.
        sleep(Duration::from_secs(3)).await;
        client
            .update_item(OpenUpdateItemRequest {
                env: "DEV".to_string(),
                app_id: "TestApp2".to_string(),
                namespace_name: "watcher".to_string(),
                item: OpenUpdateItem {
                    key: "a".to_string(),
                    value: "2".to_string(),
                    comment: Some("a comment".to_string()),
                    data_change_last_modified_by: "apollo".to_string(),
                    ..Default::default()
                },
                ..Default::default()
            })
            .await
            .unwrap();
        client
            .publish_namespace(OpenPublishNamespaceRequest {
                env: "DEV".to_string(),
                app_id: "TestApp2".to_string(),
                namespace_name: "watcher".to_string(),
                release: OpenRelease {
                    release_title: "release a".to_string(),
                    release_comment: Some("release a comment".to_string()),
                    released_by: "apollo".to_string(),
                },
                ..Default::default()
            })
            .await
            .unwrap();

        // Step 2: change `watcher2.json`.
        sleep(Duration::from_secs(3)).await;
        client
            .update_item(OpenUpdateItemRequest {
                env: "DEV".to_string(),
                app_id: "TestApp2".to_string(),
                namespace_name: "watcher2.json".to_string(),
                item: OpenUpdateItem {
                    key: "content".to_string(),
                    value: r#"{"timeout":"2000"}"#.to_string(),
                    comment: Some("timeout comment".to_string()),
                    data_change_last_modified_by: "apollo".to_string(),
                    ..Default::default()
                },
                ..Default::default()
            })
            .await
            .unwrap();
        client
            .publish_namespace(OpenPublishNamespaceRequest {
                env: "DEV".to_string(),
                app_id: "TestApp2".to_string(),
                namespace_name: "watcher2.json".to_string(),
                release: OpenRelease {
                    release_title: "release timeout".to_string(),
                    release_comment: Some("release timeout comment".to_string()),
                    released_by: "apollo".to_string(),
                },
                ..Default::default()
            })
            .await
            .unwrap();

        // Step 3: write back the values the initial snapshot asserts, so the
        // next run starts from the same state.
        sleep(Duration::from_secs(3)).await;
        client
            .update_item(OpenUpdateItemRequest {
                env: "DEV".to_string(),
                app_id: "TestApp2".to_string(),
                namespace_name: "watcher".to_string(),
                item: OpenUpdateItem {
                    key: "a".to_string(),
                    value: "1".to_string(),
                    comment: None,
                    data_change_last_modified_by: "apollo".to_string(),
                    ..Default::default()
                },
                ..Default::default()
            })
            .await
            .unwrap();
        client
            .update_item(OpenUpdateItemRequest {
                env: "DEV".to_string(),
                app_id: "TestApp2".to_string(),
                namespace_name: "watcher2.json".to_string(),
                item: OpenUpdateItem {
                    key: "content".to_string(),
                    value: r#"{"timeout":"1500"}"#.to_string(),
                    data_change_last_modified_by: "apollo".to_string(),
                    ..Default::default()
                },
                ..Default::default()
            })
            .await
            .unwrap();
        client
            .publish_namespace(OpenPublishNamespaceRequest {
                env: "DEV".to_string(),
                app_id: "TestApp2".to_string(),
                namespace_name: "watcher".to_string(),
                release: OpenRelease {
                    release_title: "restore".to_string(),
                    released_by: "apollo".to_string(),
                    ..Default::default()
                },
                ..Default::default()
            })
            .await
            .unwrap();
        client
            .publish_namespace(OpenPublishNamespaceRequest {
                env: "DEV".to_string(),
                app_id: "TestApp2".to_string(),
                namespace_name: "watcher2.json".to_string(),
                release: OpenRelease {
                    release_title: "restore".to_string(),
                    released_by: "apollo".to_string(),
                    ..Default::default()
                },
                ..Default::default()
            })
            .await
            .unwrap();
    });

    // Watcher: consumes exactly three stream items. Each `expect` turns an
    // early end of the stream into a test failure instead of a silent pass.
    {
        let client = new_client_via_config_service();
        let stream = client.watch(WatchRequest {
            app_id: "TestApp2".to_string(),
            namespace_names: vec!["watcher".into(), "watcher2.json".into()],
            ip: Some(IpValue::HostName),
            ..Default::default()
        });
        pin_mut!(stream);

        // Item 1: initial snapshot containing both namespaces.
        {
            let responses = stream
                .next()
                .await
                .expect("watch stream ended before the initial snapshot")
                .unwrap();
            assert_eq!(responses.len(), 2);

            let watcher_response = responses["watcher"].as_ref().unwrap();
            assert_eq!(watcher_response.app_id, "TestApp2");
            assert_eq!(watcher_response.cluster, "default");
            assert_eq!(watcher_response.namespace_name, "watcher");
            assert_eq!(watcher_response.configurations.len(), 1);
            assert_eq!(watcher_response.configurations["a"], "1");

            let watcher2_response = responses["watcher2.json"].as_ref().unwrap();
            assert_eq!(watcher2_response.app_id, "TestApp2");
            assert_eq!(watcher2_response.cluster, "default");
            assert_eq!(watcher2_response.namespace_name, "watcher2.json");
            assert_eq!(watcher2_response.configurations.len(), 1);
            assert_eq!(
                watcher2_response.configurations["content"],
                r#"{"timeout":"1500"}"#
            );
        }

        // Item 2: only `watcher` changed (publisher step 1).
        {
            let responses = stream
                .next()
                .await
                .expect("watch stream ended before the `watcher` change arrived")
                .unwrap();
            assert_eq!(responses.len(), 1);

            let watcher_response = responses["watcher"].as_ref().unwrap();
            assert_eq!(watcher_response.app_id, "TestApp2");
            assert_eq!(watcher_response.cluster, "default");
            assert_eq!(watcher_response.namespace_name, "watcher");
            assert_eq!(watcher_response.configurations.len(), 1);
            assert_eq!(watcher_response.configurations["a"], "2");
        }

        // Item 3: only `watcher2.json` changed (publisher step 2).
        {
            let responses = stream
                .next()
                .await
                .expect("watch stream ended before the `watcher2.json` change arrived")
                .unwrap();
            assert_eq!(responses.len(), 1);

            let watcher2_response = responses["watcher2.json"].as_ref().unwrap();
            assert_eq!(watcher2_response.app_id, "TestApp2");
            assert_eq!(watcher2_response.cluster, "default");
            assert_eq!(watcher2_response.namespace_name, "watcher2.json");
            assert_eq!(watcher2_response.configurations.len(), 1);
            assert_eq!(
                watcher2_response.configurations["content"],
                r#"{"timeout":"2000"}"#
            );
        }
    }

    // Wait for the publisher so its restore step finishes, and so any panic
    // inside the task fails the test.
    handle.await.unwrap();
}
/// Builds the `ApolloConfClient` shared by the tests in this file, pointed
/// directly at the config service on `http://localhost:8080`.
///
/// Panics if the URL fails to parse or if either builder step returns an
/// error.
fn new_client_via_config_service() -> ApolloConfClient {
    const CONFIG_SERVICE_URL: &str = "http://localhost:8080";

    let builder =
        ApolloConfClientBuilder::new_via_config_service(CONFIG_SERVICE_URL.parse().unwrap())
            .unwrap();
    builder.build().unwrap()
}