use crate::FPError;
use crate::Repository;
use headers::HeaderValue;
use parking_lot::{Mutex, RwLock};
use reqwest::{header::AUTHORIZATION, Client, Method};
use std::{sync::mpsc::sync_channel, time::Instant};
use std::{sync::Arc, time::Duration};
use tracing::trace;
use tracing::{debug, error};
use url::Url;
pub type UpdateCallback = Box<dyn Fn(Repository, Repository, SyncType) + Send>;
#[derive(Debug, Clone)]
pub struct Synchronizer {
inner: Arc<Inner>,
}
#[derive(Debug)]
pub enum SyncType {
Realtime,
Polling,
}
struct Inner {
toggles_url: Url,
refresh_interval: Duration,
auth: HeaderValue,
client: Client,
repo: Arc<RwLock<Repository>>,
is_init: Arc<RwLock<bool>>,
update_callback: Arc<Mutex<Option<UpdateCallback>>>,
}
impl std::fmt::Debug for Inner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("SynchronizerInner")
.field(&self.toggles_url)
.field(&self.refresh_interval)
.field(&self.repo)
.field(&self.is_init)
.finish()
}
}
impl Synchronizer {
pub fn new(
toggles_url: Url,
refresh_interval: Duration,
auth: HeaderValue,
client: Client,
repo: Arc<RwLock<Repository>>,
) -> Self {
Self {
inner: Arc::new(Inner {
toggles_url,
refresh_interval,
auth,
client,
repo,
is_init: Default::default(),
update_callback: Arc::new(Mutex::new(None)),
}),
}
}
pub fn initialized(&self) -> bool {
let lock = self.inner.is_init.read();
*lock
}
pub fn start_sync(&self, start_wait: Option<Duration>, should_stop: Arc<RwLock<bool>>) {
let inner = self.inner.clone();
let (tx, rx) = sync_channel(1);
let start = Instant::now();
let mut is_send = false;
let interval_duration = inner.refresh_interval;
let is_timeout = Self::init_timeout_fn(start_wait, interval_duration, start);
tokio::spawn(async move {
let mut interval = tokio::time::interval(inner.refresh_interval);
loop {
let result = inner.sync_now(SyncType::Polling).await;
if let Some(r) = Self::should_send(result, &is_timeout, is_send) {
is_send = true;
let _ = tx.try_send(r);
}
if *should_stop.read() {
break;
}
interval.tick().await;
}
});
if start_wait.is_some() {
let _ = rx.recv();
}
}
pub fn set_update_callback(&mut self, update_callback: UpdateCallback) {
let mut lock = self.inner.update_callback.lock();
*lock = Some(update_callback);
}
pub fn version(&self) -> Option<u128> {
let repo = self.inner.repo.read();
repo.version
}
#[cfg(test)]
pub fn repository(&self) -> Arc<RwLock<Repository>> {
self.inner.repo.clone()
}
#[cfg(test)]
fn notify_update(&self, old_repo: Repository, new_repo: Repository, t: SyncType) {
self.inner.notify_update(old_repo, new_repo, t)
}
fn init_timeout_fn(
start_wait: Option<Duration>,
interval: Duration,
start: Instant,
) -> Option<Box<dyn Fn() -> bool + Send>> {
match start_wait {
Some(timeout) => Some(Box::new(move || start.elapsed() + interval > timeout)),
None => None,
}
}
fn should_send(
result: Result<(), FPError>,
is_timeout: &Option<Box<dyn Fn() -> bool + Send>>,
is_send: bool,
) -> Option<Result<(), FPError>> {
if let Some(is_timeout) = is_timeout {
match result {
Ok(_) if !is_send => {
return Some(Ok(()));
}
Err(e) if !is_send && is_timeout() => {
error!("sync error: {}", e);
return Some(Err(e));
}
Err(e) => error!("sync error: {}", e),
_ => {}
}
}
None
}
pub fn sync_now(&self, t: SyncType) {
let slf = self.clone();
tokio::spawn(async move { slf.inner.sync_now(t).await });
}
}
impl Inner {
pub async fn sync_now(&self, t: SyncType) -> Result<(), FPError> {
use http::header::USER_AGENT;
trace!("sync_now {:?} {:?}", self.auth, t);
let mut request = self
.client
.request(Method::GET, self.toggles_url.clone())
.header(AUTHORIZATION, self.auth.clone())
.header(USER_AGENT, &*crate::USER_AGENT)
.timeout(self.refresh_interval);
{
let repo = self.repo.read();
if let Some(version) = &repo.version {
request = request.query(&[("version", &version.to_string())]);
}
} match request.send().await {
Err(e) => Err(FPError::HttpError(e.to_string())),
Ok(resp) => match resp.text().await {
Err(e) => Err(FPError::HttpError(e.to_string())),
Ok(body) => match serde_json::from_str::<Repository>(&body) {
Err(e) => Err(FPError::JsonError(body, e)),
Ok(r) => {
debug!("sync success {:?}", r);
let mut repo = self.repo.write();
if r.version > repo.version {
let old = (*repo).clone();
let new = r.clone();
*repo = r;
self.notify_update(old, new, t);
}
let mut is_init = self.is_init.write();
*is_init = true;
Ok(())
}
},
},
}
}
fn notify_update(&self, old_repo: Repository, new_repo: Repository, t: SyncType) {
let lock = self.update_callback.lock();
if let Some(cb) = &*lock {
cb(old_repo, new_repo, t)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::SdkAuthorization;
use axum::{routing::get, Json, Router, TypedHeader};
use headers::UserAgent;
use std::{fs, net::SocketAddr, path::PathBuf, sync::mpsc::channel};
#[test]
fn test_update_callback() {
let mut syncer = build_synchronizer(9000);
let (tx, rx) = channel();
syncer.set_update_callback(Box::new(move |_old, _new, _| tx.send(()).unwrap()));
let old = Repository::default();
let new = Repository::default();
syncer.notify_update(old, new, SyncType::Polling);
assert!(rx.try_recv().is_ok())
}
#[tokio::test]
async fn test_init_timeout_fn() {
let now = Instant::now();
let now = now - Duration::from_millis(10);
let is_timeout_fn = Synchronizer::init_timeout_fn(None, Duration::from_millis(1), now);
assert!(is_timeout_fn.is_none());
let is_timeout_fn = Synchronizer::init_timeout_fn(
Some(Duration::from_millis(20)),
Duration::from_millis(1),
now,
);
assert!(!is_timeout_fn.unwrap()());
let is_timeout_fn = Synchronizer::init_timeout_fn(
Some(Duration::from_millis(5)),
Duration::from_millis(1),
now,
);
assert!(is_timeout_fn.unwrap()());
}
#[test]
fn test_should_send() {
let is_timeout_fn = None;
let r = Synchronizer::should_send(Ok(()), &is_timeout_fn, false);
assert!(r.is_none(), "no need send because not set timeout");
let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| false));
let r = Synchronizer::should_send(Ok(()), &is_timeout_fn, false);
assert!(r.is_some(), "need send because not timeout, and return Ok");
let r = r.unwrap();
assert!(r.is_ok());
let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| false));
let r = Synchronizer::should_send(Ok(()), &is_timeout_fn, true);
assert!(
r.is_none(),
"no need send because not timeout, and return error, wait next loop"
);
let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| false));
let is_send = true;
let r = Synchronizer::should_send(
Err(FPError::InternalError("unknown".to_owned())),
&is_timeout_fn,
is_send,
);
assert!(r.is_none(), "no need send because already send before");
let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| true));
let r = Synchronizer::should_send(
Err(FPError::InternalError("unknown".to_owned())),
&is_timeout_fn,
is_send,
);
assert!(r.is_none(), "no need send because already send before");
let is_send = false;
let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| true));
let r = Synchronizer::should_send(
Err(FPError::InternalError("unknown".to_owned())),
&is_timeout_fn,
is_send,
);
assert!(r.is_some(), "need send because already timeout");
let r = r.unwrap();
assert!(r.is_err());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_sync() {
let port = 9009;
setup_mock_api(port).await;
let syncer = build_synchronizer(port);
let should_stop = Arc::new(RwLock::new(false));
syncer.start_sync(Some(Duration::from_secs(5)), should_stop);
let repo = syncer.repository();
let repo = repo.read();
assert!(!repo.toggles.is_empty());
assert!(syncer.initialized());
}
fn build_synchronizer(port: u16) -> Synchronizer {
let toggles_url =
Url::parse(&format!("http://127.0.0.1:{}/api/server-sdk/toggles", port)).unwrap();
let refresh_interval = Duration::from_secs(10);
let auth = SdkAuthorization("sdk-key".to_owned()).encode();
Synchronizer {
inner: Arc::new(Inner {
toggles_url,
refresh_interval,
auth,
client: Default::default(),
repo: Default::default(),
is_init: Default::default(),
update_callback: Default::default(),
}),
}
}
async fn setup_mock_api(port: u16) {
let app = Router::new().route("/api/server-sdk/toggles", get(server_sdk_toggles));
let addr = SocketAddr::from(([0, 0, 0, 0], port));
tokio::spawn(async move {
let _ = axum::Server::bind(&addr)
.serve(app.into_make_service())
.await;
});
tokio::time::sleep(Duration::from_millis(100)).await;
}
async fn server_sdk_toggles(
TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
TypedHeader(user_agent): TypedHeader<UserAgent>,
) -> Json<Repository> {
assert_eq!(sdk_key, "sdk-key");
assert!(!user_agent.to_string().is_empty());
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("resources/fixtures/repo.json");
let json_str = fs::read_to_string(path).unwrap();
let repo = serde_json::from_str::<Repository>(&json_str).unwrap();
repo.into()
}
}