1mod node_target;
4pub mod pool;
5#[cfg(test)]
6mod tests;
7
8pub use node_target::{NodeTarget, NODE_METADATA_KEY};
9
10use crate::api::machine::machine_service_client::MachineServiceClient;
11use crate::api::machine::ApplyConfigurationRequest as ProtoApplyConfigRequest;
12use crate::api::machine::BootstrapRequest as ProtoBootstrapRequest;
13use crate::api::machine::CopyRequest as ProtoCopyRequest;
14use crate::api::machine::DiskUsageRequest as ProtoDiskUsageRequest;
15use crate::api::machine::DmesgRequest as ProtoDmesgRequest;
16use crate::api::machine::EtcdForfeitLeadershipRequest as ProtoEtcdForfeitLeadershipRequest;
17use crate::api::machine::EtcdLeaveClusterRequest as ProtoEtcdLeaveClusterRequest;
18use crate::api::machine::EtcdMemberListRequest as ProtoEtcdMemberListRequest;
19use crate::api::machine::EtcdRemoveMemberByIdRequest as ProtoEtcdRemoveMemberByIdRequest;
20use crate::api::machine::GenerateClientConfigurationRequest as ProtoGenerateClientConfigRequest;
21use crate::api::machine::ListRequest as ProtoListRequest;
22use crate::api::machine::LogsRequest as ProtoLogsRequest;
23use crate::api::machine::NetstatRequest as ProtoNetstatRequest;
24use crate::api::machine::PacketCaptureRequest as ProtoPacketCaptureRequest;
25use crate::api::machine::ReadRequest as ProtoReadRequest;
26use crate::api::machine::ResetRequest as ProtoResetRequest;
27use crate::api::machine::RollbackRequest as ProtoRollbackRequest;
28use crate::api::machine::ServiceRestartRequest as ProtoServiceRestartRequest;
29use crate::api::machine::ServiceStartRequest as ProtoServiceStartRequest;
30use crate::api::machine::ServiceStopRequest as ProtoServiceStopRequest;
31use crate::api::machine::UpgradeRequest as ProtoUpgradeRequest;
32use crate::api::version::version_service_client::VersionServiceClient;
33use crate::error::Result;
34use crate::resources::{
35 ApplyConfigurationRequest, ApplyConfigurationResponse, BootstrapRequest, BootstrapResponse,
36 CopyRequest, CopyResponse, CpuInfoResponse, DiskStatsResponse, DiskUsageInfo, DiskUsageRequest,
37 DiskUsageResponse, DmesgRequest, DmesgResponse, EtcdAlarmDisarmResponse, EtcdAlarmListResponse,
38 EtcdDefragmentResponse, EtcdForfeitLeadershipRequest, EtcdForfeitLeadershipResponse,
39 EtcdLeaveClusterRequest, EtcdLeaveClusterResponse, EtcdMemberListRequest,
40 EtcdMemberListResponse, EtcdRemoveMemberByIdRequest, EtcdRemoveMemberByIdResponse,
41 EtcdStatusResponse, FileInfo, GenerateClientConfigurationRequest,
42 GenerateClientConfigurationResponse, KubeconfigResponse, ListRequest, ListResponse,
43 LoadAvgResponse, LogsRequest, LogsResponse, MemoryResponse, MountsResponse, NetstatRequest,
44 NetstatResponse, NetworkDeviceStatsResponse, PacketCaptureRequest, PacketCaptureResponse,
45 ProcessesResponse, ReadRequest, ReadResponse, ResetRequest, ResetResponse, RollbackResponse,
46 ServiceRestartRequest, ServiceRestartResponse, ServiceStartRequest, ServiceStartResponse,
47 ServiceStopRequest, ServiceStopResponse, UpgradeRequest, UpgradeResponse,
48};
49use hyper_util::rt::TokioIo;
50use rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName};
51use std::sync::Arc;
52use std::time::Duration;
53use tonic::transport::{Channel, Endpoint};
54
55#[derive(Clone, Debug)]
57pub struct TalosClientConfig {
58 pub endpoint: String,
60 pub crt_path: Option<String>,
62 pub key_path: Option<String>,
64 pub ca_path: Option<String>,
66 pub insecure: bool,
68 pub connect_timeout: Option<Duration>,
70 pub request_timeout: Option<Duration>,
72 pub keepalive_interval: Option<Duration>,
74 pub keepalive_timeout: Option<Duration>,
76}
77
78impl Default for TalosClientConfig {
79 fn default() -> Self {
80 Self {
81 endpoint: "https://127.0.0.1:50000".to_string(),
82 crt_path: None,
83 key_path: None,
84 ca_path: None,
85 insecure: false,
86 connect_timeout: Some(Duration::from_secs(10)),
87 request_timeout: Some(Duration::from_secs(30)),
88 keepalive_interval: Some(Duration::from_secs(30)),
89 keepalive_timeout: Some(Duration::from_secs(10)),
90 }
91 }
92}
93
94impl TalosClientConfig {
95 #[must_use]
97 pub fn new(endpoint: impl Into<String>) -> Self {
98 Self {
99 endpoint: endpoint.into(),
100 ..Default::default()
101 }
102 }
103
104 #[must_use]
106 pub fn builder(endpoint: impl Into<String>) -> TalosClientConfigBuilder {
107 TalosClientConfigBuilder::new(endpoint)
108 }
109
110 #[must_use]
112 pub fn with_client_cert(mut self, crt_path: impl Into<String>) -> Self {
113 self.crt_path = Some(crt_path.into());
114 self
115 }
116
117 #[must_use]
119 pub fn with_client_key(mut self, key_path: impl Into<String>) -> Self {
120 self.key_path = Some(key_path.into());
121 self
122 }
123
124 #[must_use]
126 pub fn with_ca(mut self, ca_path: impl Into<String>) -> Self {
127 self.ca_path = Some(ca_path.into());
128 self
129 }
130
131 #[must_use]
133 pub fn insecure(mut self) -> Self {
134 self.insecure = true;
135 self
136 }
137
138 #[must_use]
140 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
141 self.connect_timeout = Some(timeout);
142 self
143 }
144
145 #[must_use]
147 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
148 self.request_timeout = Some(timeout);
149 self
150 }
151
152 #[must_use]
154 pub fn no_timeout(mut self) -> Self {
155 self.connect_timeout = None;
156 self.request_timeout = None;
157 self
158 }
159}
160
161#[derive(Debug, Clone)]
163pub struct TalosClientConfigBuilder {
164 endpoint: String,
165 crt_path: Option<String>,
166 key_path: Option<String>,
167 ca_path: Option<String>,
168 insecure: bool,
169 connect_timeout: Option<Duration>,
170 request_timeout: Option<Duration>,
171 keepalive_interval: Option<Duration>,
172 keepalive_timeout: Option<Duration>,
173}
174
175impl TalosClientConfigBuilder {
176 #[must_use]
178 pub fn new(endpoint: impl Into<String>) -> Self {
179 Self {
180 endpoint: endpoint.into(),
181 crt_path: None,
182 key_path: None,
183 ca_path: None,
184 insecure: false,
185 connect_timeout: Some(Duration::from_secs(10)),
186 request_timeout: Some(Duration::from_secs(30)),
187 keepalive_interval: Some(Duration::from_secs(30)),
188 keepalive_timeout: Some(Duration::from_secs(10)),
189 }
190 }
191
192 #[must_use]
194 pub fn client_cert(mut self, path: impl Into<String>) -> Self {
195 self.crt_path = Some(path.into());
196 self
197 }
198
199 #[must_use]
201 pub fn client_key(mut self, path: impl Into<String>) -> Self {
202 self.key_path = Some(path.into());
203 self
204 }
205
206 #[must_use]
208 pub fn ca_cert(mut self, path: impl Into<String>) -> Self {
209 self.ca_path = Some(path.into());
210 self
211 }
212
213 #[must_use]
215 pub fn insecure(mut self) -> Self {
216 self.insecure = true;
217 self
218 }
219
220 #[must_use]
222 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
223 self.connect_timeout = Some(timeout);
224 self
225 }
226
227 #[must_use]
229 pub fn request_timeout(mut self, timeout: Duration) -> Self {
230 self.request_timeout = Some(timeout);
231 self
232 }
233
234 #[must_use]
236 pub fn keepalive(mut self, interval: Duration, timeout: Duration) -> Self {
237 self.keepalive_interval = Some(interval);
238 self.keepalive_timeout = Some(timeout);
239 self
240 }
241
242 #[must_use]
244 pub fn no_timeout(mut self) -> Self {
245 self.connect_timeout = None;
246 self.request_timeout = None;
247 self
248 }
249
250 #[must_use]
252 pub fn build(self) -> TalosClientConfig {
253 TalosClientConfig {
254 endpoint: self.endpoint,
255 crt_path: self.crt_path,
256 key_path: self.key_path,
257 ca_path: self.ca_path,
258 insecure: self.insecure,
259 connect_timeout: self.connect_timeout,
260 request_timeout: self.request_timeout,
261 keepalive_interval: self.keepalive_interval,
262 keepalive_timeout: self.keepalive_timeout,
263 }
264 }
265}
266
267#[derive(Clone)]
268pub struct TalosClient {
269 #[allow(dead_code)] config: TalosClientConfig,
271 channel: Channel,
272 node_target: NodeTarget,
274}
275
276impl TalosClient {
277 pub async fn new(config: TalosClientConfig) -> Result<Self> {
278 let _ = rustls::crypto::ring::default_provider().install_default();
280
281 let is_http = config.endpoint.starts_with("http://");
283
284 let channel = if is_http {
285 Self::create_http_channel(&config).await?
287 } else if config.insecure {
288 Self::create_insecure_channel(&config).await?
289 } else {
290 Self::create_mtls_channel(&config).await?
291 };
292
293 Ok(Self {
294 config,
295 channel,
296 node_target: NodeTarget::Default,
297 })
298 }
299
300 pub async fn from_talosconfig(
321 config: &crate::config::TalosConfig,
322 context_name: Option<&str>,
323 ) -> Result<Self> {
324 let context = if let Some(name) = context_name {
325 config.get_context(name).ok_or_else(|| {
326 crate::error::TalosError::Config(format!("Context '{}' not found", name))
327 })?
328 } else {
329 config.active_context().ok_or_else(|| {
330 crate::error::TalosError::Config("No active context in talosconfig".to_string())
331 })?
332 };
333
334 let endpoint = context.first_endpoint().ok_or_else(|| {
335 crate::error::TalosError::Config("No endpoints in context".to_string())
336 })?;
337
338 let endpoint_url = if endpoint.contains("://") {
340 endpoint.clone()
341 } else if endpoint.contains(':') {
342 format!("https://{}", endpoint)
343 } else {
344 format!("https://{}:50000", endpoint)
345 };
346
347 let mut client_config = TalosClientConfig::new(&endpoint_url);
349
350 if let (Some(ca), Some(crt), Some(key)) = (&context.ca, &context.crt, &context.key) {
352 let temp_dir = std::env::temp_dir().join("talos-api-rs");
353 std::fs::create_dir_all(&temp_dir).map_err(|e| {
354 crate::error::TalosError::Config(format!("Failed to create temp dir: {}", e))
355 })?;
356
357 let ca_path = temp_dir.join("ca.crt");
358 let crt_path = temp_dir.join("client.crt");
359 let key_path = temp_dir.join("client.key");
360
361 std::fs::write(&ca_path, ca).map_err(|e| {
362 crate::error::TalosError::Config(format!("Failed to write CA cert: {}", e))
363 })?;
364 std::fs::write(&crt_path, crt).map_err(|e| {
365 crate::error::TalosError::Config(format!("Failed to write client cert: {}", e))
366 })?;
367 std::fs::write(&key_path, key).map_err(|e| {
368 crate::error::TalosError::Config(format!("Failed to write client key: {}", e))
369 })?;
370
371 client_config = client_config
372 .with_ca(ca_path.to_string_lossy().to_string())
373 .with_client_cert(crt_path.to_string_lossy().to_string())
374 .with_client_key(key_path.to_string_lossy().to_string());
375 }
376
377 let mut client = Self::new(client_config).await?;
378
379 if let Some(nodes) = &context.nodes {
381 if !nodes.is_empty() {
382 client.node_target = NodeTarget::from(nodes.clone());
383 }
384 }
385
386 Ok(client)
387 }
388
389 #[must_use]
407 pub fn with_node(&self, target: NodeTarget) -> Self {
408 Self {
409 config: self.config.clone(),
410 channel: self.channel.clone(),
411 node_target: target,
412 }
413 }
414
415 #[must_use]
419 pub fn with_nodes(&self, nodes: impl IntoIterator<Item = impl Into<String>>) -> Self {
420 self.with_node(NodeTarget::multiple(nodes))
421 }
422
423 #[must_use]
425 pub fn node_target(&self) -> &NodeTarget {
426 &self.node_target
427 }
428
429 async fn create_http_channel(config: &TalosClientConfig) -> Result<Channel> {
431 let mut endpoint = Channel::from_shared(config.endpoint.clone())
432 .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
433
434 if let Some(timeout) = config.connect_timeout {
436 endpoint = endpoint.connect_timeout(timeout);
437 }
438 if let Some(timeout) = config.request_timeout {
439 endpoint = endpoint.timeout(timeout);
440 }
441 if let Some(interval) = config.keepalive_interval {
442 if let Some(ka_timeout) = config.keepalive_timeout {
443 endpoint = endpoint
444 .http2_keep_alive_interval(interval)
445 .keep_alive_timeout(ka_timeout);
446 }
447 }
448
449 let channel = endpoint.connect().await?;
450 Ok(channel)
451 }
452
453 async fn create_insecure_channel(config: &TalosClientConfig) -> Result<Channel> {
455 let tls_config = rustls::ClientConfig::builder()
456 .with_root_certificates(rustls::RootCertStore::empty())
457 .with_no_client_auth();
458
459 Self::connect_with_custom_tls(config, tls_config, true).await
460 }
461
462 async fn create_mtls_channel(config: &TalosClientConfig) -> Result<Channel> {
464 let root_store = if let Some(ca_path) = &config.ca_path {
466 let ca_pem = std::fs::read(ca_path).map_err(|e| {
467 crate::error::TalosError::Config(format!("Failed to read CA cert: {e}"))
468 })?;
469 let mut root_store = rustls::RootCertStore::empty();
470 let certs = Self::load_pem_certs(&ca_pem)?;
471 for cert in certs {
472 root_store.add(cert).map_err(|e| {
473 crate::error::TalosError::Config(format!("Failed to add CA cert: {e}"))
474 })?;
475 }
476 root_store
477 } else {
478 let mut root_store = rustls::RootCertStore::empty();
480 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
481 root_store
482 };
483
484 let tls_config =
486 if let (Some(crt_path), Some(key_path)) = (&config.crt_path, &config.key_path) {
487 let cert_pem = std::fs::read(crt_path).map_err(|e| {
489 crate::error::TalosError::Config(format!("Failed to read client cert: {e}"))
490 })?;
491 let key_pem = std::fs::read(key_path).map_err(|e| {
492 crate::error::TalosError::Config(format!("Failed to read client key: {e}"))
493 })?;
494
495 let client_certs = Self::load_pem_certs(&cert_pem)?;
496 let client_key = Self::load_pem_key(&key_pem)?;
497
498 rustls::ClientConfig::builder()
499 .with_root_certificates(root_store)
500 .with_client_auth_cert(client_certs, client_key)
501 .map_err(|e| {
502 crate::error::TalosError::Config(format!(
503 "Failed to configure client auth: {e}"
504 ))
505 })?
506 } else {
507 rustls::ClientConfig::builder()
509 .with_root_certificates(root_store)
510 .with_no_client_auth()
511 };
512
513 Self::connect_with_custom_tls(config, tls_config, false).await
514 }
515
516 async fn connect_with_custom_tls(
518 config: &TalosClientConfig,
519 mut tls_config: rustls::ClientConfig,
520 skip_verification: bool,
521 ) -> Result<Channel> {
522 if skip_verification {
524 tls_config
525 .dangerous()
526 .set_certificate_verifier(Arc::new(NoVerifier));
527 }
528
529 tls_config.alpn_protocols = vec![b"h2".to_vec()];
531 let tls_config = Arc::new(tls_config);
532 let connector = tokio_rustls::TlsConnector::from(tls_config);
533
534 let endpoint_url = if config.endpoint.starts_with("http") {
536 config.endpoint.clone()
537 } else {
538 format!("https://{}", config.endpoint)
539 };
540 let parsed_url = url::Url::parse(&endpoint_url)
541 .map_err(|e| crate::error::TalosError::Config(format!("Invalid endpoint URL: {e}")))?;
542 let host = parsed_url
543 .host_str()
544 .ok_or_else(|| crate::error::TalosError::Config("No host in endpoint".to_string()))?
545 .to_string();
546 let port = parsed_url.port().unwrap_or(50000);
547
548 let endpoint_for_connector = format!("http://{}:{}", host, port);
550
551 let mut endpoint = Endpoint::from_shared(endpoint_for_connector)
553 .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
554
555 if let Some(timeout) = config.connect_timeout {
557 endpoint = endpoint.connect_timeout(timeout);
558 }
559 if let Some(timeout) = config.request_timeout {
560 endpoint = endpoint.timeout(timeout);
561 }
562 if let Some(interval) = config.keepalive_interval {
563 if let Some(ka_timeout) = config.keepalive_timeout {
564 endpoint = endpoint
565 .http2_keep_alive_interval(interval)
566 .keep_alive_timeout(ka_timeout);
567 }
568 }
569
570 let channel = endpoint
571 .connect_with_connector(tower::service_fn(move |uri: tonic::transport::Uri| {
572 let connector = connector.clone();
573 let host = host.clone();
574 async move {
575 let uri_host = uri.host().unwrap_or("127.0.0.1");
576 let uri_port = uri.port_u16().unwrap_or(50000);
577 let addr = format!("{}:{}", uri_host, uri_port);
578
579 let tcp = tokio::net::TcpStream::connect(addr).await?;
580
581 let server_name = ServerName::try_from(host.clone())
583 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
584
585 let tls_stream = connector.connect(server_name, tcp).await?;
586 Ok::<_, std::io::Error>(TokioIo::new(tls_stream))
587 }
588 }))
589 .await?;
590
591 Ok(channel)
592 }
593
594 #[allow(clippy::result_large_err)]
596 fn load_pem_certs(pem_data: &[u8]) -> Result<Vec<CertificateDer<'static>>> {
597 let mut reader = std::io::BufReader::new(pem_data);
598 let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
599 .collect::<std::result::Result<Vec<_>, _>>()
600 .map_err(|e| {
601 crate::error::TalosError::Config(format!("Failed to parse PEM certificates: {e}"))
602 })?;
603 if certs.is_empty() {
604 return Err(crate::error::TalosError::Config(
605 "No certificates found in PEM data".to_string(),
606 ));
607 }
608 Ok(certs)
609 }
610
611 #[allow(clippy::result_large_err)]
613 fn load_pem_key(pem_data: &[u8]) -> Result<PrivateKeyDer<'static>> {
614 let mut reader = std::io::BufReader::new(pem_data);
616
617 loop {
618 match rustls_pemfile::read_one(&mut reader) {
619 Ok(Some(rustls_pemfile::Item::Pkcs1Key(key))) => {
620 return Ok(PrivateKeyDer::Pkcs1(key));
621 }
622 Ok(Some(rustls_pemfile::Item::Pkcs8Key(key))) => {
623 return Ok(PrivateKeyDer::Pkcs8(key));
624 }
625 Ok(Some(rustls_pemfile::Item::Sec1Key(key))) => {
626 return Ok(PrivateKeyDer::Sec1(key));
627 }
628 Ok(Some(_)) => {
629 continue;
631 }
632 Ok(None) => {
633 break;
634 }
635 Err(e) => {
636 return Err(crate::error::TalosError::Config(format!(
637 "Failed to parse PEM key: {e}"
638 )));
639 }
640 }
641 }
642
643 let pem_str = std::str::from_utf8(pem_data)
646 .map_err(|e| crate::error::TalosError::Config(format!("Invalid UTF-8 in key: {e}")))?;
647
648 if pem_str.contains("-----BEGIN ED25519 PRIVATE KEY-----") {
649 let start_marker = "-----BEGIN ED25519 PRIVATE KEY-----";
651 let end_marker = "-----END ED25519 PRIVATE KEY-----";
652
653 if let Some(start) = pem_str.find(start_marker) {
654 if let Some(end) = pem_str.find(end_marker) {
655 let base64_content = &pem_str[start + start_marker.len()..end];
656 let base64_clean: String = base64_content
657 .chars()
658 .filter(|c| !c.is_whitespace())
659 .collect();
660
661 let der_bytes = base64::Engine::decode(
662 &base64::engine::general_purpose::STANDARD,
663 &base64_clean,
664 )
665 .map_err(|e| {
666 crate::error::TalosError::Config(format!(
667 "Failed to decode ED25519 key: {e}"
668 ))
669 })?;
670
671 return Ok(PrivateKeyDer::Pkcs8(
673 rustls::pki_types::PrivatePkcs8KeyDer::from(der_bytes),
674 ));
675 }
676 }
677 }
678
679 Err(crate::error::TalosError::Config(
680 "No private key found in PEM data".to_string(),
681 ))
682 }
683
684 pub fn version(&self) -> VersionServiceClient<Channel> {
686 VersionServiceClient::new(self.channel.clone())
687 }
688
689 pub fn machine(&self) -> MachineServiceClient<Channel> {
691 MachineServiceClient::new(self.channel.clone())
692 }
693
694 fn make_request<T>(&self, inner: T) -> tonic::Request<T> {
696 self.node_target
697 .apply_to_request(tonic::Request::new(inner))
698 }
699
700 pub async fn apply_configuration(
737 &self,
738 request: ApplyConfigurationRequest,
739 ) -> Result<ApplyConfigurationResponse> {
740 let proto_request: ProtoApplyConfigRequest = request.into();
741 let grpc_request = self.make_request(proto_request);
742 let response = self
743 .machine()
744 .apply_configuration(grpc_request)
745 .await?
746 .into_inner();
747 Ok(response.into())
748 }
749
750 pub async fn apply_configuration_yaml(
767 &self,
768 yaml: &str,
769 mode: crate::ApplyMode,
770 dry_run: bool,
771 ) -> Result<ApplyConfigurationResponse> {
772 let request = ApplyConfigurationRequest::builder()
773 .config_yaml(yaml)
774 .mode(mode)
775 .dry_run(dry_run)
776 .build();
777 self.apply_configuration(request).await
778 }
779
780 pub async fn bootstrap(&self, request: BootstrapRequest) -> Result<BootstrapResponse> {
821 let proto_request: ProtoBootstrapRequest = request.into();
822 let grpc_request = self.make_request(proto_request);
823 let response = self.machine().bootstrap(grpc_request).await?.into_inner();
824 Ok(response.into())
825 }
826
827 pub async fn bootstrap_cluster(&self) -> Result<BootstrapResponse> {
831 self.bootstrap(BootstrapRequest::new()).await
832 }
833
834 pub async fn kubeconfig(&self) -> Result<KubeconfigResponse> {
869 use tonic::codegen::tokio_stream::StreamExt;
870
871 let mut stream = self.machine().kubeconfig(()).await?.into_inner();
872
873 let mut data = Vec::new();
874 let mut node = None;
875
876 while let Some(chunk) = stream.next().await {
877 let chunk = chunk?;
878 if node.is_none() {
880 if let Some(metadata) = &chunk.metadata {
881 node = Some(metadata.hostname.clone());
882 }
883 }
884 data.extend(chunk.bytes);
885 }
886
887 Ok(KubeconfigResponse::new(data, node))
888 }
889
890 pub async fn reset(&self, request: ResetRequest) -> Result<ResetResponse> {
927 let mut client = MachineServiceClient::new(self.channel.clone());
928
929 let proto_request: ProtoResetRequest = request.into();
930 let response = client.reset(proto_request).await?;
931 let inner = response.into_inner();
932
933 Ok(ResetResponse::from(inner))
934 }
935
936 pub async fn reset_graceful(&self) -> Result<ResetResponse> {
945 self.reset(ResetRequest::graceful()).await
946 }
947
948 pub async fn etcd_member_list(
971 &self,
972 request: EtcdMemberListRequest,
973 ) -> Result<EtcdMemberListResponse> {
974 let mut client = MachineServiceClient::new(self.channel.clone());
975
976 let proto_request: ProtoEtcdMemberListRequest = request.into();
977 let response = client.etcd_member_list(proto_request).await?;
978 let inner = response.into_inner();
979
980 Ok(EtcdMemberListResponse::from(inner))
981 }
982
983 pub async fn etcd_remove_member_by_id(
1008 &self,
1009 request: EtcdRemoveMemberByIdRequest,
1010 ) -> Result<EtcdRemoveMemberByIdResponse> {
1011 let mut client = MachineServiceClient::new(self.channel.clone());
1012
1013 let proto_request: ProtoEtcdRemoveMemberByIdRequest = request.into();
1014 let response = client.etcd_remove_member_by_id(proto_request).await?;
1015 let inner = response.into_inner();
1016
1017 Ok(EtcdRemoveMemberByIdResponse::from(inner))
1018 }
1019
1020 pub async fn etcd_leave_cluster(
1024 &self,
1025 request: EtcdLeaveClusterRequest,
1026 ) -> Result<EtcdLeaveClusterResponse> {
1027 let mut client = MachineServiceClient::new(self.channel.clone());
1028
1029 let proto_request: ProtoEtcdLeaveClusterRequest = request.into();
1030 let response = client.etcd_leave_cluster(proto_request).await?;
1031 let inner = response.into_inner();
1032
1033 Ok(EtcdLeaveClusterResponse::from(inner))
1034 }
1035
1036 pub async fn etcd_forfeit_leadership(
1040 &self,
1041 request: EtcdForfeitLeadershipRequest,
1042 ) -> Result<EtcdForfeitLeadershipResponse> {
1043 let mut client = MachineServiceClient::new(self.channel.clone());
1044
1045 let proto_request: ProtoEtcdForfeitLeadershipRequest = request.into();
1046 let response = client.etcd_forfeit_leadership(proto_request).await?;
1047 let inner = response.into_inner();
1048
1049 Ok(EtcdForfeitLeadershipResponse::from(inner))
1050 }
1051
1052 pub async fn etcd_status(&self) -> Result<EtcdStatusResponse> {
1054 let mut client = MachineServiceClient::new(self.channel.clone());
1055
1056 let response = client.etcd_status(()).await?;
1057 let inner = response.into_inner();
1058
1059 Ok(EtcdStatusResponse::from(inner))
1060 }
1061
1062 pub async fn etcd_alarm_list(&self) -> Result<EtcdAlarmListResponse> {
1064 let mut client = MachineServiceClient::new(self.channel.clone());
1065
1066 let response = client.etcd_alarm_list(()).await?;
1067 let inner = response.into_inner();
1068
1069 Ok(EtcdAlarmListResponse::from(inner))
1070 }
1071
1072 pub async fn etcd_alarm_disarm(&self) -> Result<EtcdAlarmDisarmResponse> {
1074 let mut client = MachineServiceClient::new(self.channel.clone());
1075
1076 let response = client.etcd_alarm_disarm(()).await?;
1077 let inner = response.into_inner();
1078
1079 Ok(EtcdAlarmDisarmResponse::from(inner))
1080 }
1081
1082 pub async fn etcd_defragment(&self) -> Result<EtcdDefragmentResponse> {
1086 let mut client = MachineServiceClient::new(self.channel.clone());
1087
1088 let response = client.etcd_defragment(()).await?;
1089 let inner = response.into_inner();
1090
1091 Ok(EtcdDefragmentResponse::from(inner))
1092 }
1093
1094 pub async fn dmesg(&self, request: DmesgRequest) -> Result<DmesgResponse> {
1117 use tonic::codegen::tokio_stream::StreamExt;
1118
1119 let mut client = MachineServiceClient::new(self.channel.clone());
1120
1121 let proto_request: ProtoDmesgRequest = request.into();
1122 let response = client.dmesg(proto_request).await?;
1123 let mut stream = response.into_inner();
1124
1125 let mut data = Vec::new();
1126 let mut node = None;
1127
1128 while let Some(chunk) = stream.next().await {
1129 let chunk = chunk?;
1130 if node.is_none() {
1131 if let Some(metadata) = &chunk.metadata {
1132 node = Some(metadata.hostname.clone());
1133 }
1134 }
1135 data.extend(chunk.bytes);
1136 }
1137
1138 Ok(DmesgResponse::new(data, node))
1139 }
1140
1141 pub async fn upgrade(&self, request: UpgradeRequest) -> Result<UpgradeResponse> {
1172 let mut client = MachineServiceClient::new(self.channel.clone());
1173
1174 let proto_request: ProtoUpgradeRequest = request.into();
1175 let response = client.upgrade(proto_request).await?;
1176 let inner = response.into_inner();
1177
1178 Ok(UpgradeResponse::from(inner))
1179 }
1180
1181 pub async fn service_start(
1187 &self,
1188 request: ServiceStartRequest,
1189 ) -> Result<ServiceStartResponse> {
1190 let mut client = MachineServiceClient::new(self.channel.clone());
1191
1192 let proto_request: ProtoServiceStartRequest = request.into();
1193 let response = client.service_start(proto_request).await?;
1194 let inner = response.into_inner();
1195
1196 Ok(ServiceStartResponse::from(inner))
1197 }
1198
1199 pub async fn service_stop(&self, request: ServiceStopRequest) -> Result<ServiceStopResponse> {
1201 let mut client = MachineServiceClient::new(self.channel.clone());
1202
1203 let proto_request: ProtoServiceStopRequest = request.into();
1204 let response = client.service_stop(proto_request).await?;
1205 let inner = response.into_inner();
1206
1207 Ok(ServiceStopResponse::from(inner))
1208 }
1209
1210 pub async fn service_restart(
1212 &self,
1213 request: ServiceRestartRequest,
1214 ) -> Result<ServiceRestartResponse> {
1215 let mut client = MachineServiceClient::new(self.channel.clone());
1216
1217 let proto_request: ProtoServiceRestartRequest = request.into();
1218 let response = client.service_restart(proto_request).await?;
1219 let inner = response.into_inner();
1220
1221 Ok(ServiceRestartResponse::from(inner))
1222 }
1223
1224 pub async fn logs(&self, request: LogsRequest) -> Result<LogsResponse> {
1226 use tonic::codegen::tokio_stream::StreamExt;
1227
1228 let mut client = MachineServiceClient::new(self.channel.clone());
1229
1230 let proto_request: ProtoLogsRequest = request.into();
1231 let response = client.logs(proto_request).await?;
1232 let mut stream = response.into_inner();
1233
1234 let mut data = Vec::new();
1235 let mut node = None;
1236
1237 while let Some(chunk) = stream.next().await {
1238 let chunk = chunk?;
1239 if node.is_none() {
1240 if let Some(metadata) = &chunk.metadata {
1241 node = Some(metadata.hostname.clone());
1242 }
1243 }
1244 data.extend(chunk.bytes);
1245 }
1246
1247 Ok(LogsResponse::new(data, node))
1248 }
1249
1250 pub async fn load_avg(&self) -> Result<LoadAvgResponse> {
1256 let mut client = MachineServiceClient::new(self.channel.clone());
1257
1258 let response = client.load_avg(()).await?;
1259 let inner = response.into_inner();
1260
1261 Ok(LoadAvgResponse::from(inner))
1262 }
1263
1264 pub async fn memory(&self) -> Result<MemoryResponse> {
1266 let mut client = MachineServiceClient::new(self.channel.clone());
1267
1268 let response = client.memory(()).await?;
1269 let inner = response.into_inner();
1270
1271 Ok(MemoryResponse::from(inner))
1272 }
1273
1274 pub async fn cpu_info(&self) -> Result<CpuInfoResponse> {
1276 let mut client = MachineServiceClient::new(self.channel.clone());
1277
1278 let response = client.cpu_info(()).await?;
1279 let inner = response.into_inner();
1280
1281 Ok(CpuInfoResponse::from(inner))
1282 }
1283
1284 pub async fn disk_stats(&self) -> Result<DiskStatsResponse> {
1286 let mut client = MachineServiceClient::new(self.channel.clone());
1287
1288 let response = client.disk_stats(()).await?;
1289 let inner = response.into_inner();
1290
1291 Ok(DiskStatsResponse::from(inner))
1292 }
1293
1294 pub async fn network_device_stats(&self) -> Result<NetworkDeviceStatsResponse> {
1296 let mut client = MachineServiceClient::new(self.channel.clone());
1297
1298 let response = client.network_device_stats(()).await?;
1299 let inner = response.into_inner();
1300
1301 Ok(NetworkDeviceStatsResponse::from(inner))
1302 }
1303
1304 pub async fn mounts(&self) -> Result<MountsResponse> {
1306 let mut client = MachineServiceClient::new(self.channel.clone());
1307
1308 let response = client.mounts(()).await?;
1309 let inner = response.into_inner();
1310
1311 Ok(MountsResponse::from(inner))
1312 }
1313
1314 pub async fn processes(&self) -> Result<ProcessesResponse> {
1316 let mut client = MachineServiceClient::new(self.channel.clone());
1317
1318 let response = client.processes(()).await?;
1319 let inner = response.into_inner();
1320
1321 Ok(ProcessesResponse::from(inner))
1322 }
1323
1324 pub async fn list(&self, request: ListRequest) -> Result<ListResponse> {
1330 use tonic::codegen::tokio_stream::StreamExt;
1331
1332 let mut client = MachineServiceClient::new(self.channel.clone());
1333
1334 let proto_request: ProtoListRequest = request.into();
1335 let response = client.list(proto_request).await?;
1336 let mut stream = response.into_inner();
1337
1338 let mut entries = Vec::new();
1339 while let Some(info) = stream.next().await {
1340 let info = info?;
1341 entries.push(FileInfo::from(info));
1342 }
1343
1344 Ok(ListResponse::new(entries))
1345 }
1346
1347 pub async fn read(&self, request: ReadRequest) -> Result<ReadResponse> {
1349 use tonic::codegen::tokio_stream::StreamExt;
1350
1351 let mut client = MachineServiceClient::new(self.channel.clone());
1352
1353 let proto_request: ProtoReadRequest = request.into();
1354 let response = client.read(proto_request).await?;
1355 let mut stream = response.into_inner();
1356
1357 let mut data = Vec::new();
1358 let mut node = None;
1359
1360 while let Some(chunk) = stream.next().await {
1361 let chunk = chunk?;
1362 if node.is_none() {
1363 if let Some(metadata) = &chunk.metadata {
1364 node = Some(metadata.hostname.clone());
1365 }
1366 }
1367 data.extend(chunk.bytes);
1368 }
1369
1370 Ok(ReadResponse::new(data, node))
1371 }
1372
1373 pub async fn copy(&self, request: CopyRequest) -> Result<CopyResponse> {
1375 use tonic::codegen::tokio_stream::StreamExt;
1376
1377 let mut client = MachineServiceClient::new(self.channel.clone());
1378
1379 let proto_request: ProtoCopyRequest = request.into();
1380 let response = client.copy(proto_request).await?;
1381 let mut stream = response.into_inner();
1382
1383 let mut data = Vec::new();
1384 let mut node = None;
1385
1386 while let Some(chunk) = stream.next().await {
1387 let chunk = chunk?;
1388 if node.is_none() {
1389 if let Some(metadata) = &chunk.metadata {
1390 node = Some(metadata.hostname.clone());
1391 }
1392 }
1393 data.extend(chunk.bytes);
1394 }
1395
1396 Ok(CopyResponse::new(data, node))
1397 }
1398
1399 pub async fn disk_usage(&self, request: DiskUsageRequest) -> Result<DiskUsageResponse> {
1401 use tonic::codegen::tokio_stream::StreamExt;
1402
1403 let mut client = MachineServiceClient::new(self.channel.clone());
1404
1405 let proto_request: ProtoDiskUsageRequest = request.into();
1406 let response = client.disk_usage(proto_request).await?;
1407 let mut stream = response.into_inner();
1408
1409 let mut entries = Vec::new();
1410 while let Some(info) = stream.next().await {
1411 let info = info?;
1412 entries.push(DiskUsageInfo::from(info));
1413 }
1414
1415 Ok(DiskUsageResponse::new(entries))
1416 }
1417
1418 pub async fn rollback(&self) -> Result<RollbackResponse> {
1424 let mut client = MachineServiceClient::new(self.channel.clone());
1425
1426 let response = client.rollback(ProtoRollbackRequest {}).await?;
1427 let inner = response.into_inner();
1428
1429 Ok(RollbackResponse::from(inner))
1430 }
1431
1432 pub async fn generate_client_configuration(
1434 &self,
1435 request: GenerateClientConfigurationRequest,
1436 ) -> Result<GenerateClientConfigurationResponse> {
1437 let mut client = MachineServiceClient::new(self.channel.clone());
1438
1439 let proto_request: ProtoGenerateClientConfigRequest = request.into();
1440 let response = client.generate_client_configuration(proto_request).await?;
1441 let inner = response.into_inner();
1442
1443 Ok(GenerateClientConfigurationResponse::from(inner))
1444 }
1445
1446 pub async fn packet_capture(
1448 &self,
1449 request: PacketCaptureRequest,
1450 ) -> Result<PacketCaptureResponse> {
1451 use tonic::codegen::tokio_stream::StreamExt;
1452
1453 let mut client = MachineServiceClient::new(self.channel.clone());
1454
1455 let proto_request: ProtoPacketCaptureRequest = request.into();
1456 let response = client.packet_capture(proto_request).await?;
1457 let mut stream = response.into_inner();
1458
1459 let mut data = Vec::new();
1460 let mut node = None;
1461
1462 while let Some(chunk) = stream.next().await {
1463 let chunk = chunk?;
1464 if node.is_none() {
1465 if let Some(metadata) = &chunk.metadata {
1466 node = Some(metadata.hostname.clone());
1467 }
1468 }
1469 data.extend(chunk.bytes);
1470 }
1471
1472 Ok(PacketCaptureResponse::new(data, node))
1473 }
1474
1475 pub async fn netstat(&self, request: NetstatRequest) -> Result<NetstatResponse> {
1477 let mut client = MachineServiceClient::new(self.channel.clone());
1478
1479 let proto_request: ProtoNetstatRequest = request.into();
1480 let response = client.netstat(proto_request).await?;
1481 let inner = response.into_inner();
1482
1483 Ok(NetstatResponse::from(inner))
1484 }
1485}
1486
1487#[derive(Debug)]
1489struct NoVerifier;
1490
1491impl rustls::client::danger::ServerCertVerifier for NoVerifier {
1492 fn verify_server_cert(
1493 &self,
1494 _end_entity: &rustls::pki_types::CertificateDer<'_>,
1495 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
1496 _server_name: &rustls::pki_types::ServerName<'_>,
1497 _ocsp_response: &[u8],
1498 _now: rustls::pki_types::UnixTime,
1499 ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
1500 Ok(rustls::client::danger::ServerCertVerified::assertion())
1501 }
1502
1503 fn verify_tls12_signature(
1504 &self,
1505 _message: &[u8],
1506 _cert: &rustls::pki_types::CertificateDer<'_>,
1507 _dss: &rustls::DigitallySignedStruct,
1508 ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1509 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1510 }
1511
1512 fn verify_tls13_signature(
1513 &self,
1514 _message: &[u8],
1515 _cert: &rustls::pki_types::CertificateDer<'_>,
1516 _dss: &rustls::DigitallySignedStruct,
1517 ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1518 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1519 }
1520
1521 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1522 vec![
1523 rustls::SignatureScheme::RSA_PKCS1_SHA1,
1524 rustls::SignatureScheme::ECDSA_SHA1_Legacy,
1525 rustls::SignatureScheme::RSA_PKCS1_SHA256,
1526 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
1527 rustls::SignatureScheme::RSA_PKCS1_SHA384,
1528 rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
1529 rustls::SignatureScheme::RSA_PKCS1_SHA512,
1530 rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
1531 rustls::SignatureScheme::RSA_PSS_SHA256,
1532 rustls::SignatureScheme::RSA_PSS_SHA384,
1533 rustls::SignatureScheme::RSA_PSS_SHA512,
1534 rustls::SignatureScheme::ED25519,
1535 rustls::SignatureScheme::ED448,
1536 ]
1537 }
1538}
1539
1540pub use pool::{ConnectionPool, ConnectionPoolConfig, EndpointHealth, HealthStatus, LoadBalancer};