dynamo_runtime/component/
service.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_nats::service::Service as NatsService;
5use async_nats::service::ServiceExt as _;
6use derive_builder::Builder;
7use derive_getters::Dissolve;
8use parking_lot::Mutex;
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use crate::component::Component;
13
14pub use super::endpoint::EndpointStats;
15
/// Shared, mutex-guarded map from a name to the [`EndpointStatsHandler`] registered under it.
///
/// `build_nats_service` creates one of these, looks handlers up in it by the
/// `name` argument of the NATS stats callback, and returns a handle to the
/// caller.
type StatsHandlerRegistry = Arc<Mutex<HashMap<String, EndpointStatsHandler>>>;

/// Boxed callback that receives a `String` together with the [`EndpointStats`]
/// and returns a JSON value.
///
/// NOTE(review): not referenced in the visible part of this file; the `String`
/// is presumably the endpoint name — confirm against callers.
pub type StatsHandler =
    Box<dyn FnMut(String, EndpointStats) -> serde_json::Value + Send + Sync + 'static>;

/// Boxed callback that receives the [`EndpointStats`] and returns a JSON value.
///
/// This is the value type stored in the stats handler registry.
pub type EndpointStatsHandler =
    Box<dyn FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
21
/// Project name; used as the prefix of the default service description built
/// in `build_nats_service`.
pub const PROJECT_NAME: &str = "Dynamo";

/// Version of this crate, captured at compile time from `CARGO_PKG_VERSION`
/// and passed as the version string when the NATS service is started.
const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");
24
25pub async fn build_nats_service(
26    nats_client: &crate::transports::nats::Client,
27    component: &Component,
28    description: Option<String>,
29) -> anyhow::Result<(NatsService, StatsHandlerRegistry)> {
30    let service_name = component.service_name();
31    tracing::trace!("component: {component}; creating, service_name: {service_name}");
32
33    let description = description.unwrap_or(format!(
34        "{PROJECT_NAME} component {} in namespace {}",
35        component.name, component.namespace
36    ));
37
38    let stats_handler_registry: StatsHandlerRegistry = Arc::new(Mutex::new(HashMap::new()));
39    let stats_handler_registry_clone = stats_handler_registry.clone();
40
41    let nats_service_builder = nats_client.client().service_builder();
42
43    let nats_service_builder =
44        nats_service_builder
45            .description(description)
46            .stats_handler(move |name, stats| {
47                tracing::trace!("stats_handler: {name}, {stats:?}");
48                let mut guard = stats_handler_registry.lock();
49                match guard.get_mut(&name) {
50                    Some(handler) => handler(stats),
51                    None => serde_json::Value::Null,
52                }
53            });
54    let nats_service = nats_service_builder
55        .start(service_name, SERVICE_VERSION.to_string())
56        .await
57        .map_err(|e| anyhow::anyhow!("Failed to start NATS service: {e}"))?;
58
59    Ok((nats_service, stats_handler_registry_clone))
60}