1pub mod discovery;
4mod node_target;
5pub mod pool;
6#[cfg(test)]
7mod tests;
8
9pub use discovery::{ClusterDiscovery, ClusterHealth, ClusterMember, NodeHealth, NodeRole};
10pub use node_target::{NodeTarget, NODE_METADATA_KEY};
11
12use crate::api::machine::machine_service_client::MachineServiceClient;
13use crate::api::machine::ApplyConfigurationRequest as ProtoApplyConfigRequest;
14use crate::api::machine::BootstrapRequest as ProtoBootstrapRequest;
15use crate::api::machine::CopyRequest as ProtoCopyRequest;
16use crate::api::machine::DiskUsageRequest as ProtoDiskUsageRequest;
17use crate::api::machine::DmesgRequest as ProtoDmesgRequest;
18use crate::api::machine::EtcdForfeitLeadershipRequest as ProtoEtcdForfeitLeadershipRequest;
19use crate::api::machine::EtcdLeaveClusterRequest as ProtoEtcdLeaveClusterRequest;
20use crate::api::machine::EtcdMemberListRequest as ProtoEtcdMemberListRequest;
21use crate::api::machine::EtcdRemoveMemberByIdRequest as ProtoEtcdRemoveMemberByIdRequest;
22use crate::api::machine::GenerateClientConfigurationRequest as ProtoGenerateClientConfigRequest;
23use crate::api::machine::ImageListRequest as ProtoImageListRequest;
24use crate::api::machine::ImagePullRequest as ProtoImagePullRequest;
25use crate::api::machine::ListRequest as ProtoListRequest;
26use crate::api::machine::LogsRequest as ProtoLogsRequest;
27use crate::api::machine::NetstatRequest as ProtoNetstatRequest;
28use crate::api::machine::PacketCaptureRequest as ProtoPacketCaptureRequest;
29use crate::api::machine::ReadRequest as ProtoReadRequest;
30use crate::api::machine::ResetRequest as ProtoResetRequest;
31use crate::api::machine::RollbackRequest as ProtoRollbackRequest;
32use crate::api::machine::ServiceRestartRequest as ProtoServiceRestartRequest;
33use crate::api::machine::ServiceStartRequest as ProtoServiceStartRequest;
34use crate::api::machine::ServiceStopRequest as ProtoServiceStopRequest;
35use crate::api::machine::UpgradeRequest as ProtoUpgradeRequest;
36use crate::api::version::version_service_client::VersionServiceClient;
37use crate::error::Result;
38use crate::resources::{
39 ApplyConfigurationRequest, ApplyConfigurationResponse, BootstrapRequest, BootstrapResponse,
40 CopyRequest, CopyResponse, CpuInfoResponse, DiskStatsResponse, DiskUsageInfo, DiskUsageRequest,
41 DiskUsageResponse, DmesgRequest, DmesgResponse, EtcdAlarmDisarmResponse, EtcdAlarmListResponse,
42 EtcdDefragmentResponse, EtcdForfeitLeadershipRequest, EtcdForfeitLeadershipResponse,
43 EtcdLeaveClusterRequest, EtcdLeaveClusterResponse, EtcdMemberListRequest,
44 EtcdMemberListResponse, EtcdRemoveMemberByIdRequest, EtcdRemoveMemberByIdResponse,
45 EtcdStatusResponse, FileInfo, GenerateClientConfigurationRequest,
46 GenerateClientConfigurationResponse, ImageInfo, ImageListRequest, ImagePullRequest,
47 ImagePullResponse, KubeconfigResponse, ListRequest, ListResponse, LoadAvgResponse, LogsRequest,
48 LogsResponse, MemoryResponse, MountsResponse, NetstatRequest, NetstatResponse,
49 NetworkDeviceStatsResponse, PacketCaptureRequest, PacketCaptureResponse, ProcessesResponse,
50 ReadRequest, ReadResponse, ResetRequest, ResetResponse, RollbackResponse,
51 ServiceRestartRequest, ServiceRestartResponse, ServiceStartRequest, ServiceStartResponse,
52 ServiceStopRequest, ServiceStopResponse, UpgradeRequest, UpgradeResponse,
53};
54use hyper_util::rt::TokioIo;
55use rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName};
56use std::sync::Arc;
57use std::time::Duration;
58use tonic::transport::{Channel, Endpoint};
59
/// Connection settings used to build a [`TalosClient`].
#[derive(Debug, Clone)]
pub struct TalosClientConfig {
    /// Endpoint to connect to. An `http://` endpoint selects the plaintext channel.
    pub endpoint: String,
    /// Path to the PEM client certificate; only used together with `key_path`.
    pub crt_path: Option<String>,
    /// Path to the PEM client private key; only used together with `crt_path`.
    pub key_path: Option<String>,
    /// Path to the PEM CA bundle. When absent, the `webpki_roots` trust anchors are used.
    pub ca_path: Option<String>,
    /// When `true`, server certificate verification is skipped.
    pub insecure: bool,
    /// Limit for establishing the connection; `None` sets no limit.
    pub connect_timeout: Option<Duration>,
    /// Limit applied to each request; `None` sets no limit.
    pub request_timeout: Option<Duration>,
    /// HTTP/2 keep-alive ping interval; only applied when `keepalive_timeout` is also set.
    pub keepalive_interval: Option<Duration>,
    /// Keep-alive acknowledgement timeout; only applied when `keepalive_interval` is also set.
    pub keepalive_timeout: Option<Duration>,
}
82
83impl Default for TalosClientConfig {
84 fn default() -> Self {
85 Self {
86 endpoint: "https://127.0.0.1:50000".to_string(),
87 crt_path: None,
88 key_path: None,
89 ca_path: None,
90 insecure: false,
91 connect_timeout: Some(Duration::from_secs(10)),
92 request_timeout: Some(Duration::from_secs(30)),
93 keepalive_interval: Some(Duration::from_secs(30)),
94 keepalive_timeout: Some(Duration::from_secs(10)),
95 }
96 }
97}
98
99impl TalosClientConfig {
100 #[must_use]
102 pub fn new(endpoint: impl Into<String>) -> Self {
103 Self {
104 endpoint: endpoint.into(),
105 ..Default::default()
106 }
107 }
108
109 #[must_use]
111 pub fn builder(endpoint: impl Into<String>) -> TalosClientConfigBuilder {
112 TalosClientConfigBuilder::new(endpoint)
113 }
114
115 #[must_use]
117 pub fn with_client_cert(mut self, crt_path: impl Into<String>) -> Self {
118 self.crt_path = Some(crt_path.into());
119 self
120 }
121
122 #[must_use]
124 pub fn with_client_key(mut self, key_path: impl Into<String>) -> Self {
125 self.key_path = Some(key_path.into());
126 self
127 }
128
129 #[must_use]
131 pub fn with_ca(mut self, ca_path: impl Into<String>) -> Self {
132 self.ca_path = Some(ca_path.into());
133 self
134 }
135
136 #[must_use]
138 pub fn insecure(mut self) -> Self {
139 self.insecure = true;
140 self
141 }
142
143 #[must_use]
145 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
146 self.connect_timeout = Some(timeout);
147 self
148 }
149
150 #[must_use]
152 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
153 self.request_timeout = Some(timeout);
154 self
155 }
156
157 #[must_use]
159 pub fn no_timeout(mut self) -> Self {
160 self.connect_timeout = None;
161 self.request_timeout = None;
162 self
163 }
164}
165
/// Builder for [`TalosClientConfig`]; its fields mirror the configuration one to one.
#[derive(Clone, Debug)]
pub struct TalosClientConfigBuilder {
    /// Endpoint to connect to.
    endpoint: String,
    /// Path to the PEM client certificate.
    crt_path: Option<String>,
    /// Path to the PEM client private key.
    key_path: Option<String>,
    /// Path to the PEM CA bundle.
    ca_path: Option<String>,
    /// Whether insecure mode is requested.
    insecure: bool,
    /// Limit for establishing the connection.
    connect_timeout: Option<Duration>,
    /// Limit applied to each request.
    request_timeout: Option<Duration>,
    /// HTTP/2 keep-alive ping interval.
    keepalive_interval: Option<Duration>,
    /// Keep-alive acknowledgement timeout.
    keepalive_timeout: Option<Duration>,
}
179
180impl TalosClientConfigBuilder {
181 #[must_use]
183 pub fn new(endpoint: impl Into<String>) -> Self {
184 Self {
185 endpoint: endpoint.into(),
186 crt_path: None,
187 key_path: None,
188 ca_path: None,
189 insecure: false,
190 connect_timeout: Some(Duration::from_secs(10)),
191 request_timeout: Some(Duration::from_secs(30)),
192 keepalive_interval: Some(Duration::from_secs(30)),
193 keepalive_timeout: Some(Duration::from_secs(10)),
194 }
195 }
196
197 #[must_use]
199 pub fn client_cert(mut self, path: impl Into<String>) -> Self {
200 self.crt_path = Some(path.into());
201 self
202 }
203
204 #[must_use]
206 pub fn client_key(mut self, path: impl Into<String>) -> Self {
207 self.key_path = Some(path.into());
208 self
209 }
210
211 #[must_use]
213 pub fn ca_cert(mut self, path: impl Into<String>) -> Self {
214 self.ca_path = Some(path.into());
215 self
216 }
217
218 #[must_use]
220 pub fn insecure(mut self) -> Self {
221 self.insecure = true;
222 self
223 }
224
225 #[must_use]
227 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
228 self.connect_timeout = Some(timeout);
229 self
230 }
231
232 #[must_use]
234 pub fn request_timeout(mut self, timeout: Duration) -> Self {
235 self.request_timeout = Some(timeout);
236 self
237 }
238
239 #[must_use]
241 pub fn keepalive(mut self, interval: Duration, timeout: Duration) -> Self {
242 self.keepalive_interval = Some(interval);
243 self.keepalive_timeout = Some(timeout);
244 self
245 }
246
247 #[must_use]
249 pub fn no_timeout(mut self) -> Self {
250 self.connect_timeout = None;
251 self.request_timeout = None;
252 self
253 }
254
255 #[must_use]
257 pub fn build(self) -> TalosClientConfig {
258 TalosClientConfig {
259 endpoint: self.endpoint,
260 crt_path: self.crt_path,
261 key_path: self.key_path,
262 ca_path: self.ca_path,
263 insecure: self.insecure,
264 connect_timeout: self.connect_timeout,
265 request_timeout: self.request_timeout,
266 keepalive_interval: self.keepalive_interval,
267 keepalive_timeout: self.keepalive_timeout,
268 }
269 }
270}
271
/// Client handle for a Talos API endpoint.
///
/// Holds the configuration it was created from, the `tonic` channel used for
/// every service client, and the node target that `make_request` applies to
/// outgoing requests. Cloning clones the channel handle rather than calling
/// any of the connect functions again.
#[derive(Clone)]
pub struct TalosClient {
    // Kept so derived clients (`with_node`) carry the same configuration.
    #[allow(dead_code)]
    config: TalosClientConfig,
    // Shared transport for the version and machine service clients.
    channel: Channel,
    // Node selection applied to requests built through `make_request`.
    node_target: NodeTarget,
}
281
282impl TalosClient {
283 pub async fn new(config: TalosClientConfig) -> Result<Self> {
284 let _ = rustls::crypto::ring::default_provider().install_default();
286
287 let is_http = config.endpoint.starts_with("http://");
289
290 let channel = if is_http {
291 Self::create_http_channel(&config).await?
293 } else if config.insecure {
294 Self::create_insecure_channel(&config).await?
295 } else {
296 Self::create_mtls_channel(&config).await?
297 };
298
299 Ok(Self {
300 config,
301 channel,
302 node_target: NodeTarget::Default,
303 })
304 }
305
306 pub async fn from_talosconfig(
327 config: &crate::config::TalosConfig,
328 context_name: Option<&str>,
329 ) -> Result<Self> {
330 let context = if let Some(name) = context_name {
331 config.get_context(name).ok_or_else(|| {
332 crate::error::TalosError::Config(format!("Context '{}' not found", name))
333 })?
334 } else {
335 config.active_context().ok_or_else(|| {
336 crate::error::TalosError::Config("No active context in talosconfig".to_string())
337 })?
338 };
339
340 let endpoint = context.first_endpoint().ok_or_else(|| {
341 crate::error::TalosError::Config("No endpoints in context".to_string())
342 })?;
343
344 let endpoint_url = if endpoint.contains("://") {
346 endpoint.clone()
347 } else if endpoint.contains(':') {
348 format!("https://{}", endpoint)
349 } else {
350 format!("https://{}:50000", endpoint)
351 };
352
353 let mut client_config = TalosClientConfig::new(&endpoint_url);
355
356 if let (Some(ca), Some(crt), Some(key)) = (&context.ca, &context.crt, &context.key) {
358 let temp_dir = std::env::temp_dir().join("talos-api-rs");
359 std::fs::create_dir_all(&temp_dir).map_err(|e| {
360 crate::error::TalosError::Config(format!("Failed to create temp dir: {}", e))
361 })?;
362
363 let ca_path = temp_dir.join("ca.crt");
364 let crt_path = temp_dir.join("client.crt");
365 let key_path = temp_dir.join("client.key");
366
367 std::fs::write(&ca_path, ca).map_err(|e| {
368 crate::error::TalosError::Config(format!("Failed to write CA cert: {}", e))
369 })?;
370 std::fs::write(&crt_path, crt).map_err(|e| {
371 crate::error::TalosError::Config(format!("Failed to write client cert: {}", e))
372 })?;
373 std::fs::write(&key_path, key).map_err(|e| {
374 crate::error::TalosError::Config(format!("Failed to write client key: {}", e))
375 })?;
376
377 client_config = client_config
378 .with_ca(ca_path.to_string_lossy().to_string())
379 .with_client_cert(crt_path.to_string_lossy().to_string())
380 .with_client_key(key_path.to_string_lossy().to_string());
381 }
382
383 let mut client = Self::new(client_config).await?;
384
385 if let Some(nodes) = &context.nodes {
387 if !nodes.is_empty() {
388 client.node_target = NodeTarget::from(nodes.clone());
389 }
390 }
391
392 Ok(client)
393 }
394
395 #[must_use]
413 pub fn with_node(&self, target: NodeTarget) -> Self {
414 Self {
415 config: self.config.clone(),
416 channel: self.channel.clone(),
417 node_target: target,
418 }
419 }
420
421 #[must_use]
425 pub fn with_nodes(&self, nodes: impl IntoIterator<Item = impl Into<String>>) -> Self {
426 self.with_node(NodeTarget::multiple(nodes))
427 }
428
429 #[must_use]
431 pub fn node_target(&self) -> &NodeTarget {
432 &self.node_target
433 }
434
435 async fn create_http_channel(config: &TalosClientConfig) -> Result<Channel> {
437 let mut endpoint = Channel::from_shared(config.endpoint.clone())
438 .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
439
440 if let Some(timeout) = config.connect_timeout {
442 endpoint = endpoint.connect_timeout(timeout);
443 }
444 if let Some(timeout) = config.request_timeout {
445 endpoint = endpoint.timeout(timeout);
446 }
447 if let Some(interval) = config.keepalive_interval {
448 if let Some(ka_timeout) = config.keepalive_timeout {
449 endpoint = endpoint
450 .http2_keep_alive_interval(interval)
451 .keep_alive_timeout(ka_timeout);
452 }
453 }
454
455 let channel = endpoint.connect().await?;
456 Ok(channel)
457 }
458
459 async fn create_insecure_channel(config: &TalosClientConfig) -> Result<Channel> {
461 let tls_config = rustls::ClientConfig::builder()
462 .with_root_certificates(rustls::RootCertStore::empty())
463 .with_no_client_auth();
464
465 Self::connect_with_custom_tls(config, tls_config, true).await
466 }
467
468 async fn create_mtls_channel(config: &TalosClientConfig) -> Result<Channel> {
470 let root_store = if let Some(ca_path) = &config.ca_path {
472 let ca_pem = std::fs::read(ca_path).map_err(|e| {
473 crate::error::TalosError::Config(format!("Failed to read CA cert: {e}"))
474 })?;
475 let mut root_store = rustls::RootCertStore::empty();
476 let certs = Self::load_pem_certs(&ca_pem)?;
477 for cert in certs {
478 root_store.add(cert).map_err(|e| {
479 crate::error::TalosError::Config(format!("Failed to add CA cert: {e}"))
480 })?;
481 }
482 root_store
483 } else {
484 let mut root_store = rustls::RootCertStore::empty();
486 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
487 root_store
488 };
489
490 let tls_config =
492 if let (Some(crt_path), Some(key_path)) = (&config.crt_path, &config.key_path) {
493 let cert_pem = std::fs::read(crt_path).map_err(|e| {
495 crate::error::TalosError::Config(format!("Failed to read client cert: {e}"))
496 })?;
497 let key_pem = std::fs::read(key_path).map_err(|e| {
498 crate::error::TalosError::Config(format!("Failed to read client key: {e}"))
499 })?;
500
501 let client_certs = Self::load_pem_certs(&cert_pem)?;
502 let client_key = Self::load_pem_key(&key_pem)?;
503
504 rustls::ClientConfig::builder()
505 .with_root_certificates(root_store)
506 .with_client_auth_cert(client_certs, client_key)
507 .map_err(|e| {
508 crate::error::TalosError::Config(format!(
509 "Failed to configure client auth: {e}"
510 ))
511 })?
512 } else {
513 rustls::ClientConfig::builder()
515 .with_root_certificates(root_store)
516 .with_no_client_auth()
517 };
518
519 Self::connect_with_custom_tls(config, tls_config, false).await
520 }
521
522 async fn connect_with_custom_tls(
524 config: &TalosClientConfig,
525 mut tls_config: rustls::ClientConfig,
526 skip_verification: bool,
527 ) -> Result<Channel> {
528 if skip_verification {
530 tls_config
531 .dangerous()
532 .set_certificate_verifier(Arc::new(NoVerifier));
533 }
534
535 tls_config.alpn_protocols = vec![b"h2".to_vec()];
537 let tls_config = Arc::new(tls_config);
538 let connector = tokio_rustls::TlsConnector::from(tls_config);
539
540 let endpoint_url = if config.endpoint.starts_with("http") {
542 config.endpoint.clone()
543 } else {
544 format!("https://{}", config.endpoint)
545 };
546 let parsed_url = url::Url::parse(&endpoint_url)
547 .map_err(|e| crate::error::TalosError::Config(format!("Invalid endpoint URL: {e}")))?;
548 let host = parsed_url
549 .host_str()
550 .ok_or_else(|| crate::error::TalosError::Config("No host in endpoint".to_string()))?
551 .to_string();
552 let port = parsed_url.port().unwrap_or(50000);
553
554 let endpoint_for_connector = format!("http://{}:{}", host, port);
556
557 let mut endpoint = Endpoint::from_shared(endpoint_for_connector)
559 .map_err(|e| crate::error::TalosError::Config(e.to_string()))?;
560
561 if let Some(timeout) = config.connect_timeout {
563 endpoint = endpoint.connect_timeout(timeout);
564 }
565 if let Some(timeout) = config.request_timeout {
566 endpoint = endpoint.timeout(timeout);
567 }
568 if let Some(interval) = config.keepalive_interval {
569 if let Some(ka_timeout) = config.keepalive_timeout {
570 endpoint = endpoint
571 .http2_keep_alive_interval(interval)
572 .keep_alive_timeout(ka_timeout);
573 }
574 }
575
576 let channel = endpoint
577 .connect_with_connector(tower::service_fn(move |uri: tonic::transport::Uri| {
578 let connector = connector.clone();
579 let host = host.clone();
580 async move {
581 let uri_host = uri.host().unwrap_or("127.0.0.1");
582 let uri_port = uri.port_u16().unwrap_or(50000);
583 let addr = format!("{}:{}", uri_host, uri_port);
584
585 let tcp = tokio::net::TcpStream::connect(addr).await?;
586
587 let server_name = ServerName::try_from(host.clone())
589 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
590
591 let tls_stream = connector.connect(server_name, tcp).await?;
592 Ok::<_, std::io::Error>(TokioIo::new(tls_stream))
593 }
594 }))
595 .await?;
596
597 Ok(channel)
598 }
599
600 #[allow(clippy::result_large_err)]
602 fn load_pem_certs(pem_data: &[u8]) -> Result<Vec<CertificateDer<'static>>> {
603 let mut reader = std::io::BufReader::new(pem_data);
604 let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut reader)
605 .collect::<std::result::Result<Vec<_>, _>>()
606 .map_err(|e| {
607 crate::error::TalosError::Config(format!("Failed to parse PEM certificates: {e}"))
608 })?;
609 if certs.is_empty() {
610 return Err(crate::error::TalosError::Config(
611 "No certificates found in PEM data".to_string(),
612 ));
613 }
614 Ok(certs)
615 }
616
617 #[allow(clippy::result_large_err)]
619 fn load_pem_key(pem_data: &[u8]) -> Result<PrivateKeyDer<'static>> {
620 let mut reader = std::io::BufReader::new(pem_data);
622
623 loop {
624 match rustls_pemfile::read_one(&mut reader) {
625 Ok(Some(rustls_pemfile::Item::Pkcs1Key(key))) => {
626 return Ok(PrivateKeyDer::Pkcs1(key));
627 }
628 Ok(Some(rustls_pemfile::Item::Pkcs8Key(key))) => {
629 return Ok(PrivateKeyDer::Pkcs8(key));
630 }
631 Ok(Some(rustls_pemfile::Item::Sec1Key(key))) => {
632 return Ok(PrivateKeyDer::Sec1(key));
633 }
634 Ok(Some(_)) => {
635 continue;
637 }
638 Ok(None) => {
639 break;
640 }
641 Err(e) => {
642 return Err(crate::error::TalosError::Config(format!(
643 "Failed to parse PEM key: {e}"
644 )));
645 }
646 }
647 }
648
649 let pem_str = std::str::from_utf8(pem_data)
652 .map_err(|e| crate::error::TalosError::Config(format!("Invalid UTF-8 in key: {e}")))?;
653
654 if pem_str.contains("-----BEGIN ED25519 PRIVATE KEY-----") {
655 let start_marker = "-----BEGIN ED25519 PRIVATE KEY-----";
657 let end_marker = "-----END ED25519 PRIVATE KEY-----";
658
659 if let Some(start) = pem_str.find(start_marker) {
660 if let Some(end) = pem_str.find(end_marker) {
661 let base64_content = &pem_str[start + start_marker.len()..end];
662 let base64_clean: String = base64_content
663 .chars()
664 .filter(|c| !c.is_whitespace())
665 .collect();
666
667 let der_bytes = base64::Engine::decode(
668 &base64::engine::general_purpose::STANDARD,
669 &base64_clean,
670 )
671 .map_err(|e| {
672 crate::error::TalosError::Config(format!(
673 "Failed to decode ED25519 key: {e}"
674 ))
675 })?;
676
677 return Ok(PrivateKeyDer::Pkcs8(
679 rustls::pki_types::PrivatePkcs8KeyDer::from(der_bytes),
680 ));
681 }
682 }
683 }
684
685 Err(crate::error::TalosError::Config(
686 "No private key found in PEM data".to_string(),
687 ))
688 }
689
690 pub fn version(&self) -> VersionServiceClient<Channel> {
692 VersionServiceClient::new(self.channel.clone())
693 }
694
695 pub fn machine(&self) -> MachineServiceClient<Channel> {
697 MachineServiceClient::new(self.channel.clone())
698 }
699
700 fn make_request<T>(&self, inner: T) -> tonic::Request<T> {
702 self.node_target
703 .apply_to_request(tonic::Request::new(inner))
704 }
705
706 pub async fn apply_configuration(
743 &self,
744 request: ApplyConfigurationRequest,
745 ) -> Result<ApplyConfigurationResponse> {
746 let proto_request: ProtoApplyConfigRequest = request.into();
747 let grpc_request = self.make_request(proto_request);
748 let response = self
749 .machine()
750 .apply_configuration(grpc_request)
751 .await?
752 .into_inner();
753 Ok(response.into())
754 }
755
756 pub async fn apply_configuration_yaml(
773 &self,
774 yaml: &str,
775 mode: crate::ApplyMode,
776 dry_run: bool,
777 ) -> Result<ApplyConfigurationResponse> {
778 let request = ApplyConfigurationRequest::builder()
779 .config_yaml(yaml)
780 .mode(mode)
781 .dry_run(dry_run)
782 .build();
783 self.apply_configuration(request).await
784 }
785
786 pub async fn bootstrap(&self, request: BootstrapRequest) -> Result<BootstrapResponse> {
827 let proto_request: ProtoBootstrapRequest = request.into();
828 let grpc_request = self.make_request(proto_request);
829 let response = self.machine().bootstrap(grpc_request).await?.into_inner();
830 Ok(response.into())
831 }
832
833 pub async fn bootstrap_cluster(&self) -> Result<BootstrapResponse> {
837 self.bootstrap(BootstrapRequest::new()).await
838 }
839
840 pub async fn kubeconfig(&self) -> Result<KubeconfigResponse> {
875 use tonic::codegen::tokio_stream::StreamExt;
876
877 let mut stream = self.machine().kubeconfig(()).await?.into_inner();
878
879 let mut data = Vec::new();
880 let mut node = None;
881
882 while let Some(chunk) = stream.next().await {
883 let chunk = chunk?;
884 if node.is_none() {
886 if let Some(metadata) = &chunk.metadata {
887 node = Some(metadata.hostname.clone());
888 }
889 }
890 data.extend(chunk.bytes);
891 }
892
893 Ok(KubeconfigResponse::new(data, node))
894 }
895
896 pub async fn reset(&self, request: ResetRequest) -> Result<ResetResponse> {
933 let mut client = MachineServiceClient::new(self.channel.clone());
934
935 let proto_request: ProtoResetRequest = request.into();
936 let response = client.reset(proto_request).await?;
937 let inner = response.into_inner();
938
939 Ok(ResetResponse::from(inner))
940 }
941
942 pub async fn reset_graceful(&self) -> Result<ResetResponse> {
951 self.reset(ResetRequest::graceful()).await
952 }
953
954 pub async fn etcd_member_list(
977 &self,
978 request: EtcdMemberListRequest,
979 ) -> Result<EtcdMemberListResponse> {
980 let mut client = MachineServiceClient::new(self.channel.clone());
981
982 let proto_request: ProtoEtcdMemberListRequest = request.into();
983 let response = client.etcd_member_list(proto_request).await?;
984 let inner = response.into_inner();
985
986 Ok(EtcdMemberListResponse::from(inner))
987 }
988
989 pub async fn etcd_remove_member_by_id(
1014 &self,
1015 request: EtcdRemoveMemberByIdRequest,
1016 ) -> Result<EtcdRemoveMemberByIdResponse> {
1017 let mut client = MachineServiceClient::new(self.channel.clone());
1018
1019 let proto_request: ProtoEtcdRemoveMemberByIdRequest = request.into();
1020 let response = client.etcd_remove_member_by_id(proto_request).await?;
1021 let inner = response.into_inner();
1022
1023 Ok(EtcdRemoveMemberByIdResponse::from(inner))
1024 }
1025
1026 pub async fn etcd_leave_cluster(
1030 &self,
1031 request: EtcdLeaveClusterRequest,
1032 ) -> Result<EtcdLeaveClusterResponse> {
1033 let mut client = MachineServiceClient::new(self.channel.clone());
1034
1035 let proto_request: ProtoEtcdLeaveClusterRequest = request.into();
1036 let response = client.etcd_leave_cluster(proto_request).await?;
1037 let inner = response.into_inner();
1038
1039 Ok(EtcdLeaveClusterResponse::from(inner))
1040 }
1041
1042 pub async fn etcd_forfeit_leadership(
1046 &self,
1047 request: EtcdForfeitLeadershipRequest,
1048 ) -> Result<EtcdForfeitLeadershipResponse> {
1049 let mut client = MachineServiceClient::new(self.channel.clone());
1050
1051 let proto_request: ProtoEtcdForfeitLeadershipRequest = request.into();
1052 let response = client.etcd_forfeit_leadership(proto_request).await?;
1053 let inner = response.into_inner();
1054
1055 Ok(EtcdForfeitLeadershipResponse::from(inner))
1056 }
1057
1058 pub async fn etcd_status(&self) -> Result<EtcdStatusResponse> {
1060 let mut client = MachineServiceClient::new(self.channel.clone());
1061
1062 let response = client.etcd_status(()).await?;
1063 let inner = response.into_inner();
1064
1065 Ok(EtcdStatusResponse::from(inner))
1066 }
1067
1068 pub async fn etcd_alarm_list(&self) -> Result<EtcdAlarmListResponse> {
1070 let mut client = MachineServiceClient::new(self.channel.clone());
1071
1072 let response = client.etcd_alarm_list(()).await?;
1073 let inner = response.into_inner();
1074
1075 Ok(EtcdAlarmListResponse::from(inner))
1076 }
1077
1078 pub async fn etcd_alarm_disarm(&self) -> Result<EtcdAlarmDisarmResponse> {
1080 let mut client = MachineServiceClient::new(self.channel.clone());
1081
1082 let response = client.etcd_alarm_disarm(()).await?;
1083 let inner = response.into_inner();
1084
1085 Ok(EtcdAlarmDisarmResponse::from(inner))
1086 }
1087
1088 pub async fn etcd_defragment(&self) -> Result<EtcdDefragmentResponse> {
1092 let mut client = MachineServiceClient::new(self.channel.clone());
1093
1094 let response = client.etcd_defragment(()).await?;
1095 let inner = response.into_inner();
1096
1097 Ok(EtcdDefragmentResponse::from(inner))
1098 }
1099
1100 pub async fn dmesg(&self, request: DmesgRequest) -> Result<DmesgResponse> {
1123 use tonic::codegen::tokio_stream::StreamExt;
1124
1125 let mut client = MachineServiceClient::new(self.channel.clone());
1126
1127 let proto_request: ProtoDmesgRequest = request.into();
1128 let response = client.dmesg(proto_request).await?;
1129 let mut stream = response.into_inner();
1130
1131 let mut data = Vec::new();
1132 let mut node = None;
1133
1134 while let Some(chunk) = stream.next().await {
1135 let chunk = chunk?;
1136 if node.is_none() {
1137 if let Some(metadata) = &chunk.metadata {
1138 node = Some(metadata.hostname.clone());
1139 }
1140 }
1141 data.extend(chunk.bytes);
1142 }
1143
1144 Ok(DmesgResponse::new(data, node))
1145 }
1146
1147 pub async fn upgrade(&self, request: UpgradeRequest) -> Result<UpgradeResponse> {
1178 let mut client = MachineServiceClient::new(self.channel.clone());
1179
1180 let proto_request: ProtoUpgradeRequest = request.into();
1181 let response = client.upgrade(proto_request).await?;
1182 let inner = response.into_inner();
1183
1184 Ok(UpgradeResponse::from(inner))
1185 }
1186
1187 pub async fn service_start(
1193 &self,
1194 request: ServiceStartRequest,
1195 ) -> Result<ServiceStartResponse> {
1196 let mut client = MachineServiceClient::new(self.channel.clone());
1197
1198 let proto_request: ProtoServiceStartRequest = request.into();
1199 let response = client.service_start(proto_request).await?;
1200 let inner = response.into_inner();
1201
1202 Ok(ServiceStartResponse::from(inner))
1203 }
1204
1205 pub async fn service_stop(&self, request: ServiceStopRequest) -> Result<ServiceStopResponse> {
1207 let mut client = MachineServiceClient::new(self.channel.clone());
1208
1209 let proto_request: ProtoServiceStopRequest = request.into();
1210 let response = client.service_stop(proto_request).await?;
1211 let inner = response.into_inner();
1212
1213 Ok(ServiceStopResponse::from(inner))
1214 }
1215
1216 pub async fn service_restart(
1218 &self,
1219 request: ServiceRestartRequest,
1220 ) -> Result<ServiceRestartResponse> {
1221 let mut client = MachineServiceClient::new(self.channel.clone());
1222
1223 let proto_request: ProtoServiceRestartRequest = request.into();
1224 let response = client.service_restart(proto_request).await?;
1225 let inner = response.into_inner();
1226
1227 Ok(ServiceRestartResponse::from(inner))
1228 }
1229
1230 pub async fn logs(&self, request: LogsRequest) -> Result<LogsResponse> {
1232 use tonic::codegen::tokio_stream::StreamExt;
1233
1234 let mut client = MachineServiceClient::new(self.channel.clone());
1235
1236 let proto_request: ProtoLogsRequest = request.into();
1237 let response = client.logs(proto_request).await?;
1238 let mut stream = response.into_inner();
1239
1240 let mut data = Vec::new();
1241 let mut node = None;
1242
1243 while let Some(chunk) = stream.next().await {
1244 let chunk = chunk?;
1245 if node.is_none() {
1246 if let Some(metadata) = &chunk.metadata {
1247 node = Some(metadata.hostname.clone());
1248 }
1249 }
1250 data.extend(chunk.bytes);
1251 }
1252
1253 Ok(LogsResponse::new(data, node))
1254 }
1255
1256 pub async fn load_avg(&self) -> Result<LoadAvgResponse> {
1262 let mut client = MachineServiceClient::new(self.channel.clone());
1263
1264 let response = client.load_avg(()).await?;
1265 let inner = response.into_inner();
1266
1267 Ok(LoadAvgResponse::from(inner))
1268 }
1269
1270 pub async fn memory(&self) -> Result<MemoryResponse> {
1272 let mut client = MachineServiceClient::new(self.channel.clone());
1273
1274 let response = client.memory(()).await?;
1275 let inner = response.into_inner();
1276
1277 Ok(MemoryResponse::from(inner))
1278 }
1279
1280 pub async fn cpu_info(&self) -> Result<CpuInfoResponse> {
1282 let mut client = MachineServiceClient::new(self.channel.clone());
1283
1284 let response = client.cpu_info(()).await?;
1285 let inner = response.into_inner();
1286
1287 Ok(CpuInfoResponse::from(inner))
1288 }
1289
1290 pub async fn disk_stats(&self) -> Result<DiskStatsResponse> {
1292 let mut client = MachineServiceClient::new(self.channel.clone());
1293
1294 let response = client.disk_stats(()).await?;
1295 let inner = response.into_inner();
1296
1297 Ok(DiskStatsResponse::from(inner))
1298 }
1299
1300 pub async fn network_device_stats(&self) -> Result<NetworkDeviceStatsResponse> {
1302 let mut client = MachineServiceClient::new(self.channel.clone());
1303
1304 let response = client.network_device_stats(()).await?;
1305 let inner = response.into_inner();
1306
1307 Ok(NetworkDeviceStatsResponse::from(inner))
1308 }
1309
1310 pub async fn mounts(&self) -> Result<MountsResponse> {
1312 let mut client = MachineServiceClient::new(self.channel.clone());
1313
1314 let response = client.mounts(()).await?;
1315 let inner = response.into_inner();
1316
1317 Ok(MountsResponse::from(inner))
1318 }
1319
1320 pub async fn processes(&self) -> Result<ProcessesResponse> {
1322 let mut client = MachineServiceClient::new(self.channel.clone());
1323
1324 let response = client.processes(()).await?;
1325 let inner = response.into_inner();
1326
1327 Ok(ProcessesResponse::from(inner))
1328 }
1329
1330 pub async fn list(&self, request: ListRequest) -> Result<ListResponse> {
1336 use tonic::codegen::tokio_stream::StreamExt;
1337
1338 let mut client = MachineServiceClient::new(self.channel.clone());
1339
1340 let proto_request: ProtoListRequest = request.into();
1341 let response = client.list(proto_request).await?;
1342 let mut stream = response.into_inner();
1343
1344 let mut entries = Vec::new();
1345 while let Some(info) = stream.next().await {
1346 let info = info?;
1347 entries.push(FileInfo::from(info));
1348 }
1349
1350 Ok(ListResponse::new(entries))
1351 }
1352
1353 pub async fn read(&self, request: ReadRequest) -> Result<ReadResponse> {
1355 use tonic::codegen::tokio_stream::StreamExt;
1356
1357 let mut client = MachineServiceClient::new(self.channel.clone());
1358
1359 let proto_request: ProtoReadRequest = request.into();
1360 let response = client.read(proto_request).await?;
1361 let mut stream = response.into_inner();
1362
1363 let mut data = Vec::new();
1364 let mut node = None;
1365
1366 while let Some(chunk) = stream.next().await {
1367 let chunk = chunk?;
1368 if node.is_none() {
1369 if let Some(metadata) = &chunk.metadata {
1370 node = Some(metadata.hostname.clone());
1371 }
1372 }
1373 data.extend(chunk.bytes);
1374 }
1375
1376 Ok(ReadResponse::new(data, node))
1377 }
1378
1379 pub async fn copy(&self, request: CopyRequest) -> Result<CopyResponse> {
1381 use tonic::codegen::tokio_stream::StreamExt;
1382
1383 let mut client = MachineServiceClient::new(self.channel.clone());
1384
1385 let proto_request: ProtoCopyRequest = request.into();
1386 let response = client.copy(proto_request).await?;
1387 let mut stream = response.into_inner();
1388
1389 let mut data = Vec::new();
1390 let mut node = None;
1391
1392 while let Some(chunk) = stream.next().await {
1393 let chunk = chunk?;
1394 if node.is_none() {
1395 if let Some(metadata) = &chunk.metadata {
1396 node = Some(metadata.hostname.clone());
1397 }
1398 }
1399 data.extend(chunk.bytes);
1400 }
1401
1402 Ok(CopyResponse::new(data, node))
1403 }
1404
1405 pub async fn disk_usage(&self, request: DiskUsageRequest) -> Result<DiskUsageResponse> {
1407 use tonic::codegen::tokio_stream::StreamExt;
1408
1409 let mut client = MachineServiceClient::new(self.channel.clone());
1410
1411 let proto_request: ProtoDiskUsageRequest = request.into();
1412 let response = client.disk_usage(proto_request).await?;
1413 let mut stream = response.into_inner();
1414
1415 let mut entries = Vec::new();
1416 while let Some(info) = stream.next().await {
1417 let info = info?;
1418 entries.push(DiskUsageInfo::from(info));
1419 }
1420
1421 Ok(DiskUsageResponse::new(entries))
1422 }
1423
1424 pub async fn rollback(&self) -> Result<RollbackResponse> {
1430 let mut client = MachineServiceClient::new(self.channel.clone());
1431
1432 let response = client.rollback(ProtoRollbackRequest {}).await?;
1433 let inner = response.into_inner();
1434
1435 Ok(RollbackResponse::from(inner))
1436 }
1437
1438 pub async fn generate_client_configuration(
1440 &self,
1441 request: GenerateClientConfigurationRequest,
1442 ) -> Result<GenerateClientConfigurationResponse> {
1443 let mut client = MachineServiceClient::new(self.channel.clone());
1444
1445 let proto_request: ProtoGenerateClientConfigRequest = request.into();
1446 let response = client.generate_client_configuration(proto_request).await?;
1447 let inner = response.into_inner();
1448
1449 Ok(GenerateClientConfigurationResponse::from(inner))
1450 }
1451
1452 pub async fn packet_capture(
1454 &self,
1455 request: PacketCaptureRequest,
1456 ) -> Result<PacketCaptureResponse> {
1457 use tonic::codegen::tokio_stream::StreamExt;
1458
1459 let mut client = MachineServiceClient::new(self.channel.clone());
1460
1461 let proto_request: ProtoPacketCaptureRequest = request.into();
1462 let response = client.packet_capture(proto_request).await?;
1463 let mut stream = response.into_inner();
1464
1465 let mut data = Vec::new();
1466 let mut node = None;
1467
1468 while let Some(chunk) = stream.next().await {
1469 let chunk = chunk?;
1470 if node.is_none() {
1471 if let Some(metadata) = &chunk.metadata {
1472 node = Some(metadata.hostname.clone());
1473 }
1474 }
1475 data.extend(chunk.bytes);
1476 }
1477
1478 Ok(PacketCaptureResponse::new(data, node))
1479 }
1480
1481 pub async fn netstat(&self, request: NetstatRequest) -> Result<NetstatResponse> {
1483 let mut client = MachineServiceClient::new(self.channel.clone());
1484
1485 let proto_request: ProtoNetstatRequest = request.into();
1486 let response = client.netstat(proto_request).await?;
1487 let inner = response.into_inner();
1488
1489 Ok(NetstatResponse::from(inner))
1490 }
1491
1492 pub async fn image_list(&self, request: ImageListRequest) -> Result<Vec<ImageInfo>> {
1523 use tonic::codegen::tokio_stream::StreamExt;
1524
1525 let mut client = MachineServiceClient::new(self.channel.clone());
1526 let proto_request: ProtoImageListRequest = request.into();
1527 let response = client.image_list(proto_request).await?;
1528
1529 let mut stream = response.into_inner();
1530 let mut images = Vec::new();
1531
1532 while let Some(item) = stream.next().await {
1534 let item = item?;
1535 if let Some(ref metadata) = item.metadata {
1537 if !metadata.error.is_empty() {
1538 return Err(crate::error::TalosError::Validation(metadata.error.clone()));
1539 }
1540 }
1541 images.push(ImageInfo::from(item));
1542 }
1543
1544 Ok(images)
1545 }
1546
1547 pub async fn image_pull(&self, request: ImagePullRequest) -> Result<ImagePullResponse> {
1573 let mut client = MachineServiceClient::new(self.channel.clone());
1574 let proto_request: ProtoImagePullRequest = request.into();
1575 let response = client.image_pull(proto_request).await?;
1576 let inner = response.into_inner();
1577
1578 Ok(ImagePullResponse::from(inner))
1579 }
1580}
1581
1582#[derive(Debug)]
1584struct NoVerifier;
1585
1586impl rustls::client::danger::ServerCertVerifier for NoVerifier {
1587 fn verify_server_cert(
1588 &self,
1589 _end_entity: &rustls::pki_types::CertificateDer<'_>,
1590 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
1591 _server_name: &rustls::pki_types::ServerName<'_>,
1592 _ocsp_response: &[u8],
1593 _now: rustls::pki_types::UnixTime,
1594 ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
1595 Ok(rustls::client::danger::ServerCertVerified::assertion())
1596 }
1597
1598 fn verify_tls12_signature(
1599 &self,
1600 _message: &[u8],
1601 _cert: &rustls::pki_types::CertificateDer<'_>,
1602 _dss: &rustls::DigitallySignedStruct,
1603 ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1604 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1605 }
1606
1607 fn verify_tls13_signature(
1608 &self,
1609 _message: &[u8],
1610 _cert: &rustls::pki_types::CertificateDer<'_>,
1611 _dss: &rustls::DigitallySignedStruct,
1612 ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1613 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1614 }
1615
1616 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1617 vec![
1618 rustls::SignatureScheme::RSA_PKCS1_SHA1,
1619 rustls::SignatureScheme::ECDSA_SHA1_Legacy,
1620 rustls::SignatureScheme::RSA_PKCS1_SHA256,
1621 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
1622 rustls::SignatureScheme::RSA_PKCS1_SHA384,
1623 rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
1624 rustls::SignatureScheme::RSA_PKCS1_SHA512,
1625 rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
1626 rustls::SignatureScheme::RSA_PSS_SHA256,
1627 rustls::SignatureScheme::RSA_PSS_SHA384,
1628 rustls::SignatureScheme::RSA_PSS_SHA512,
1629 rustls::SignatureScheme::ED25519,
1630 rustls::SignatureScheme::ED448,
1631 ]
1632 }
1633}
1634
1635pub use pool::{ConnectionPool, ConnectionPoolConfig, EndpointHealth, HealthStatus, LoadBalancer};