use crate::signal::Signal;
#[cfg(target_arch = "wasm32")]
use super::cache;
#[cfg(target_arch = "wasm32")]
use super::durable_object::{
MetricRecord, MetricRegistration, RegisterMetricsRequest, RegisterRequest, ServiceRecord,
ServiceRegistration,
};
#[cfg(not(target_arch = "wasm32"))]
use super::{MetricRecord, ServiceRecord};
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait RegistrySender {
async fn register_services(&self, services: Vec<String>, signal: Signal) -> Result<(), String>;
async fn get_all_services(&self) -> Result<Vec<ServiceRecord>, String>;
async fn register_metrics(&self, metrics: Vec<(String, String)>) -> Result<(), String>;
async fn get_all_metrics(&self) -> Result<Vec<MetricRecord>, String>;
}
#[cfg(target_arch = "wasm32")]
pub struct WasmRegistrySender {
env: worker::Env,
}
#[cfg(target_arch = "wasm32")]
impl WasmRegistrySender {
pub fn new(env: worker::Env) -> Self {
Self { env }
}
fn get_stub(&self) -> Result<worker::Stub, worker::Error> {
let namespace = self.env.durable_object("REGISTRY")?;
let id = namespace.id_from_name("services-registry")?;
id.get_stub()
}
}
#[cfg(target_arch = "wasm32")]
#[async_trait::async_trait(?Send)]
impl RegistrySender for WasmRegistrySender {
async fn register_services(&self, services: Vec<String>, signal: Signal) -> Result<(), String> {
let signal_name = match signal {
Signal::Logs => "logs",
Signal::Traces => "traces",
Signal::Gauge
| Signal::Sum
| Signal::Histogram
| Signal::ExpHistogram
| Signal::Summary => "metrics",
};
let new_services: Vec<String> = services
.into_iter()
.filter(|name| !cache::is_known(name, signal_name))
.collect();
if new_services.is_empty() {
return Ok(());
}
let stub = self
.get_stub()
.map_err(|e| format!("Failed to get RegistryDO stub: {}", e))?;
let registrations: Vec<ServiceRegistration> = new_services
.iter()
.map(|name| ServiceRegistration {
name: name.clone(),
signal: signal_name.to_string(),
})
.collect();
let request_body = RegisterRequest {
services: registrations,
};
let body = serde_json::to_string(&request_body)
.map_err(|e| format!("Failed to serialize registration request: {}", e))?;
let mut request = worker::Request::new_with_init(
"http://do/register",
worker::RequestInit::new()
.with_method(worker::Method::Post)
.with_body(Some(body.into())),
)
.map_err(|e| format!("Failed to create request: {}", e))?;
request
.headers_mut()
.map_err(|e| format!("Failed to get request headers: {}", e))?
.set("Content-Type", "application/json")
.map_err(|e| format!("Failed to set Content-Type header: {}", e))?;
let response = stub
.fetch_with_request(request)
.await
.map_err(|e| format!("Failed to send to RegistryDO: {}", e))?;
if response.status_code() >= 400 {
return Err(format!(
"RegistryDO returned status {}",
response.status_code()
));
}
for service in &new_services {
cache::add_locally(service.clone(), signal_name.to_string());
}
Ok(())
}
async fn get_all_services(&self) -> Result<Vec<ServiceRecord>, String> {
let stub = self
.get_stub()
.map_err(|e| format!("Failed to get RegistryDO stub: {}", e))?;
let request = worker::Request::new_with_init(
"http://do/list",
worker::RequestInit::new().with_method(worker::Method::Get),
)
.map_err(|e| format!("Failed to create request: {}", e))?;
let mut response = stub
.fetch_with_request(request)
.await
.map_err(|e| format!("Failed to fetch from RegistryDO: {}", e))?;
if response.status_code() >= 400 {
return Err(format!(
"RegistryDO returned status {}",
response.status_code()
));
}
let text = response
.text()
.await
.map_err(|e| format!("Failed to read response body: {}", e))?;
let services: Vec<ServiceRecord> = serde_json::from_str(&text)
.map_err(|e| format!("Failed to parse service records: {}", e))?;
let service_names: Vec<String> = services.iter().map(|s| s.name.clone()).collect();
cache::refresh(service_names);
Ok(services)
}
async fn register_metrics(&self, metrics: Vec<(String, String)>) -> Result<(), String> {
let new_metrics: Vec<(String, String)> = metrics
.into_iter()
.filter(|(name, metric_type)| !cache::is_metric_known(name, metric_type))
.collect();
if new_metrics.is_empty() {
return Ok(());
}
let stub = self
.get_stub()
.map_err(|e| format!("Failed to get RegistryDO stub: {}", e))?;
let registrations: Vec<MetricRegistration> = new_metrics
.iter()
.map(|(name, metric_type)| MetricRegistration {
name: name.clone(),
metric_type: metric_type.clone(),
})
.collect();
let request_body = RegisterMetricsRequest {
metrics: registrations,
};
let body = serde_json::to_string(&request_body)
.map_err(|e| format!("Failed to serialize metrics request: {}", e))?;
let mut request = worker::Request::new_with_init(
"http://do/register-metrics",
worker::RequestInit::new()
.with_method(worker::Method::Post)
.with_body(Some(body.into())),
)
.map_err(|e| format!("Failed to create request: {}", e))?;
request
.headers_mut()
.map_err(|e| format!("Failed to get request headers: {}", e))?
.set("Content-Type", "application/json")
.map_err(|e| format!("Failed to set Content-Type header: {}", e))?;
let response = stub
.fetch_with_request(request)
.await
.map_err(|e| format!("Failed to send to RegistryDO: {}", e))?;
if response.status_code() >= 400 {
return Err(format!(
"RegistryDO returned status {}",
response.status_code()
));
}
for (name, metric_type) in &new_metrics {
cache::add_metric_locally(name.clone(), metric_type.clone());
}
Ok(())
}
async fn get_all_metrics(&self) -> Result<Vec<MetricRecord>, String> {
let stub = self
.get_stub()
.map_err(|e| format!("Failed to get RegistryDO stub: {}", e))?;
let request = worker::Request::new_with_init(
"http://do/list-metrics",
worker::RequestInit::new().with_method(worker::Method::Get),
)
.map_err(|e| format!("Failed to create request: {}", e))?;
let mut response = stub
.fetch_with_request(request)
.await
.map_err(|e| format!("Failed to fetch from RegistryDO: {}", e))?;
if response.status_code() >= 400 {
return Err(format!(
"RegistryDO returned status {}",
response.status_code()
));
}
let text = response
.text()
.await
.map_err(|e| format!("Failed to read response body: {}", e))?;
let metrics: Vec<MetricRecord> = serde_json::from_str(&text)
.map_err(|e| format!("Failed to parse metric records: {}", e))?;
let metric_tuples: Vec<(String, String)> = metrics
.iter()
.map(|m| (m.name.clone(), m.metric_type.clone()))
.collect();
cache::refresh_metrics(metric_tuples);
Ok(metrics)
}
}
#[cfg(not(target_arch = "wasm32"))]
pub struct NativeRegistrySender;
#[cfg(not(target_arch = "wasm32"))]
impl Default for NativeRegistrySender {
fn default() -> Self {
Self::new()
}
}
#[cfg(not(target_arch = "wasm32"))]
impl NativeRegistrySender {
pub fn new() -> Self {
Self
}
}
#[cfg(not(target_arch = "wasm32"))]
#[async_trait::async_trait]
impl RegistrySender for NativeRegistrySender {
async fn register_services(
&self,
_services: Vec<String>,
_signal: Signal,
) -> Result<(), String> {
Ok(())
}
async fn get_all_services(&self) -> Result<Vec<ServiceRecord>, String> {
Ok(vec![])
}
async fn register_metrics(&self, _metrics: Vec<(String, String)>) -> Result<(), String> {
Ok(())
}
async fn get_all_metrics(&self) -> Result<Vec<MetricRecord>, String> {
Ok(vec![])
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_native_sender_register_returns_ok() {
let sender = NativeRegistrySender::new();
let result = sender
.register_services(vec!["service1".to_string()], Signal::Logs)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_native_sender_get_all_returns_empty() {
let sender = NativeRegistrySender::new();
let result = sender.get_all_services().await;
assert!(result.is_ok());
assert_eq!(result.unwrap().len(), 0);
}
#[tokio::test]
async fn test_native_sender_register_metrics_returns_ok() {
let sender = NativeRegistrySender::new();
let metrics = vec![
("metric1".to_string(), "gauge".to_string()),
("metric2".to_string(), "sum".to_string()),
];
let result = sender.register_metrics(metrics).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_native_sender_get_all_metrics_returns_empty() {
let sender = NativeRegistrySender::new();
let result = sender.get_all_metrics().await;
assert!(result.is_ok());
assert_eq!(result.unwrap().len(), 0);
}
}