#[cfg(target_arch = "wasm32")]
use serde::{Deserialize, Serialize};
#[cfg(target_arch = "wasm32")]
use worker::*;
/// JSON body accepted by `POST /register`: a batch of service registrations.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Serialize, Deserialize)]
pub struct RegisterRequest {
/// Services to record. When this list is empty the handler answers `0` right away.
pub services: Vec<ServiceRegistration>,
}
/// One entry of a [`RegisterRequest`]: a service name plus the signal it was seen emitting.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Serialize, Deserialize)]
pub struct ServiceRegistration {
/// Service name; used as the primary key of the `services` table.
pub name: String,
/// Signal kind. `upsert_service` recognises "logs", "traces" and "metrics"; any other value
/// still registers the service but sets none of the `has_*` flags.
pub signal: String,
}
/// A row of the `services` table, as deserialized and returned by `GET /list`.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Serialize, Deserialize)]
pub struct ServiceRecord {
/// Service name (primary key).
pub name: String,
/// Millisecond timestamp (from `now_ms`) of the insert that created the row; the upsert's
/// conflict branch does not touch it.
pub first_seen_at: i64,
/// 1 once the service has been registered with signal "logs", otherwise 0.
pub has_logs: i64,
/// 1 once the service has been registered with signal "traces", otherwise 0.
pub has_traces: i64,
/// 1 once the service has been registered with signal "metrics", otherwise 0.
pub has_metrics: i64,
}
/// The `(name, metric_type)` projection of a `metrics` row, as returned by `GET /list-metrics`.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Serialize, Deserialize)]
pub struct MetricRecord {
/// Metric name.
pub name: String,
/// Metric type; stored and returned verbatim, never interpreted in this file.
pub metric_type: String,
}
/// JSON body accepted by `POST /register-metrics`: a batch of metric registrations.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Serialize, Deserialize)]
pub struct RegisterMetricsRequest {
/// Metrics to record. When this list is empty the handler answers `0` right away.
pub metrics: Vec<MetricRegistration>,
}
/// One entry of a [`RegisterMetricsRequest`]. The pair `(name, metric_type)` is the primary key
/// of the `metrics` table, so the same name may appear under several types.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Serialize, Deserialize)]
pub struct MetricRegistration {
/// Metric name.
pub name: String,
/// Metric type; stored verbatim, never interpreted in this file.
pub metric_type: String,
}
/// Private helper for deserializing the single row produced by the
/// `SELECT COUNT(*) as count ...` queries in this file.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Deserialize)]
struct CountRow {
/// Value of the `count` column alias.
count: i64,
}
/// Durable Object that keeps a registry of service names and metric names in its SQLite storage.
#[cfg(target_arch = "wasm32")]
#[durable_object]
pub struct RegistryDO {
/// Durable Object state handle; every SQL statement in this file goes through
/// `state.storage().sql()`.
state: State,
// `env` is stored by `new` but never read in this file, hence the lint suppression.
#[allow(dead_code)]
env: Env,
}
#[cfg(target_arch = "wasm32")]
impl DurableObject for RegistryDO {
    /// Builds the object and immediately tries to create the SQLite tables.
    ///
    /// A schema failure is only logged: this constructor has no way to return an error, so the
    /// instance is handed back regardless.
    fn new(state: State, env: Env) -> Self {
        let registry = Self { state, env };
        match registry.ensure_schema() {
            Ok(()) => {}
            Err(err) => {
                worker::console_error!("Failed to initialize SQLite schema: {}", err);
            }
        }
        registry
    }

