1#![no_std]
2extern crate no_std_compat as std;
3
4use bytes::Bytes;
5use std::prelude::v1::*;
6
7pub mod authentication;
8mod registry;
9mod commands;
10
11use std::collections::BTreeMap;
12
13use std::sync::Arc;
14use parking_lot::{ Mutex, MutexGuard };
15use product_os_configuration::{KeyValueKind, KeyValueStore, RelationalKind, RelationalStore, Stores};
16
17use std::str::FromStr;
18use std::time::Duration;
19use serde::{ Deserialize, Serialize };
20use std::num::TryFromIntError;
21
22pub use product_os_request::Method;
23use product_os_request::{ProductOSClient, ProductOSResponse};
24pub use registry::Node;
25
26
27
28pub struct ProductOSController {
29 configuration: product_os_configuration::Configuration,
30
31 certificates: product_os_security::certificates::Certificates,
32
33 max_servers: u8,
34
35 registry: registry::Registry,
36
37 pulse_check: bool,
38 pulse_check_cron: String,
39
40 #[cfg(feature = "monitor")]
41 monitor: bool,
42 #[cfg(feature = "monitor")]
43 monitor_cron: String,
44
45 requester: product_os_request::ProductOSRequester,
46 client: product_os_request::ProductOSRequestClient,
47
48 #[cfg(feature = "relational_store")]
49 relational_store: Arc<product_os_store::ProductOSRelationalStore>,
50 key_value_store: Arc<product_os_store::ProductOSKeyValueStore>,
51}
52
53impl ProductOSController {
54 pub fn new(configuration: product_os_configuration::Configuration, certificates:
55 product_os_security::certificates::Certificates,
56 key_value_store: Option<Arc<product_os_store::ProductOSKeyValueStore>>,
57 #[cfg(feature = "relational_store")] relational_store: Option<Arc<product_os_store::ProductOSRelationalStore>>) -> Self {
58 let key_value_store: Arc<product_os_store::ProductOSKeyValueStore> = key_value_store.unwrap_or_else(|| Arc::new(product_os_store::ProductOSKeyValueStore::new(&KeyValueStore {
59 enabled: false,
60 kind: KeyValueKind::Sink,
61 host: "".to_string(),
62 port: 0,
63 secure: false,
64 db_number: 0,
65 db_name: None,
66 username: None,
67 password: None,
68 pool_size: 0,
69 default_limit: 0,
70 default_offset: 0,
71 prefix: None,
72 })));
73
74 #[cfg(feature = "relational_store")]
75 let relational_store: Arc<product_os_store::ProductOSRelationalStore> = relational_store.unwrap_or_else(|| Arc::new(product_os_store::ProductOSRelationalStore::new(&RelationalStore {
76 enabled: false,
77 kind: RelationalKind::Sink,
78 host: "".to_string(),
79 port: 0,
80 secure: false,
81 db_name: "".to_string(),
82 username: None,
83 password: None,
84 pool_size: 0,
85 default_limit: 0,
86 default_offset: 0,
87 prefix: None,
88 })));
89
90 let registry = registry::Registry::new(&configuration, key_value_store.clone(), certificates.to_owned());
91
92 let mut requester = product_os_request::ProductOSRequester::new();
93 requester.add_header("x-product-os-command", registry.get_me().get_identifier().as_str(), false);
94 let mut request_client = product_os_request::ProductOSRequestClient::new();
95 request_client.build(&requester);
96
97 Self {
98 certificates,
99
100 max_servers: configuration.get_cc_max_servers(),
101
102 pulse_check: configuration.is_pulse_check_enabled(),
103 pulse_check_cron: configuration.get_hearbeat_cron().to_string(),
104
105 #[cfg(feature = "monitor")]
106 monitor: configuration.is_monitor_enabled(),
107 #[cfg(feature = "monitor")]
108 monitor_cron: configuration.get_monitor_cron().to_string(),
109
110 configuration,
111
112 #[cfg(feature = "relational_store")]
113 relational_store,
114 key_value_store,
115
116 registry,
117
118 requester,
119 client: request_client
120 }
121 }
122
123 pub fn get_registry(&self) -> ®istry::Registry {
124 &self.registry
125 }
126
127 pub async fn discover_nodes(&mut self) {
128 self.registry.discover_nodes().await;
129
130 for cert in self.registry.get_nodes_raw_certificates(0, true) {
131 self.requester.add_trusted_certificate_pem(cert);
132 }
133
134 self.client.build(&self.requester);
135 }
136
137 pub fn get_max_servers(&self) -> u8 {
138 self.max_servers.to_owned()
139 }
140
141 pub fn upsert_node_local(&mut self, identifier: String, node: Node) {
142 self.registry.upsert_node_local(identifier, node);
143 }
144
145 pub fn get_certificates(&self) -> Vec<&[u8]> {
146 let mut certificates = vec!();
147 for cert in &self.certificates.certificates {
148 certificates.push(cert.as_slice())
149 }
150
151 certificates
152 }
153
154 pub fn get_private_key(&self) -> &[u8] {
155 self.certificates.private.as_slice()
156 }
157
158 pub fn get_certificates_and_private_key(&self) -> product_os_security::certificates::Certificates {
159 self.certificates.to_owned()
160 }
161
162 pub fn validate_certificate(&self, certificate: &[u8]) -> bool {
163 tracing::trace!("Checking certificate stored {:?} vs given {:?}", self.certificates.certificates, certificate);
164
165 for cert in self.get_certificates() {
166 if cert.eq(certificate) { return true; }
167 }
168
169 false
170 }
171
172 pub fn validate_verify_tag<T>(&self, query: Option<&BTreeMap<String, String>>, payload: Option<T>,
173 headers: Option<&BTreeMap<String, String>>, verify_identifier: &str, verify_tag: &str) -> bool
174 where T: product_os_security::AsByteVector
175 {
176 tracing::trace!("Checking verification provided {} from {}", verify_tag, verify_identifier);
177 let key = self.registry.get_key(verify_identifier);
178 tracing::trace!("Checking verification with key {:?}", key);
179
180 match key {
181 Some(verify_key) => {
182 product_os_security::verify_auth_request(query, false, payload, headers,
183 &["x-product-os-verify", "x-product-os-command", "x-product-os-control"],
184 verify_tag, Some(verify_key.as_slice()))
185 },
186 None => false
187 }
188 }
189
190 pub fn authenticate_command_control<T>(&self, query: Option<&BTreeMap<String, String>>, payload: Option<T>, headers: Option<&BTreeMap<String, String>>,
191 verify_identifier: Option<&str>, verify_tag: Option<&str>) -> Result<bool, authentication::CommandControlAuthenticateError>
192 where T: product_os_security::AsByteVector
193 {
194 tracing::trace!("Verify identifier {:?} and tag {:?} verify result", &verify_identifier, &verify_tag);
195 if verify_identifier.is_some() && verify_tag.is_some() && self.validate_verify_tag(query, payload, headers, verify_identifier.unwrap(), verify_tag.unwrap()) {
196 Ok(true)
197 }
198 else {
199 Err(authentication::CommandControlAuthenticateError {
200 error: authentication::CommandControlAuthenticateErrorState::KeyError(String::from("One of the keys provided was not valid"))
201 })
202 }
203 }
204
205 pub fn search_and_prepare_command(&self, query: BTreeMap<&str, &str>, module: String, instruction: String, data: Option<serde_json::Value>) -> Result<crate::commands::Command, product_os_request::ProductOSRequestError> {
223 let matching_node = self.registry.pick_node(query);
224 let client = &self.client;
225
226 match matching_node {
227 Some(node) => {
228 match self.registry.get_key(node.get_identifier().as_str()) {
229 None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
230 Some(key) => Ok(commands::Command {
231 requester: client.clone(),
232 node_url: node.get_address().to_owned(),
233 verify_key: key,
234 module,
235 instruction,
236 data
237 })
238 }
239 },
240 None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
241 }
242 }
243
244 pub fn find_and_prepare_command(&self, key: &str, manager: &str, instruction: String, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
257 let enabled = true.to_string();
258
259 let query = BTreeMap::from([
260 ("manager.key", key),
261 ("manager.kind", manager),
262 ("manager.enabled", enabled.as_str())
263 ]);
264
265 self.search_and_prepare_command(query, manager.to_string(), instruction, data)
266 }
267
268 pub fn find_any_and_prepare_command(&self, capability: &str, manager: &str, instruction: &str, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
280 let enabled = true.to_string();
281
282 let query = BTreeMap::from([
283 ("capability", capability),
284 ("manager.enabled", enabled.as_str())
285 ]);
286
287 self.search_and_prepare_command(query, manager.to_string(), instruction.to_string(), data)
288 }
289
290 pub async fn find_kind_and_prepare_command(&self, kind: &str, manager: &str, instruction: &str, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
302 let enabled = true.to_string();
303 let query = BTreeMap::from([
304 ("kind", kind),
305 ("manager.enabled", enabled.as_str())
306 ]);
307
308 self.search_and_prepare_command(query, manager.to_string(), instruction.to_string(), data)
309 }
310
311 pub fn search_and_prepare_ask(&self, query: BTreeMap<&str, &str>, path: &str, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
329 params: BTreeMap<String, String>, method: Method) -> Result<commands::Ask, product_os_request::ProductOSRequestError> {
330 let matching_node = self.registry.pick_node(query);
331 let client = &self.client;
332 match matching_node {
333 Some(node) => {
334 match self.registry.get_key(node.get_identifier().as_str()) {
335 None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
336 Some(key) => Ok(commands::Ask {
337 requester: client.clone(),
338 node_url: node.get_address().to_owned(),
339 verify_key: key,
340 path: path.to_string(),
341 data,
342 headers,
343 params,
344 method
345 })
346 }
347
348 },
349 None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
350 }
351 }
352
353 pub async fn find_feature_and_ask(&self, feature: &str, path: &str, data: &Option<serde_json::Value>, headers: &BTreeMap<String, String>,
354 params: &BTreeMap<String, String>, method: &Method) -> Result<ProductOSResponse<product_os_request::BodyBytes>, product_os_request::ProductOSRequestError> {
355 let matching_node = self.registry.pick_node_for_feature(feature);
356 let client = &self.client;
357 match matching_node {
358 Some(node) => {
359 match self.registry.get_key(node.get_identifier().as_str()) {
360 None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
361 Some(key) => commands::ask_node(client, node, key.as_slice(), path, data, headers, params, method).await
362 }
363
364 },
365 None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
366 }
367 }
368
369 pub fn find_feature_and_prepare_ask(&self, feature: &str, path: &str, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
370 params: BTreeMap<String, String>, method: Method) -> Result<commands::Ask, product_os_request::ProductOSRequestError> {
371 let matching_node = self.registry.pick_node_for_feature(feature);
372 let client = &self.client;
373 match matching_node {
374 Some(node) => {
375 match self.registry.get_key(node.get_identifier().as_str()) {
376 None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
377 Some(key) => Ok(commands::Ask {
378 requester: client.clone(),
379 node_url: node.get_address().to_owned(),
380 verify_key: key,
381 path: path.to_string(),
382 data,
383 headers,
384 params,
385 method
386 })
387 }
388
389 },
390 None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
391 }
392 }
393
394 pub fn get_configuration(&self) -> product_os_configuration::Configuration {
419 self.configuration.clone()
420 }
421
422 pub fn get_key(&self, identifier: &str) -> Option<Vec<u8>> {
423 self.registry.get_key(identifier)
424 }
425
426 pub fn create_key_session(&mut self) -> (String, [u8; 32]) {
427 self.registry.create_key_session()
428 }
429
430 pub fn generate_key(&mut self, session_identifier: &str, remote_public_key: &[u8], association: String, remote_session_identifier: Option<String>) {
431 self.registry.generate_key(session_identifier, remote_public_key, association, remote_session_identifier);
432 }
433
434 #[cfg(feature = "relational_store")]
435 pub fn get_relational_store(&mut self) -> Arc<product_os_store::ProductOSRelationalStore> {
436 self.relational_store.clone()
437 }
438
439 pub fn get_key_value_store(&mut self) -> Arc<product_os_store::ProductOSKeyValueStore> {
440 self.key_value_store.clone()
441 }
442
443 pub async fn add_feature(&mut self, feature: Arc<dyn product_os_capabilities::Feature>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
444 self.registry.add_feature(feature, base_path, router).await;
445 }
446
447 pub async fn add_feature_mut(&mut self, feature: Arc<Mutex<dyn product_os_capabilities::Feature>>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
448 self.registry.add_feature_mut(feature, base_path, router).await;
449 }
450
451 pub async fn remove_feature(&mut self, identifier: &str) {
452 self.registry.remove_feature(identifier).await;
453 }
454
455 pub async fn add_service(&mut self, service: Arc<dyn product_os_capabilities::Service>) {
456 self.registry.add_service(service).await;
457 }
458
459 pub async fn add_service_mut(&mut self, service: Arc<Mutex<dyn product_os_capabilities::Service>>) {
460 self.registry.add_service_mut(service).await;
461 }
462
463 pub async fn set_service_active(&mut self, identifier: String, status: bool) {
464 self.registry.set_service_active(identifier, status).await;
465 }
466
467 pub async fn remove_service(&mut self, identifier: &str) {
468 self.registry.remove_service(identifier).await;
469 }
470
471 pub async fn remove_inactive_services(&mut self, query: BTreeMap<&str, &str>) {
472 self.registry.remove_inactive_services(query).await;
473 }
474
475 pub async fn start_services(&mut self) {
476 self.registry.start_services().await;
477 }
478}
479
480
481
482
483
484pub async fn pulse_run(controller_unlocked: Arc<Mutex<ProductOSController>>) {
485 tracing::info!("Starting pulse run...");
486 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
487 match controller_locked {
488 Some(mut controller) => {
489 #[cfg(feature = "distributed")]
490 controller.discover_nodes().await;
491
492 #[cfg(feature = "distributed")]
493 let self_identifier = controller.registry.get_me().get_identifier();
494 #[cfg(feature = "distributed")]
495 let control_url = controller.registry.get_me().get_address();
496
497 #[cfg(feature = "distributed")]
498 let nodes = controller.registry.get_nodes_endpoints(0, true);
499
500 #[cfg(feature = "distributed")] {
501 match controller.registry.check_me_remote().await {
502 Some(_) => { controller.registry.update_me_status(true); },
503 None => {
504 let alive = controller.registry.update_me_status(false);
505
506 if !alive {
507 tracing::error!("Terminating server due to lost presence on remote registry");
508 std::process::exit(8);
509 };
510 }
511 }
512 }
513
514 let services = controller.get_registry().get_me().get_services().list();
516 for (_, service) in services {
517 match service.status().await {
518 Ok(_) => {}
519 Err(_) => {}
520 }
521 }
522
523 #[cfg(feature = "distributed")]
524 let client = controller.client.clone();
525
526 std::mem::drop(controller);
528
529 #[cfg(feature = "distributed")]
530 for (identifier, (url, key)) in nodes {
531 if url != control_url {
532 match key {
533 Some(verify_key) => {
534 match commands::command(&client, url.clone(), verify_key, "status", "ping", None).await {
535 Ok(response) => {
536 let status = response.status();
537
538 match status {
539 product_os_request::StatusCode::UNAUTHORIZED => {
540 let text = {
541 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
542 match controller_locked {
543 Some(mut controller) => {
544 controller.client.text(response).await.unwrap()
545 }
546 None => String::new()
547 }
548 };
549
550 let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(text.as_str()) {
551 Ok(auth_error) => auth_error,
552 Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
553 };
554
555 tracing::error!("Error object auth {:?}", auth);
556
557 match auth.error {
558 authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
559 tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
560
561 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
562 match controller_locked {
563 Some(mut controller) => {
564 controller.registry.remove_node(identifier.as_str()).await;
565 }
566 None => tracing::error!("Failed to lock controller")
567 }
568 },
569 authentication::CommandControlAuthenticateErrorState::None => ()
570 };
571 },
572 product_os_request::StatusCode::OK => {
573 let body = {
574 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
575 match controller_locked {
576 Some(mut controller) => {
577 controller.client.bytes(response).await.unwrap()
578 }
579 None => Bytes::new()
580 }
581 };
582
583 tracing::info!("Response received from {}: {} {:?}", url, status, body);
584 }
585 _ => {
586 let body = {
587 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
588 match controller_locked {
589 Some(mut controller) => {
590 controller.client.bytes(response).await.unwrap()
591 }
592 None => Bytes::new()
593 }
594 };
595 tracing::error!("Error response received from {}: {} {:?}", url, status, body);
596 tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
597
598 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
599 match controller_locked {
600 Some(mut controller) => {
601 controller.registry.update_pulse_status(identifier.as_str(), false).await;
602 }
603 None => tracing::error!("Failed to lock controller")
604 }
605 }
606 }
607 },
608 Err(e) => {
609 tracing::error!("Error encountered {:?} from {}", e, url);
610
611 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
612 match controller_locked {
613 Some(mut controller) => {
614 controller.registry.update_pulse_status(identifier.as_str(), false).await;
615 }
616 None => tracing::error!("Failed to lock controller")
617 }
618 }
619 }
620 },
621 None => ()
622 }
623 } else {
624 if self_identifier != identifier {
625 tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
626 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
627 match controller_locked {
628 Some(mut controller) => {
629 controller.registry.remove_node(identifier.as_str()).await;
630 }
631 None => tracing::error!("Failed to lock controller")
632 }
633 }
634 }
635 }
636
637
638 tracing::info!("Finished pulse run...");
639 }
640 None => tracing::error!("Failed to lock controller")
641 }
642}
643
644
645pub async fn run_controller<X, E>(controller_mutex: Arc<Mutex<ProductOSController>>, executor: Arc<E>)
646where
647 E: product_os_async_executor::Executor<X> + product_os_async_executor::ExecutorPerform<X> + product_os_async_executor::Timer
648{
649 let controller_unlocked = controller_mutex.clone();
650 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
651 match controller_locked {
652 Some(mut controller) => {
653
654 #[cfg(feature = "distributed")] {
655 authentication::perform_self_trust(&mut controller);
656
657 controller.registry.update_me().await;
658 tracing::info!("Added self...");
659
660 tracing::info!("Command and Control registered for {}", controller.registry.get_me().get_identifier());
661
662 controller.discover_nodes().await;
663 perform_announce(&mut controller).await;
664
665 controller.start_services().await;
667 }
668
669 let pulse_check = controller.pulse_check.clone();
670 let pulse_check_cron = cron::Schedule::from_str(controller.pulse_check_cron.as_str()).unwrap();
671 let pulse_check_next = pulse_check_cron.upcoming(chrono::Utc).nth(0).unwrap();
672 let pulse_check_following = pulse_check_cron.upcoming(chrono::Utc).nth(1).unwrap();
673
674 #[cfg(feature = "monitor")]
675 let monitor = controller.monitor.clone();
676 #[cfg(feature = "monitor")]
677 let monitor_cron = cron::Schedule::from_str(controller.monitor_cron.as_str()).unwrap();
678 #[cfg(feature = "monitor")]
679 let monitor_next = monitor_cron.upcoming(chrono::Utc).nth(0).unwrap();
680 #[cfg(feature = "monitor")]
681 let monitor_following = monitor_cron.upcoming(chrono::Utc).nth(1).unwrap();
682
683 std::mem::drop(controller);
685
686 if pulse_check {
687 let _ = E::spawn_from_executor(executor.as_ref(), async move {
688 tracing::info!("Pulse check service starting...");
689
690 let start_duration = match u32::try_from(match pulse_check_next.signed_duration_since(chrono::Utc::now()).to_std() {
691 Ok(d) => d,
692 Err(_) => Duration::new(1, 0)
693 }.as_millis()) {
694 Ok(u) => u,
695 Err(_) => panic!("Period defined is too large for timer")
696 };
697 let interval_duration = match u32::try_from(match pulse_check_following.signed_duration_since(pulse_check_next).to_std() {
698 Ok(d) => d,
699 Err(_) => Duration::new(1, 0)
700 }.as_millis()) {
701 Ok(u) => u,
702 Err(_) => panic!("Period defined is too large for timer")
703 };
704
705 let mut delay = E::once(start_duration).await;
706 let mut interval = E::interval(interval_duration).await;
707
708 delay.tick().await;
709 loop {
710 interval.tick().await;
711 tracing::debug!("Pulse check service running");
712
713 pulse_run(controller_mutex.clone()).await;
714 }
715 });
716 }
717
718 #[cfg(feature = "monitor")]
719 if monitor {
720 let _ = E::spawn_from_executor(executor.as_ref(), async move {
721 tracing::info!("Monitor service starting...");
722
723 let start_duration = match u32::try_from(match pulse_check_next.signed_duration_since(chrono::Utc::now()).to_std() {
724 Ok(d) => d,
725 Err(_) => Duration::new(1, 0)
726 }.as_millis()) {
727 Ok(u) => u,
728 Err(_) => panic!("Period defined is too large for timer")
729 };
730 let interval_duration = match u32::try_from(match pulse_check_following.signed_duration_since(pulse_check_next).to_std() {
731 Ok(d) => d,
732 Err(_) => Duration::new(1, 0)
733 }.as_millis()) {
734 Ok(u) => u,
735 Err(_) => panic!("Period defined is too large for timer")
736 };
737
738 let mut delay = E::once(start_duration).await;
739 let mut interval = E::interval(interval_duration).await;
740
741 delay.tick().await;
742 loop {
743 interval.tick().await;
744 tracing::debug!("Monitor service running");
745
746 product_os_monitoring::process_statistics(None);
747 }
748 }); }
750 }
751 None => tracing::error!("Failed to lock controller")
752 }
753}
754
755
756#[cfg(feature = "distributed")]
757pub async fn perform_announce(controller: &mut MutexGuard<'_, ProductOSController>) {
758 tracing::info!("Announce searching for nodes...");
759
760 authentication::perform_key_exchange(controller).await;
761
762 let self_identifier = controller.registry.get_me().get_identifier();
763 let control_url = controller.registry.get_me().get_address();
764
765 let nodes = controller.registry.get_nodes_endpoints(0, true);
766
767 tracing::info!("Starting announce...");
768 tracing::info!("Nodes data {:?}", nodes);
769
770 for (identifier, (url, key)) in nodes {
771 if url != control_url {
772 match key {
773 Some(verify_key) => {
774 tracing::info!("Announcing {}: {}", identifier, url);
775
776 match commands::command(&controller.client, url.clone(), verify_key, "status", "announce", Some(serde_json::value::to_value(controller.get_registry().get_me()).unwrap())).await {
777 Ok(response) => {
778 let status = response.status();
779
780 match status {
781 product_os_request::StatusCode::UNAUTHORIZED => {
782 let text = controller.client.text(response).await.unwrap();
783 let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(text.as_str()) {
784 Ok(auth_error) => auth_error,
785 Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
786 };
787
788 tracing::error!("Error object auth {:?}", auth);
789
790 match auth.error {
791 authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
792 tracing::info!("Error from remote node - keys don't match {}", identifier);
793 },
794 authentication::CommandControlAuthenticateErrorState::None => ()
795 };
796 },
797 product_os_request::StatusCode::OK => {
798 let body = controller.client.bytes(response).await.unwrap();
799 tracing::info!("Response received from {}: {} {:?}", url, status, body);
800 }
801 _ => {
802 let body = controller.client.bytes(response).await.unwrap();
803 tracing::error!("Error response received from {}: {} {:?}", url, status, body);
804 }
805 }
806 },
807 Err(e) => {
808 tracing::error!("Error encountered {:?} from {}", e, url);
809 }
810 };
811 },
812 None => ()
813 }
814 }
815 else {
816 if identifier != self_identifier {
817 tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
818 controller.registry.remove_node(identifier.as_str()).await;
819 }
820 }
821 }
822
823 tracing::info!("Finished announcing...");
824}