dynamo_runtime/component/
service.rs1use super::*;
5use crate::component::Component;
6use crate::config::RequestPlaneMode;
7use async_nats::service::Service as NatsService;
8use async_nats::service::ServiceExt as _;
9use derive_builder::Builder;
10use derive_getters::Dissolve;
11use parking_lot::Mutex;
12use std::collections::HashMap;
13use std::sync::Arc;
14
15pub use super::endpoint::EndpointStats;
16
17type StatsHandlerRegistry = Arc<Mutex<HashMap<String, EndpointStatsHandler>>>;
18pub type StatsHandler =
19 Box<dyn FnMut(String, EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
20pub type EndpointStatsHandler =
21 Box<dyn FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
22
/// Project name; used as the leading word of generated service descriptions.
pub const PROJECT_NAME: &str = "Dynamo";
24const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");
25
26pub async fn build_nats_service(
27 nats_client: &crate::transports::nats::Client,
28 component: &Component,
29 description: Option<String>,
30) -> anyhow::Result<(NatsService, StatsHandlerRegistry)> {
31 let service_name = component.service_name();
32 tracing::trace!("component: {component}; creating, service_name: {service_name}");
33
34 let description = description.unwrap_or(format!(
35 "{PROJECT_NAME} component {} in namespace {}",
36 component.name, component.namespace
37 ));
38
39 let stats_handler_registry: StatsHandlerRegistry = Arc::new(Mutex::new(HashMap::new()));
40 let stats_handler_registry_clone = stats_handler_registry.clone();
41
42 let nats_service_builder = nats_client.client().service_builder();
43
44 let nats_service_builder =
45 nats_service_builder
46 .description(description)
47 .stats_handler(move |name, stats| {
48 tracing::trace!("stats_handler: {name}, {stats:?}");
49 let mut guard = stats_handler_registry.lock();
50 match guard.get_mut(&name) {
51 Some(handler) => handler(stats),
52 None => serde_json::Value::Null,
53 }
54 });
55 let nats_service = nats_service_builder
56 .start(service_name, SERVICE_VERSION.to_string())
57 .await
58 .map_err(|e| anyhow::anyhow!("Failed to start NATS service: {e}"))?;
59
60 Ok((nats_service, stats_handler_registry_clone))
61}