use smbcloud_gresiq_sdk::{Environment, GresiqClient, GresiqCredentials};
use super::events::{InferenceEvent, ModelLoadedEvent};
const EMBEDDED_API_KEY_DEV: Option<&str> = option_env!("GRESIQ_API_KEY_DEV");
const EMBEDDED_API_SECRET_DEV: Option<&str> = option_env!("GRESIQ_API_SECRET_DEV");
const EMBEDDED_API_KEY_PRODUCTION: Option<&str> = option_env!("GRESIQ_API_KEY_PRODUCTION");
const EMBEDDED_API_SECRET_SECRET_PRODUCTION: Option<&str> =
option_env!("GRESIQ_API_SECRET_PRODUCTION");
#[derive(Debug, Clone)]
pub struct PulseClient {
inner: GresiqClient,
edge_id: String,
}
impl PulseClient {
pub fn disabled_by_env() -> bool {
matches!(
std::env::var("ONDE_DISABLE_PULSE")
.ok()
.as_deref()
.map(str::trim)
.map(str::to_ascii_lowercase)
.as_deref(),
Some("1") | Some("true") | Some("yes") | Some("on")
)
}
pub fn new(environment: Environment, edge_id: String) -> Option<Self> {
if Self::disabled_by_env() {
return None;
}
let (api_key, api_secret) = match environment {
Environment::Dev => (EMBEDDED_API_KEY_DEV?, EMBEDDED_API_SECRET_DEV?),
Environment::Production => (
EMBEDDED_API_KEY_PRODUCTION?,
EMBEDDED_API_SECRET_SECRET_PRODUCTION?,
),
};
let edge_id = if edge_id.is_empty() {
"onde-unknown".to_string()
} else {
edge_id
};
if tokio::runtime::Handle::try_current().is_err() {
log::warn!(
"pulse: no Tokio runtime available — \
deferring PulseClient creation"
);
return None;
}
let credentials = GresiqCredentials {
api_key,
api_secret,
};
let inner = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
GresiqClient::from_credentials(environment, credentials)
})) {
Ok(client) => client,
Err(_) => {
log::warn!(
"pulse: GresiqClient::from_credentials panicked \
(likely missing Tokio reactor) — telemetry disabled"
);
return None;
}
};
Some(PulseClient { inner, edge_id })
}
pub async fn record_model_loaded(
&self,
model_id: String,
model_name: String,
load_duration_ms: u64,
) {
let client = self.clone();
let send_event = async move {
let event = ModelLoadedEvent {
edge_id: client.edge_id.clone(),
model_id,
model_name,
load_duration_ms,
};
if let Err(error) = client.inner.insert("pulse/model_loaded", &event).await {
log::warn!("pulse: model_loaded failed: {}", error);
}
};
if tokio::runtime::Handle::try_current().is_ok() {
send_event.await;
return;
}
let join = std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
match runtime {
Ok(runtime) => {
runtime.block_on(send_event);
}
Err(error) => {
log::warn!(
"pulse: could not create fallback runtime for model_loaded: {}",
error
);
}
}
});
if join.join().is_err() {
log::warn!("pulse: fallback model_loaded thread panicked");
}
}
pub fn record_inference(
&self,
model_id: String,
request_id: String,
duration_ms: u64,
status: String,
) {
let client = self.clone();
let send_event = async move {
let event = InferenceEvent {
edge_id: client.edge_id.clone(),
model_id,
request_id,
duration_ms,
status,
};
if let Err(error) = client.inner.insert("pulse/inference_event", &event).await {
log::warn!("pulse: inference_event failed: {}", error);
}
};
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(send_event);
return;
}
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
match runtime {
Ok(runtime) => {
runtime.block_on(send_event);
}
Err(error) => {
log::warn!(
"pulse: could not create fallback runtime for inference_event: {}",
error
);
}
}
});
}
}