use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::client::{AkribesClient, Inner};
use crate::error::Result;
use crate::models::*;
/// Computes how long the heartbeat loop should pause after a failed attempt.
///
/// This is capped exponential backoff with full jitter: the ceiling starts at
/// one second, doubles with every additional consecutive failure, and is
/// capped at 30 seconds. The returned delay is picked from `[0, ceiling)`.
///
/// Returns [`Duration::ZERO`] when `consecutive_failures` is zero.
fn heartbeat_backoff(consecutive_failures: u32) -> Duration {
    // Ceiling after the first failure, in milliseconds.
    const BASE_MS: u64 = 1_000;
    // Upper bound for the ceiling, in milliseconds.
    const CAP_MS: u64 = 30_000;
    // Largest shift applied to the base; keeps `1 << exponent` far from overflow.
    const MAX_EXPONENT: u32 = 20;

    /// SplitMix64 finalizer: spreads the entropy of `z` over all 64 bits.
    fn scramble(z: u64) -> u64 {
        let z = z.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        let z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    if consecutive_failures == 0 {
        return Duration::ZERO;
    }

    let exponent = (consecutive_failures - 1).min(MAX_EXPONENT);
    // Always within `BASE_MS..=CAP_MS`, so it is a valid non-zero modulus below.
    let ceiling_ms = BASE_MS.saturating_mul(1u64 << exponent).min(CAP_MS);

    // The wall clock is only used as a cheap, dependency-free entropy source.
    // A clock that reads before the Unix epoch falls back to a fixed seed.
    let seed = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| {
            elapsed
                .as_secs()
                .wrapping_mul(1_000_000_000)
                .wrapping_add(u64::from(elapsed.subsec_nanos()))
        })
        .unwrap_or(0);

    // Reducing the raw nanosecond count directly would collapse the jitter on
    // platforms whose clock ticks are coarser than a nanosecond (the low
    // decimal digits are then always zero), so scramble the bits first.
    Duration::from_millis(scramble(seed) % ceiling_ms)
}
/// Handle for the registered-client and contract-lock endpoints of one project.
///
/// Clones share the same underlying state through the `Arc`.
#[derive(Debug, Clone)]
pub struct RegisteredClientsClient {
    /// State shared with the parent client (base URL, identity, caches, heartbeat handle, ...).
    pub(crate) inner: Arc<Inner>,
    /// Project whose id is substituted into project-scoped URLs.
    pub(crate) project_id: i64,
}
impl RegisteredClientsClient {
    /// Creates a handle bound to `project_id` that shares `inner` with its creator.
    pub(crate) fn new(inner: Arc<Inner>, project_id: i64) -> Self {
        Self { inner, project_id }
    }

    /// Builds an [`AkribesClient`] over the same shared state; used to issue requests.
    fn c(&self) -> AkribesClient {
        AkribesClient {
            inner: Arc::clone(&self.inner),
        }
    }

    /// Returns `{base_url}/projects/{project_id}` for this handle's project.
    fn project_url(&self) -> String {
        format!("{}/projects/{}", self.inner.base_url, self.project_id)
    }

    /// Returns the URL of `script_name` inside this handle's project.
    fn script_url(&self, script_name: &str) -> String {
        self.flat_script_url(self.project_id, script_name)
    }

    /// Returns `{base_url}/projects/{project_id}/scripts/{script_name}` for an
    /// explicitly given project, with `script_name` percent-encoded.
    fn flat_script_url(&self, project_id: i64, script_name: &str) -> String {
        format!(
            "{}/projects/{}/scripts/{}",
            self.inner.base_url,
            project_id,
            urlencoding::encode(script_name)
        )
    }

