use std::fs;
use std::path::PathBuf;
use std::sync::Once;
use std::time::{Duration, SystemTime};
use tracing::debug;
use crate::config::Mode;
static HEARTBEAT_ONCE: Once = Once::new();
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(7 * 24 * 60 * 60); const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3);
const DEFAULT_CHECKPOINT_URL: &str = "https://checkpoint.getaxonflow.com/v1/ping";
const STREAM_SANDBOX: &str = "sandbox";
const ENDPOINT_TYPE_LOCALHOST: &str = "localhost";
const ENDPOINT_TYPE_PRIVATE: &str = "private_network";
const ENDPOINT_TYPE_REMOTE: &str = "remote";
const ENDPOINT_TYPE_UNKNOWN: &str = "unknown";
pub fn maybe_send_heartbeat(endpoint: &str, mode: &Mode) {
HEARTBEAT_ONCE.call_once(|| {
if telemetry_off() {
debug!("Telemetry disabled via AXONFLOW_TELEMETRY=off");
return;
}
let stamp_path = resolve_stamp_path();
if let Some(ref p) = stamp_path {
if stamp_is_fresh(p) {
debug!("Telemetry heartbeat is still fresh (<7 days)");
return;
}
} else {
debug!("Telemetry stamp path unavailable; falling back to per-process gate");
}
let endpoint = endpoint.to_string();
let mode = mode.clone();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
send_heartbeat(&endpoint, &mode, stamp_path).await;
});
} else {
debug!("Telemetry skipped: no tokio runtime in scope");
}
});
}
fn telemetry_off() -> bool {
std::env::var("AXONFLOW_TELEMETRY")
.unwrap_or_default()
.trim()
.eq_ignore_ascii_case("off")
}
fn stamp_is_fresh(stamp_path: &PathBuf) -> bool {
let metadata = match fs::metadata(stamp_path) {
Ok(m) => m,
Err(_) => return false,
};
let modified = match metadata.modified() {
Ok(m) => m,
Err(_) => return false,
};
let elapsed = match SystemTime::now().duration_since(modified) {
Ok(e) => e,
Err(_) => return false,
};
elapsed < HEARTBEAT_INTERVAL
}
fn resolve_stamp_path() -> Option<PathBuf> {
home::home_dir().map(|mut p| {
#[cfg(target_os = "macos")]
{
p.push("Library");
p.push("Caches");
}
#[cfg(not(target_os = "macos"))]
{
p.push(".cache");
}
p.push("axonflow");
p.push("rust-telemetry-last-sent");
p
})
}
fn classify_endpoint(endpoint: &str) -> &'static str {
if endpoint.is_empty() {
return ENDPOINT_TYPE_UNKNOWN;
}
let parsed = match url::Url::parse(endpoint) {
Ok(u) => u,
Err(_) => return ENDPOINT_TYPE_UNKNOWN,
};
let host = match parsed.host_str() {
Some(h) => h
.trim_start_matches('[')
.trim_end_matches(']')
.to_lowercase(),
None => return ENDPOINT_TYPE_UNKNOWN,
};
if host == "localhost" || host == "0.0.0.0" || host.ends_with(".localhost") {
return ENDPOINT_TYPE_LOCALHOST;
}
for suffix in &[".local", ".internal", ".lan", ".intranet"] {
if host.ends_with(suffix) {
return ENDPOINT_TYPE_PRIVATE;
}
}
if let Ok(ip) = host.parse::<std::net::IpAddr>() {
if ip.is_loopback() {
return ENDPOINT_TYPE_LOCALHOST;
}
if is_private_or_link_local(&ip) {
return ENDPOINT_TYPE_PRIVATE;
}
return ENDPOINT_TYPE_REMOTE;
}
ENDPOINT_TYPE_REMOTE
}
fn is_private_or_link_local(ip: &std::net::IpAddr) -> bool {
match ip {
std::net::IpAddr::V4(v4) => v4.is_private() || v4.is_link_local(),
std::net::IpAddr::V6(v6) => {
let segs = v6.segments();
(segs[0] & 0xfe00) == 0xfc00 || (segs[0] & 0xffc0) == 0xfe80
}
}
}
pub const DEPLOYMENT_MODE_SELF_HOSTED: &str = "self_hosted";
pub const DEPLOYMENT_MODE_COMMUNITY_SAAS: &str = "community_saas";
pub const DEPLOYMENT_MODE_UNKNOWN: &str = "unknown";
fn classify_deployment_mode(endpoint: &str) -> &'static str {
if std::env::var("AXONFLOW_TRY").unwrap_or_default() == "1" {
return DEPLOYMENT_MODE_COMMUNITY_SAAS;
}
if endpoint.is_empty() {
return DEPLOYMENT_MODE_UNKNOWN;
}
let parsed = match url::Url::parse(endpoint) {
Ok(u) => u,
Err(_) => return DEPLOYMENT_MODE_UNKNOWN,
};
let host = match parsed.host_str() {
Some(h) => h.to_lowercase(),
None => return DEPLOYMENT_MODE_UNKNOWN,
};
if host == "try.getaxonflow.com" || host.ends_with(".try.getaxonflow.com") {
return DEPLOYMENT_MODE_COMMUNITY_SAAS;
}
DEPLOYMENT_MODE_SELF_HOSTED
}
fn stream_for_mode(mode: &Mode) -> Option<&'static str> {
match mode {
Mode::Sandbox => Some(STREAM_SANDBOX),
Mode::Production => None,
}
}
pub const ORG_ID_LOCAL_DEV_SENTINEL: &str = "local-dev-org";
pub fn telemetry_org_id() -> String {
match std::env::var("ORG_ID") {
Ok(v) if !v.is_empty() => v,
_ => ORG_ID_LOCAL_DEV_SENTINEL.to_string(),
}
}
fn os_str() -> &'static str {
std::env::consts::OS
}
fn arch_str() -> &'static str {
std::env::consts::ARCH
}
fn runtime_version_str() -> String {
"rustc-stable".to_string()
}
fn instance_id() -> String {
use std::time::UNIX_EPOCH;
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let pid = std::process::id() as u64;
let mix = nanos ^ pid.rotate_left(17);
let bytes = mix.to_le_bytes();
format!(
"{:08x}-{:04x}-4{:03x}-{:04x}-{:012x}",
u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
u16::from_le_bytes([bytes[4], bytes[5]]),
u16::from_le_bytes([bytes[6], bytes[7]]) & 0x0fff,
(u16::from_le_bytes([bytes[0], bytes[7]]) & 0x3fff) | 0x8000,
u64::from_le_bytes([
bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[0], bytes[7]
]) & 0xffff_ffff_ffff
)
}
async fn send_heartbeat(endpoint: &str, mode: &Mode, stamp_path: Option<PathBuf>) {
let checkpoint_url = std::env::var("AXONFLOW_CHECKPOINT_URL")
.unwrap_or_else(|_| DEFAULT_CHECKPOINT_URL.to_string());
let client = match reqwest::Client::builder()
.timeout(HEARTBEAT_TIMEOUT)
.build()
{
Ok(c) => c,
Err(e) => {
debug!("Telemetry skipped: client build failed: {}", e);
return;
}
};
let mut payload = serde_json::Map::new();
payload.insert("telemetry_type".into(), serde_json::Value::from("sdk"));
payload.insert("sdk".into(), serde_json::Value::from("rust"));
payload.insert(
"sdk_version".into(),
serde_json::Value::from(env!("CARGO_PKG_VERSION")),
);
payload.insert("os".into(), serde_json::Value::from(os_str()));
payload.insert("arch".into(), serde_json::Value::from(arch_str()));
payload.insert(
"runtime_version".into(),
serde_json::Value::from(runtime_version_str()),
);
payload.insert(
"deployment_mode".into(),
serde_json::Value::from(classify_deployment_mode(endpoint)),
);
payload.insert(
"endpoint_type".into(),
serde_json::Value::from(classify_endpoint(endpoint)),
);
payload.insert("features".into(), serde_json::Value::Array(vec![]));
payload.insert("instance_id".into(), serde_json::Value::from(instance_id()));
if let Some(stream) = stream_for_mode(mode) {
payload.insert("stream".into(), serde_json::Value::from(stream));
}
payload.insert("org_id".into(), serde_json::Value::from(telemetry_org_id()));
debug!(
"[AxonFlow] Telemetry enabled. Opt out: AXONFLOW_TELEMETRY=off | https://docs.getaxonflow.com/docs/telemetry"
);
match client.post(&checkpoint_url).json(&payload).send().await {
Ok(resp) if resp.status().is_success() => {
debug!("Telemetry heartbeat delivered");
if let Some(stamp_path) = stamp_path {
if let Some(parent) = stamp_path.parent() {
let _ = fs::create_dir_all(parent);
}
let _ = fs::write(
&stamp_path,
format!("last_sent={}", chrono::Utc::now().to_rfc3339()),
);
}
}
Ok(resp) => debug!("Telemetry heartbeat rejected by server: {}", resp.status()),
Err(e) => debug!("Telemetry heartbeat failed: {}", e),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn classify_endpoint_localhost_variants() {
assert_eq!(
classify_endpoint("http://localhost:8080"),
ENDPOINT_TYPE_LOCALHOST
);
assert_eq!(
classify_endpoint("https://127.0.0.1:8080"),
ENDPOINT_TYPE_LOCALHOST
);
assert_eq!(
classify_endpoint("http://0.0.0.0:9090"),
ENDPOINT_TYPE_LOCALHOST
);
assert_eq!(
classify_endpoint("http://my.localhost"),
ENDPOINT_TYPE_LOCALHOST
);
assert_eq!(
classify_endpoint("http://[::1]:8080"),
ENDPOINT_TYPE_LOCALHOST
);
}
#[test]
fn classify_endpoint_private_variants() {
assert_eq!(classify_endpoint("http://10.1.2.3"), ENDPOINT_TYPE_PRIVATE);
assert_eq!(
classify_endpoint("http://192.168.1.1"),
ENDPOINT_TYPE_PRIVATE
);
assert_eq!(
classify_endpoint("http://172.16.0.1"),
ENDPOINT_TYPE_PRIVATE
);
assert_eq!(classify_endpoint("http://api.local"), ENDPOINT_TYPE_PRIVATE);
assert_eq!(
classify_endpoint("http://api.internal"),
ENDPOINT_TYPE_PRIVATE
);
}
#[test]
fn classify_endpoint_remote() {
assert_eq!(
classify_endpoint("https://api.example.com"),
ENDPOINT_TYPE_REMOTE
);
assert_eq!(
classify_endpoint("https://203.0.113.5"),
ENDPOINT_TYPE_REMOTE
);
}
#[test]
fn classify_endpoint_unknown() {
assert_eq!(classify_endpoint(""), ENDPOINT_TYPE_UNKNOWN);
assert_eq!(classify_endpoint("not a url"), ENDPOINT_TYPE_UNKNOWN);
}
#[test]
fn stream_for_mode_classification() {
assert_eq!(stream_for_mode(&Mode::Sandbox), Some(STREAM_SANDBOX));
assert_eq!(stream_for_mode(&Mode::Production), None);
}
#[test]
fn classify_deployment_mode_v1_schema() {
std::env::remove_var("AXONFLOW_TRY");
assert_eq!(classify_deployment_mode(""), DEPLOYMENT_MODE_UNKNOWN);
assert_eq!(
classify_deployment_mode("not a url"),
DEPLOYMENT_MODE_UNKNOWN
);
assert_eq!(
classify_deployment_mode("https://api.example.com"),
DEPLOYMENT_MODE_SELF_HOSTED
);
assert_eq!(
classify_deployment_mode("https://try.getaxonflow.com"),
DEPLOYMENT_MODE_COMMUNITY_SAAS
);
assert_eq!(
classify_deployment_mode("https://eu.try.getaxonflow.com"),
DEPLOYMENT_MODE_COMMUNITY_SAAS
);
std::env::set_var("AXONFLOW_TRY", "1");
assert_eq!(
classify_deployment_mode("https://my-proxy.example.com"),
DEPLOYMENT_MODE_COMMUNITY_SAAS
);
std::env::remove_var("AXONFLOW_TRY");
}
#[test]
fn telemetry_off_recognizes_off_value() {
std::env::set_var("AXONFLOW_TELEMETRY", "off");
assert!(telemetry_off());
std::env::set_var("AXONFLOW_TELEMETRY", "OFF");
assert!(telemetry_off());
std::env::set_var("AXONFLOW_TELEMETRY", " off ");
assert!(telemetry_off());
std::env::set_var("AXONFLOW_TELEMETRY", "");
assert!(!telemetry_off());
std::env::set_var("AXONFLOW_TELEMETRY", "on");
assert!(!telemetry_off());
std::env::remove_var("AXONFLOW_TELEMETRY");
assert!(!telemetry_off());
}
use std::sync::Mutex;
static ORG_ID_TEST_LOCK: Mutex<()> = Mutex::new(());
#[test]
fn telemetry_org_id_env_wins() {
let _g = ORG_ID_TEST_LOCK.lock().unwrap();
std::env::set_var("ORG_ID", "acme-corp");
assert_eq!(telemetry_org_id(), "acme-corp");
std::env::remove_var("ORG_ID");
}
#[test]
fn telemetry_org_id_unset_returns_sentinel() {
let _g = ORG_ID_TEST_LOCK.lock().unwrap();
std::env::remove_var("ORG_ID");
assert_eq!(telemetry_org_id(), ORG_ID_LOCAL_DEV_SENTINEL);
assert_eq!(ORG_ID_LOCAL_DEV_SENTINEL, "local-dev-org");
}
#[test]
fn telemetry_org_id_empty_falls_through_to_sentinel() {
let _g = ORG_ID_TEST_LOCK.lock().unwrap();
std::env::set_var("ORG_ID", "");
assert_eq!(telemetry_org_id(), ORG_ID_LOCAL_DEV_SENTINEL);
std::env::remove_var("ORG_ID");
}
#[test]
fn telemetry_org_id_cs_prefixed_passes_through() {
let _g = ORG_ID_TEST_LOCK.lock().unwrap();
let cs_id = "cs_e3a4b5c6-d7e8-4f90-a1b2-c3d4e5f6a7b8";
std::env::set_var("ORG_ID", cs_id);
assert_eq!(telemetry_org_id(), cs_id);
std::env::remove_var("ORG_ID");
}
}