dynamo_runtime/
distributed.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16pub use crate::component::Component;
17use crate::{
18    component::{self, ComponentBuilder, Namespace},
19    discovery::DiscoveryClient,
20    service::ServiceClient,
21    transports::{etcd, nats, tcp},
22    ErrorContext,
23};
24
25use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, OK};
26
27use derive_getters::Dissolve;
28use figment::error;
29
30impl DistributedRuntime {
31    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
32        let secondary = runtime.secondary();
33        let (etcd_config, nats_config) = config.dissolve();
34
35        let runtime_clone = runtime.clone();
36
37        let etcd_client = secondary
38            .spawn(async move {
39                let client = etcd::Client::new(etcd_config.clone(), runtime_clone)
40                    .await
41                    .context(format!(
42                        "Failed to connect to etcd server with config {:?}",
43                        etcd_config
44                    ))?;
45                OK(client)
46            })
47            .await??;
48
49        let nats_client = secondary
50            .spawn(async move {
51                let client = nats_config.clone().connect().await.context(format!(
52                    "Failed to connect to NATS server with config {:?}",
53                    nats_config
54                ))?;
55                anyhow::Ok(client)
56            })
57            .await??;
58
59        Ok(Self {
60            runtime,
61            etcd_client,
62            nats_client,
63            tcp_server: Arc::new(OnceCell::new()),
64            component_registry: component::Registry::new(),
65        })
66    }
67
68    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
69        let config = DistributedConfig::from_settings();
70        Self::new(runtime, config).await
71    }
72
73    pub fn runtime(&self) -> &Runtime {
74        &self.runtime
75    }
76
77    pub fn primary_lease(&self) -> etcd::Lease {
78        self.etcd_client.primary_lease()
79    }
80
81    pub fn shutdown(&self) {
82        self.runtime.shutdown();
83    }
84
85    /// Create a [`Namespace`]
86    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
87        Namespace::new(self.clone(), name.into())
88    }
89
90    // /// Create a [`Component`]
91    // pub fn component(
92    //     &self,
93    //     name: impl Into<String>,
94    //     namespace: impl Into<String>,
95    // ) -> Result<Component> {
96    //     Ok(ComponentBuilder::from_runtime(self.clone())
97    //         .name(name.into())
98    //         .namespace(namespace.into())
99    //         .build()?)
100    // }
101
102    pub(crate) fn discovery_client(&self, namespace: impl Into<String>) -> DiscoveryClient {
103        DiscoveryClient::new(namespace.into(), self.etcd_client.clone())
104    }
105
106    pub(crate) fn service_client(&self) -> ServiceClient {
107        ServiceClient::new(self.nats_client.clone())
108    }
109
110    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
111        Ok(self
112            .tcp_server
113            .get_or_try_init(async move {
114                let options = tcp::server::ServerOptions::default();
115                let server = tcp::server::TcpStreamServer::new(options).await?;
116                OK(server)
117            })
118            .await?
119            .clone())
120    }
121
122    pub fn nats_client(&self) -> nats::Client {
123        self.nats_client.clone()
124    }
125
126    pub fn etcd_client(&self) -> etcd::Client {
127        self.etcd_client.clone()
128    }
129}
130
131#[derive(Dissolve)]
132pub struct DistributedConfig {
133    pub etcd_config: etcd::ClientOptions,
134    pub nats_config: nats::ClientOptions,
135}
136
137impl DistributedConfig {
138    pub fn from_settings() -> DistributedConfig {
139        DistributedConfig {
140            etcd_config: etcd::ClientOptions::default(),
141            nats_config: nats::ClientOptions::default(),
142        }
143    }
144
145    pub fn for_cli() -> DistributedConfig {
146        let mut config = DistributedConfig {
147            etcd_config: etcd::ClientOptions::default(),
148            nats_config: nats::ClientOptions::default(),
149        };
150
151        config.etcd_config.attach_lease = false;
152
153        config
154    }
155}