dynamo_runtime/
distributed.rs1pub use crate::component::Component;
17use crate::{
18 component::{self, ComponentBuilder, Namespace},
19 discovery::DiscoveryClient,
20 service::ServiceClient,
21 transports::{etcd, nats, tcp},
22 ErrorContext,
23};
24
25use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, OK};
26
27use derive_getters::Dissolve;
28use figment::error;
29
30impl DistributedRuntime {
31 pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
32 let secondary = runtime.secondary();
33 let (etcd_config, nats_config) = config.dissolve();
34
35 let runtime_clone = runtime.clone();
36
37 let etcd_client = secondary
38 .spawn(async move {
39 let client = etcd::Client::new(etcd_config.clone(), runtime_clone)
40 .await
41 .context(format!(
42 "Failed to connect to etcd server with config {:?}",
43 etcd_config
44 ))?;
45 OK(client)
46 })
47 .await??;
48
49 let nats_client = secondary
50 .spawn(async move {
51 let client = nats_config.clone().connect().await.context(format!(
52 "Failed to connect to NATS server with config {:?}",
53 nats_config
54 ))?;
55 anyhow::Ok(client)
56 })
57 .await??;
58
59 Ok(Self {
60 runtime,
61 etcd_client,
62 nats_client,
63 tcp_server: Arc::new(OnceCell::new()),
64 component_registry: component::Registry::new(),
65 })
66 }
67
68 pub async fn from_settings(runtime: Runtime) -> Result<Self> {
69 let config = DistributedConfig::from_settings();
70 Self::new(runtime, config).await
71 }
72
73 pub fn runtime(&self) -> &Runtime {
74 &self.runtime
75 }
76
77 pub fn primary_lease(&self) -> etcd::Lease {
78 self.etcd_client.primary_lease()
79 }
80
81 pub fn shutdown(&self) {
82 self.runtime.shutdown();
83 }
84
85 pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
87 Namespace::new(self.clone(), name.into())
88 }
89
90 pub(crate) fn discovery_client(&self, namespace: impl Into<String>) -> DiscoveryClient {
103 DiscoveryClient::new(namespace.into(), self.etcd_client.clone())
104 }
105
106 pub(crate) fn service_client(&self) -> ServiceClient {
107 ServiceClient::new(self.nats_client.clone())
108 }
109
110 pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
111 Ok(self
112 .tcp_server
113 .get_or_try_init(async move {
114 let options = tcp::server::ServerOptions::default();
115 let server = tcp::server::TcpStreamServer::new(options).await?;
116 OK(server)
117 })
118 .await?
119 .clone())
120 }
121
122 pub fn nats_client(&self) -> nats::Client {
123 self.nats_client.clone()
124 }
125
126 pub fn etcd_client(&self) -> etcd::Client {
127 self.etcd_client.clone()
128 }
129}
130
131#[derive(Dissolve)]
132pub struct DistributedConfig {
133 pub etcd_config: etcd::ClientOptions,
134 pub nats_config: nats::ClientOptions,
135}
136
137impl DistributedConfig {
138 pub fn from_settings() -> DistributedConfig {
139 DistributedConfig {
140 etcd_config: etcd::ClientOptions::default(),
141 nats_config: nats::ClientOptions::default(),
142 }
143 }
144
145 pub fn for_cli() -> DistributedConfig {
146 let mut config = DistributedConfig {
147 etcd_config: etcd::ClientOptions::default(),
148 nats_config: nats::ClientOptions::default(),
149 };
150
151 config.etcd_config.attach_lease = false;
152
153 config
154 }
155}