    /// Routes a request to the matching handler by HTTP method and path.
    ///
    /// Any method/path combination not listed below is answered with `404 Not found`.
    async fn fetch(&self, req: Request) -> Result<Response> {
        let route = req.path();
        let verb = req.method();
        match (verb, route.as_str()) {
            // Write endpoints.
            (Method::Post, "/register") => self.handle_register(req).await,
            (Method::Post, "/register-metrics") => self.handle_register_metrics(req).await,
            // Read endpoints.
            (Method::Get, "/list") => self.handle_list().await,
            (Method::Get, "/list-metrics") => self.handle_list_metrics().await,
            _ => Response::error("Not found", 404),
        }
    }
}
#[cfg(target_arch = "wasm32")]
impl RegistryDO {
/// Upper bound on rows in the `services` table; `handle_register` rejects (HTTP 507) any batch
/// that would push the count past it.
const MAX_SERVICES: usize = 10_000;
/// Upper bound on rows in the `metrics` table; `handle_register_metrics` rejects (HTTP 507) any
/// batch that would push the count past it.
const MAX_METRICS: usize = 10_000;
/// Schema of the `services` table: one row per service name, with the timestamp of its first
/// registration and one 0/1 flag per signal kind. `IF NOT EXISTS` makes it safe for
/// `ensure_schema` to run this on every construction.
const DDL: &'static str = "CREATE TABLE IF NOT EXISTS services (
name TEXT PRIMARY KEY,
first_seen_at INTEGER NOT NULL,
has_logs INTEGER DEFAULT 0,
has_traces INTEGER DEFAULT 0,
has_metrics INTEGER DEFAULT 0
)";
/// Schema of the `metrics` table: one row per `(name, metric_type)` pair, with the timestamp of
/// its first registration. `IF NOT EXISTS` makes it safe for `ensure_schema` to run this on
/// every construction.
const METRICS_DDL: &'static str = "CREATE TABLE IF NOT EXISTS metrics (
name TEXT NOT NULL,
metric_type TEXT NOT NULL,
first_seen_at INTEGER NOT NULL,
PRIMARY KEY (name, metric_type)
)";
/// Creates the `services` and `metrics` tables if they are missing.
///
/// Statements run in declaration order; the first failure is returned and stops the rest.
fn ensure_schema(&self) -> Result<()> {
    let sql = self.state.storage().sql();
    for ddl in [Self::DDL, Self::METRICS_DDL] {
        sql.exec(ddl, None)?;
    }
    Ok(())
}
/// Current time as reported by `worker::Date::now()`, in milliseconds, cast to `i64` so it can
/// be bound as a SQLite integer.
fn now_ms() -> i64 {
    let millis = worker::Date::now().as_millis();
    millis as i64
}
/// Returns the number of rows currently in the `services` table.
///
/// # Errors
/// Fails if the query cannot be executed or its result row cannot be deserialized.
fn get_service_count(&self) -> Result<usize> {
    let sql = self.state.storage().sql();
    let cursor = sql.exec("SELECT COUNT(*) as count FROM services", None)?;
    let counted: Vec<CountRow> = cursor
        .to_array()
        .map_err(|e| worker::Error::RustError(format!("Failed to count services: {}", e)))?;
    // An absent row is treated as a count of zero.
    Ok(counted.first().map_or(0, |row| row.count) as usize)
}
/// Returns the number of rows currently in the `metrics` table.
///
/// # Errors
/// Fails if the query cannot be executed or its result row cannot be deserialized.
fn get_metric_count(&self) -> Result<usize> {
    let sql = self.state.storage().sql();
    let cursor = sql.exec("SELECT COUNT(*) as count FROM metrics", None)?;
    let counted: Vec<CountRow> = cursor
        .to_array()
        .map_err(|e| worker::Error::RustError(format!("Failed to count metrics: {}", e)))?;
    // An absent row is treated as a count of zero.
    Ok(counted.first().map_or(0, |row| row.count) as usize)
}
/// Returns how many *distinct* names in `names` are not yet present in the `services` table.
///
/// Duplicates inside `names` are collapsed first. A batch that mentions the same service once
/// per signal would otherwise count that one service as several new rows and could trip the
/// cardinality limit spuriously.
///
/// # Errors
/// Fails if a lookup query cannot be executed or its result row cannot be deserialized.
fn count_new_services(&self, names: &[String]) -> Result<usize> {
    // Cloudflare's published SQL limits for SQLite-backed Durable Objects cap the number of
    // bound parameters per statement (100 at the time of writing; verify against the current
    // limits page). Lookups are chunked so large batches do not fail on the `IN (...)` list.
    const MAX_BOUND_PARAMS: usize = 100;

    if names.is_empty() {
        return Ok(0);
    }

    let mut unique: Vec<&str> = names.iter().map(String::as_str).collect();
    unique.sort_unstable();
    unique.dedup();

    let sql = self.state.storage().sql();
    let mut existing_count = 0usize;
    for chunk in unique.chunks(MAX_BOUND_PARAMS) {
        let placeholders = vec!["?"; chunk.len()].join(",");
        let query = format!(
            "SELECT COUNT(*) as count FROM services WHERE name IN ({})",
            placeholders
        );
        let params: Vec<SqlStorageValue> = chunk
            .iter()
            .map(|name| SqlStorageValue::String((*name).to_string()))
            .collect();
        let rows: Vec<CountRow> = sql.exec(&query, Some(params))?.to_array().map_err(|e| {
            worker::Error::RustError(format!("Failed to count existing services: {}", e))
        })?;
        existing_count += rows.first().map_or(0, |row| row.count) as usize;
    }

    // `name` is the primary key, so at most one row matches each distinct name; the saturating
    // subtraction is purely defensive.
    Ok(unique.len().saturating_sub(existing_count))
}
/// Returns how many *distinct* `(name, metric_type)` pairs in `metrics` are not yet present in
/// the `metrics` table.
///
/// Duplicates inside `metrics` are collapsed first, so a pair repeated within one batch counts
/// as a single new row instead of inflating the total checked against the cardinality limit.
///
/// # Errors
/// Fails if a lookup query cannot be executed or its result row cannot be deserialized.
fn count_new_metrics(&self, metrics: &[(String, String)]) -> Result<usize> {
    if metrics.is_empty() {
        return Ok(0);
    }

    let mut unique: Vec<&(String, String)> = metrics.iter().collect();
    unique.sort_unstable();
    unique.dedup();
    let distinct = unique.len();

    let sql = self.state.storage().sql();
    let mut existing_count = 0usize;
    for (name, metric_type) in unique {
        let rows: Vec<CountRow> = sql
            .exec(
                "SELECT COUNT(*) as count FROM metrics WHERE name = ? AND metric_type = ?",
                Some(vec![
                    SqlStorageValue::String(name.clone()),
                    SqlStorageValue::String(metric_type.clone()),
                ]),
            )?
            .to_array()
            .map_err(|e| {
                worker::Error::RustError(format!("Failed to check metric existence: {}", e))
            })?;
        existing_count += rows.first().map_or(0, |row| row.count) as usize;
    }

    // `(name, metric_type)` is the primary key, so each probe matches at most one row; the
    // saturating subtraction is purely defensive.
    Ok(distinct.saturating_sub(existing_count))
}
async fn handle_register(&self, mut req: Request) -> Result<Response> {
let body = req.text().await?;
let request: RegisterRequest = serde_json::from_str(&body)
.map_err(|e| worker::Error::RustError(format!("Invalid JSON: {}", e)))?;
if request.services.is_empty() {
return Response::ok("0");
}
let service_names: Vec<String> = request.services.iter().map(|s| s.name.clone()).collect();
let current_count = self.get_service_count()?;
let new_count = self.count_new_services(&service_names)?;
if current_count + new_count > Self::MAX_SERVICES {
worker::console_warn!(
"Service registry limit exceeded: {} current + {} new would exceed maximum of {}",
current_count,
new_count,
Self::MAX_SERVICES
);
return Response::error(
format!(
"Service registry limit exceeded: {} current + {} new would exceed maximum of {}",
current_count, new_count, Self::MAX_SERVICES
),
507, );
}
let now = Self::now_ms();
let mut registered = 0;
for service in &request.services {
self.upsert_service(&service.name, &service.signal, now)?;
registered += 1;
}
Response::ok(format!("{}", registered))
}
async fn handle_register_metrics(&self, mut req: Request) -> Result<Response> {
let body = req.text().await?;
let request: RegisterMetricsRequest = serde_json::from_str(&body)
.map_err(|e| worker::Error::RustError(format!("Invalid JSON: {}", e)))?;
if request.metrics.is_empty() {
return Response::ok("0");
}
let metric_pairs: Vec<(String, String)> = request
.metrics
.iter()
.map(|m| (m.name.clone(), m.metric_type.clone()))
.collect();
let current_count = self.get_metric_count()?;
let new_count = self.count_new_metrics(&metric_pairs)?;
if current_count + new_count > Self::MAX_METRICS {
worker::console_warn!(
"Metrics registry limit exceeded: {} current + {} new would exceed maximum of {}",
current_count,
new_count,
Self::MAX_METRICS
);
return Response::error(
format!(
"Metrics registry limit exceeded: {} current + {} new would exceed maximum of {}",
current_count, new_count, Self::MAX_METRICS
),
507,
);
}
let now = Self::now_ms();
let mut registered = 0;
for metric in &request.metrics {
self.upsert_metric(&metric.name, &metric.metric_type, now)?;
registered += 1;
}
Response::ok(format!("{}", registered))
}
/// Inserts a service row, or merges signal flags into the existing row with the same name.
///
/// `signal` sets the matching flag ("logs", "traces" or "metrics") to 1; any other value sets
/// none. On conflict each flag keeps the larger of the stored and new value, so flags are never
/// cleared, and `first_seen_at` is left untouched.
fn upsert_service(&self, name: &str, signal: &str, first_seen_at: i64) -> Result<()> {
    const UPSERT_SQL: &str = "INSERT INTO services (name, first_seen_at, has_logs, has_traces, has_metrics)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
has_logs = MAX(has_logs, excluded.has_logs),
has_traces = MAX(has_traces, excluded.has_traces),
has_metrics = MAX(has_metrics, excluded.has_metrics)";

    // 1 when `signal` names the given kind, 0 otherwise.
    let flag_for = |kind: &str| -> i64 {
        if signal == kind {
            1
        } else {
            0
        }
    };
    let bindings = vec![
        SqlStorageValue::String(name.to_owned()),
        SqlStorageValue::Integer(first_seen_at),
        SqlStorageValue::Integer(flag_for("logs")),
        SqlStorageValue::Integer(flag_for("traces")),
        SqlStorageValue::Integer(flag_for("metrics")),
    ];
    self.state.storage().sql().exec(UPSERT_SQL, bindings)?;
    Ok(())
}
/// Inserts a `(name, metric_type)` row into `metrics`; an already-present pair is left as is,
/// including its original `first_seen_at`.
fn upsert_metric(&self, name: &str, metric_type: &str, first_seen_at: i64) -> Result<()> {
    const INSERT_SQL: &str = "INSERT INTO metrics (name, metric_type, first_seen_at)
VALUES (?, ?, ?)
ON CONFLICT(name, metric_type) DO NOTHING";

    let bindings = vec![
        SqlStorageValue::String(name.to_owned()),
        SqlStorageValue::String(metric_type.to_owned()),
        SqlStorageValue::Integer(first_seen_at),
    ];
    self.state.storage().sql().exec(INSERT_SQL, bindings)?;
    Ok(())
}
async fn handle_list(&self) -> Result<Response> {
let sql = self.state.storage().sql();
let result = sql.exec("SELECT * FROM services ORDER BY name", None)?;
let services: Vec<ServiceRecord> = result.to_array().map_err(|e| {
worker::Error::RustError(format!("Failed to deserialize service records: {}", e))
})?;
Response::from_json(&services)
}
async fn handle_list_metrics(&self) -> Result<Response> {
let sql = self.state.storage().sql();
let result = sql.exec(
"SELECT name, metric_type FROM metrics ORDER BY name, metric_type",
None,
)?;
let metrics: Vec<MetricRecord> = result.to_array().map_err(|e| {
worker::Error::RustError(format!("Failed to deserialize metric records: {}", e))
})?;
Response::from_json(&metrics)
}
}
#[cfg(all(test, target_arch = "wasm32"))]
mod tests {
    use super::*;