    /// Spawns the background heartbeat task and returns a handle that can abort it.
    ///
    /// The task posts a `HeartbeatRequest` to `{base_url}/heartbeat` on a
    /// 30-second interval, attaching the bearer token when one is set, and
    /// stops once the shared `shutdown` flag is observed as `true`.
    fn spawn_heartbeat(&self) -> tokio::task::AbortHandle {
        let heartbeat_url = format!("{}/heartbeat", self.inner.base_url);
        let client_id = self.inner.id.clone();
        let http = self.inner.http.clone();
        let token = self.inner.token.clone();
        let shutdown = Arc::clone(&self.inner.shutdown);

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            let mut consecutive_failures: u32 = 0;

            loop {
                interval.tick().await;
                if shutdown.load(Ordering::Acquire) {
                    break;
                }

                let mut req = http.post(heartbeat_url.clone()).json(&HeartbeatRequest {
                    client_id: client_id.clone(),
                });
                if let Some(ref t) = *token.read().await {
                    req = req.bearer_auth(t);
                }

                let succeeded = match req.send().await {
                    Ok(res) if res.status().is_success() => true,
                    Ok(res) => {
                        tracing::warn!(status = res.status().as_u16(), "heartbeat rejected");
                        false
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "heartbeat failed");
                        false
                    }
                };

                if succeeded {
                    consecutive_failures = 0;
                    continue;
                }

                // Saturate so a very long outage can never overflow the counter.
                consecutive_failures = consecutive_failures.saturating_add(1);
                let backoff = heartbeat_backoff(consecutive_failures);
                // NOTE(review): this sleep is followed by `interval.tick()`, so a
                // backoff shorter than the time left until the next tick does not
                // appear to change when the next heartbeat is sent. Confirm the
                // intended retry cadence.
                if !backoff.is_zero() {
                    tokio::time::sleep(backoff).await;
                }
            }
        })
        .abort_handle()
    }

    /// Registers this client for `interests` and starts its heartbeat.
    ///
    /// Posts a `RegisterRequest` carrying the shared id and name to
    /// `{project_url}/clients`. On success it:
    ///
    /// 1. replaces the schema cache with the `script_name -> input_schema`
    ///    pairs from the response,
    /// 2. clears the broken-scripts set,
    /// 3. spawns a new heartbeat task and aborts the previously stored one, if any.
    ///
    /// Poisoned locks are recovered rather than propagated: the guarded state
    /// is fully overwritten here, and skipping the handle swap would leave the
    /// new heartbeat task impossible to abort.
    ///
    /// NOTE(review): nothing in this block clears the `shutdown` flag that
    /// `destroy` sets, so a heartbeat started after `destroy` would exit on its
    /// first tick. Confirm whether re-initialising after `destroy` is supported.
    ///
    /// # Errors
    ///
    /// Returns the error from the registration request; in that case no state
    /// is modified and no heartbeat is started.
    pub async fn init(&self, interests: Vec<ClientInterest>) -> Result<RegisterClientResponse> {
        let url = format!("{}/clients", self.project_url());
        let request = RegisterRequest {
            id: self.inner.id.clone(),
            name: self.inner.name.clone(),
            interests,
        };
        let response: RegisterClientResponse = self.c().post(&url, &request).await?;

        {
            let mut schemas = self
                .inner
                .schema_cache
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            schemas.clear();
            for interest in &response.interests {
                schemas.insert(interest.script_name.clone(), interest.input_schema.clone());
            }
        }

        self.inner
            .broken_scripts
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clear();

        let handle = self.spawn_heartbeat();
        {
            let mut slot = self
                .inner
                .heartbeat_handle
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if let Some(previous) = slot.replace(handle) {
                previous.abort();
            }
        }

        Ok(response)
    }

    /// Signals shutdown and aborts the stored heartbeat task, if there is one.
    ///
    /// The handle is taken even when the lock is poisoned, so the background
    /// task does not keep running.
    pub fn destroy(&self) {
        self.inner.shutdown.store(true, Ordering::Release);
        let stale = self
            .inner
            .heartbeat_handle
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();
        if let Some(handle) = stale {
            handle.abort();
        }
    }

    /// Fetches the client list from `{project_url}/clients`.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying request.
    pub async fn list(&self) -> Result<Vec<ClientInfo>> {
        let url = format!("{}/clients", self.project_url());
        self.c().get_list(&url).await
    }

    /// Issues a delete for `{base_url}/clients/{client_id}`.
    ///
    /// Unlike [`list`](Self::list), the URL is built directly under the base
    /// URL rather than under the project. `client_id` is percent-encoded so
    /// that characters such as `/` or `?` cannot alter the request path.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying request.
    pub async fn delete(&self, client_id: &str) -> Result<()> {
        let url = format!(
            "{}/clients/{}",
            self.inner.base_url,
            urlencoding::encode(client_id)
        );
        self.c().delete(&url).await?;
        Ok(())
    }

    /// Fetches the locks of `script_name` in this handle's project.
    ///
    /// Equivalent to [`list_locks_for`](Self::list_locks_for) with this handle's
    /// project id.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying request.
    pub async fn list_locks(&self, script_name: &str) -> Result<Vec<ContractLockInfo>> {
        self.list_locks_for(self.project_id, script_name).await
    }

    /// Issues a delete for lock `lock_id` of `script_name` in this handle's project.
    ///
    /// Equivalent to [`delete_lock`](Self::delete_lock) with this handle's
    /// project id.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying request.
    pub async fn revoke_lock(&self, script_name: &str, lock_id: i64) -> Result<()> {
        self.delete_lock(self.project_id, script_name, lock_id).await
    }

    /// Sends a rebind request for lock `lock_id` of `script_name` in this
    /// handle's project, passing `version_id` through in the request body.
    ///
    /// Equivalent to [`update_lock`](Self::update_lock) with this handle's
    /// project id.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying request.
    pub async fn rebind_lock(
        &self,
        script_name: &str,
        lock_id: i64,
        version_id: Option<i64>,
    ) -> Result<ContractLockInfo> {
        self.update_lock(self.project_id, script_name, lock_id, version_id)
            .await
    }

    /// Fetches the lock list from `{script_url}/locks` for an explicit project.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying request.
    pub async fn list_locks_for(
        &self,
        project_id: i64,
        script_name: &str,
    ) -> Result<Vec<ContractLockInfo>> {
        let url = format!("{}/locks", self.flat_script_url(project_id, script_name));
        self.c().get_list(&url).await
    }

    /// Issues a delete for `{script_url}/locks/{lock_id}` in an explicit project.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying request.
    pub async fn delete_lock(
        &self,
        project_id: i64,
        script_name: &str,
        lock_id: i64,
    ) -> Result<()> {
        let url = format!(
            "{}/locks/{}",
            self.flat_script_url(project_id, script_name),
            lock_id
        );
        self.c().delete(&url).await?;
        Ok(())
    }

    /// Sends a `RebindLockRequest` carrying `version_id` to
    /// `{script_url}/locks/{lock_id}/rebind` in an explicit project and returns
    /// the lock info from the response.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying request.
    pub async fn update_lock(
        &self,
        project_id: i64,
        script_name: &str,
        lock_id: i64,
        version_id: Option<i64>,
    ) -> Result<ContractLockInfo> {
        let url = format!(
            "{}/locks/{}/rebind",
            self.flat_script_url(project_id, script_name),
            lock_id
        );
        self.c()
            .patch(&url, &RebindLockRequest { version_id })
            .await
    }
}