    /// Pins the service cardinality cap so a change to it is a deliberate, visible edit.
    #[test]
    fn test_max_services_constant() {
        assert_eq!(RegistryDO::MAX_SERVICES, 10_000);
    }

    /// A `RegisterRequest` survives a JSON round trip with its entries intact and in order.
    #[test]
    fn test_registration_request_serialization() {
        let services: Vec<ServiceRegistration> = [("service1", "logs"), ("service2", "traces")]
            .iter()
            .map(|&(name, signal)| ServiceRegistration {
                name: name.to_string(),
                signal: signal.to_string(),
            })
            .collect();
        let original = RegisterRequest { services };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: RegisterRequest = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.services.len(), 2);
        let first = &decoded.services[0];
        assert_eq!(first.name, "service1");
        assert_eq!(first.signal, "logs");
    }

    /// A `ServiceRecord` survives a JSON round trip with every field preserved.
    #[test]
    fn test_service_record_serialization() {
        let original = ServiceRecord {
            name: String::from("test-service"),
            first_seen_at: 1234567890,
            has_logs: 1,
            has_traces: 0,
            has_metrics: 1,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ServiceRecord = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.name, "test-service");
        assert_eq!(decoded.first_seen_at, 1234567890);
        assert_eq!(decoded.has_logs, 1);
        assert_eq!(decoded.has_traces, 0);
        assert_eq!(decoded.has_metrics, 1);
    }
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    /// Placeholder so the native test run has an entry for this module.
    ///
    /// Cardinality protection is implemented in WASM-only code, so there is nothing to execute
    /// on non-wasm targets. The body is intentionally empty: the former constant assertion
    /// checked nothing and is flagged by clippy's `assertions_on_constants` lint.
    #[test]
    fn test_cardinality_limit_documented() {}
}