1#![no_std]
61#![warn(rust_2018_idioms)]
62#![warn(clippy::all)]
63#![allow(clippy::module_name_repetitions)]
64#![allow(clippy::must_use_candidate)]
65#![allow(clippy::too_many_arguments)]
66#![allow(clippy::missing_errors_doc)]
67#![allow(clippy::missing_panics_doc)]
68#![allow(missing_docs)]
69
70extern crate no_std_compat as std;
71
72use std::prelude::v1::*;
73
74pub mod authentication;
75mod registry;
76pub mod commands;
77
78use std::collections::BTreeMap;
79
80use std::sync::Arc;
81use parking_lot::Mutex;
82#[cfg(feature = "relational_store")]
83use product_os_configuration::{KeyValueKind, KeyValueStore, RelationalKind, RelationalStore};
84#[cfg(not(feature = "relational_store"))]
85use product_os_configuration::{KeyValueKind, KeyValueStore};
86
87use std::str::FromStr;
88use std::time::Duration;
89
90pub use product_os_request::Method;
91use product_os_request::{ProductOSClient, ProductOSResponse};
92pub use registry::Node;
93
94
95
96pub struct ProductOSController {
118 configuration: product_os_configuration::Configuration,
119
120 certificates: product_os_security::certificates::Certificates,
121
122 max_servers: u8,
123
124 registry: registry::Registry,
125
126 pulse_check: bool,
127 pulse_check_cron: String,
128
129 #[cfg(feature = "monitor")]
130 monitor: bool,
131 #[cfg(feature = "monitor")]
132 monitor_cron: String,
133
134 requester: product_os_request::ProductOSRequester,
135 client: product_os_request::ProductOSRequestClient,
136
137 #[cfg(feature = "relational_store")]
138 relational_store: Arc<product_os_store::ProductOSRelationalStore>,
139 key_value_store: Arc<product_os_store::ProductOSKeyValueStore>,
140}
141
142impl ProductOSController {
143 pub fn new(configuration: product_os_configuration::Configuration, certificates:
171 product_os_security::certificates::Certificates,
172 key_value_store: Option<Arc<product_os_store::ProductOSKeyValueStore>>,
173 #[cfg(feature = "relational_store")] relational_store: Option<Arc<product_os_store::ProductOSRelationalStore>>) -> Self {
174 let key_value_store: Arc<product_os_store::ProductOSKeyValueStore> = key_value_store.unwrap_or_else(|| Arc::new(product_os_store::ProductOSKeyValueStore::new(&KeyValueStore {
175 enabled: false,
176 kind: KeyValueKind::Sink,
177 host: "".to_string(),
178 port: 0,
179 secure: false,
180 db_number: 0,
181 db_name: None,
182 username: None,
183 password: None,
184 pool_size: 0,
185 default_limit: 0,
186 default_offset: 0,
187 prefix: None,
188 })));
189
190 #[cfg(feature = "relational_store")]
191 let relational_store: Arc<product_os_store::ProductOSRelationalStore> = relational_store.unwrap_or_else(|| Arc::new(product_os_store::ProductOSRelationalStore::new(&RelationalStore {
192 enabled: false,
193 kind: RelationalKind::Sink,
194 host: "".to_string(),
195 port: 0,
196 secure: false,
197 db_name: "".to_string(),
198 username: None,
199 password: None,
200 pool_size: 0,
201 default_limit: 0,
202 default_offset: 0,
203 prefix: None,
204 })));
205
206 let registry = registry::Registry::new(&configuration, key_value_store.clone(), certificates.to_owned());
207
208 let mut requester = product_os_request::ProductOSRequester::new();
209 requester.add_header("x-product-os-command", registry.get_me().get_identifier().as_str(), false);
210 let mut request_client = product_os_request::ProductOSRequestClient::new();
211 request_client.build(&requester);
212
213 Self {
214 certificates,
215
216 max_servers: configuration.get_cc_max_servers(),
217
218 pulse_check: configuration.is_pulse_check_enabled(),
219 pulse_check_cron: configuration.get_hearbeat_cron().to_string(),
220
221 #[cfg(feature = "monitor")]
222 monitor: configuration.is_monitor_enabled(),
223 #[cfg(feature = "monitor")]
224 monitor_cron: configuration.get_monitor_cron().to_string(),
225
226 configuration,
227
228 #[cfg(feature = "relational_store")]
229 relational_store,
230 key_value_store,
231
232 registry,
233
234 requester,
235 client: request_client
236 }
237 }
238
239 pub fn get_registry(&self) -> ®istry::Registry {
263 &self.registry
264 }
265
266 pub async fn discover_nodes(&mut self) {
267 self.registry.discover_nodes().await;
268
269 for cert in self.registry.get_nodes_raw_certificates(0, true) {
270 self.requester.add_trusted_certificate_pem(cert);
271 }
272
273 self.client.build(&self.requester);
274 }
275
276 pub fn get_max_servers(&self) -> u8 {
277 self.max_servers.to_owned()
278 }
279
280 pub fn upsert_node_local(&mut self, identifier: String, node: Node) {
281 self.registry.upsert_node_local(identifier, node);
282 }
283
284 pub fn get_certificates(&self) -> Vec<&[u8]> {
285 let mut certificates = vec!();
286 for cert in &self.certificates.certificates {
287 certificates.push(cert.as_slice())
288 }
289
290 certificates
291 }
292
293 pub fn get_private_key(&self) -> &[u8] {
294 self.certificates.private.as_slice()
295 }
296
297 pub fn get_certificates_and_private_key(&self) -> product_os_security::certificates::Certificates {
298 self.certificates.to_owned()
299 }
300
301 pub fn validate_certificate(&self, certificate: &[u8]) -> bool {
302 tracing::trace!("Checking certificate stored {:?} vs given {:?}", self.certificates.certificates, certificate);
303
304 for cert in self.get_certificates() {
305 if cert.eq(certificate) { return true; }
306 }
307
308 false
309 }
310
311 pub fn validate_verify_tag<T>(&self, query: Option<&BTreeMap<String, String>>, payload: Option<T>,
312 headers: Option<&BTreeMap<String, String>>, verify_identifier: &str, verify_tag: &str) -> bool
313 where T: product_os_security::AsByteVector
314 {
315 tracing::trace!("Checking verification provided {} from {}", verify_tag, verify_identifier);
316 let key = self.registry.get_key(verify_identifier);
317 tracing::trace!("Checking verification with key {:?}", key);
318
319 match key {
320 Some(verify_key) => {
321 product_os_security::verify_auth_request(query, false, payload, headers,
322 &["x-product-os-verify", "x-product-os-command", "x-product-os-control"],
323 verify_tag, Some(verify_key.as_slice()))
324 },
325 None => false
326 }
327 }
328
329 pub fn authenticate_command_control<T>(&self, query: Option<&BTreeMap<String, String>>, payload: Option<T>, headers: Option<&BTreeMap<String, String>>,
330 verify_identifier: Option<&str>, verify_tag: Option<&str>) -> Result<bool, authentication::CommandControlAuthenticateError>
331 where T: product_os_security::AsByteVector
332 {
333 tracing::trace!("Verify identifier {:?} and tag {:?} verify result", &verify_identifier, &verify_tag);
334 if verify_identifier.is_some() && verify_tag.is_some() && self.validate_verify_tag(query, payload, headers, verify_identifier.unwrap(), verify_tag.unwrap()) {
335 Ok(true)
336 }
337 else {
338 Err(authentication::CommandControlAuthenticateError {
339 error: authentication::CommandControlAuthenticateErrorState::KeyError(String::from("One of the keys provided was not valid"))
340 })
341 }
342 }
343
344 pub fn search_and_prepare_command(&self, query: BTreeMap<&str, &str>, module: String, instruction: String, data: Option<serde_json::Value>) -> Result<crate::commands::Command, product_os_request::ProductOSRequestError> {
362 let matching_node = self.registry.pick_node(query);
363 let client = &self.client;
364
365 match matching_node {
366 Some(node) => {
367 match self.registry.get_key(node.get_identifier().as_str()) {
368 None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
369 Some(key) => Ok(commands::Command {
370 requester: client.clone(),
371 node_url: node.get_address().to_owned(),
372 verify_key: key,
373 module,
374 instruction,
375 data
376 })
377 }
378 },
379 None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
380 }
381 }
382
383 pub fn find_and_prepare_command(&self, key: &str, manager: &str, instruction: String, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
396 let enabled = true.to_string();
397
398 let query = BTreeMap::from([
399 ("manager.key", key),
400 ("manager.kind", manager),
401 ("manager.enabled", enabled.as_str())
402 ]);
403
404 self.search_and_prepare_command(query, manager.to_string(), instruction, data)
405 }
406
407 pub fn find_any_and_prepare_command(&self, capability: &str, manager: &str, instruction: &str, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
419 let enabled = true.to_string();
420
421 let query = BTreeMap::from([
422 ("capability", capability),
423 ("manager.enabled", enabled.as_str())
424 ]);
425
426 self.search_and_prepare_command(query, manager.to_string(), instruction.to_string(), data)
427 }
428
429 pub async fn find_kind_and_prepare_command(&self, kind: &str, manager: &str, instruction: &str, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
441 let enabled = true.to_string();
442 let query = BTreeMap::from([
443 ("kind", kind),
444 ("manager.enabled", enabled.as_str())
445 ]);
446
447 self.search_and_prepare_command(query, manager.to_string(), instruction.to_string(), data)
448 }
449
450 pub fn search_and_prepare_ask(&self, query: BTreeMap<&str, &str>, path: &str, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
468 params: BTreeMap<String, String>, method: Method) -> Result<commands::Ask, product_os_request::ProductOSRequestError> {
469 let matching_node = self.registry.pick_node(query);
470 let client = &self.client;
471 match matching_node {
472 Some(node) => {
473 match self.registry.get_key(node.get_identifier().as_str()) {
474 None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
475 Some(key) => Ok(commands::Ask {
476 requester: client.clone(),
477 node_url: node.get_address().to_owned(),
478 verify_key: key,
479 path: path.to_string(),
480 data,
481 headers,
482 params,
483 method
484 })
485 }
486
487 },
488 None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
489 }
490 }
491
492 pub async fn find_feature_and_ask(&self, feature: &str, path: &str, data: &Option<serde_json::Value>, headers: &BTreeMap<String, String>,
493 params: &BTreeMap<String, String>, method: &Method) -> Result<ProductOSResponse<product_os_request::BodyBytes>, product_os_request::ProductOSRequestError> {
494 let matching_node = self.registry.pick_node_for_feature(feature);
495 let client = &self.client;
496 match matching_node {
497 Some(node) => {
498 match self.registry.get_key(node.get_identifier().as_str()) {
499 None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
500 Some(key) => commands::ask_node(client, node, key.as_slice(), path, data, headers, params, method).await
501 }
502
503 },
504 None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
505 }
506 }
507
508 pub fn find_feature_and_prepare_ask(&self, feature: &str, path: &str, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
509 params: BTreeMap<String, String>, method: Method) -> Result<commands::Ask, product_os_request::ProductOSRequestError> {
510 let matching_node = self.registry.pick_node_for_feature(feature);
511 let client = &self.client;
512 match matching_node {
513 Some(node) => {
514 match self.registry.get_key(node.get_identifier().as_str()) {
515 None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
516 Some(key) => Ok(commands::Ask {
517 requester: client.clone(),
518 node_url: node.get_address().to_owned(),
519 verify_key: key,
520 path: path.to_string(),
521 data,
522 headers,
523 params,
524 method
525 })
526 }
527
528 },
529 None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
530 }
531 }
532
533 pub fn get_configuration(&self) -> product_os_configuration::Configuration {
558 self.configuration.clone()
559 }
560
561 pub fn get_key(&self, identifier: &str) -> Option<Vec<u8>> {
562 self.registry.get_key(identifier)
563 }
564
565 pub fn create_key_session(&mut self) -> (String, [u8; 32]) {
566 self.registry.create_key_session()
567 }
568
569 pub fn generate_key(&mut self, session_identifier: &str, remote_public_key: &[u8], association: String, remote_session_identifier: Option<String>) {
570 self.registry.generate_key(session_identifier, remote_public_key, association, remote_session_identifier);
571 }
572
573 #[cfg(feature = "relational_store")]
574 pub fn get_relational_store(&mut self) -> Arc<product_os_store::ProductOSRelationalStore> {
575 self.relational_store.clone()
576 }
577
578 pub fn get_key_value_store(&mut self) -> Arc<product_os_store::ProductOSKeyValueStore> {
579 self.key_value_store.clone()
580 }
581
582 pub async fn add_feature(&mut self, feature: Arc<dyn product_os_capabilities::Feature>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
583 self.registry.add_feature(feature, base_path, router).await;
584 }
585
586 pub async fn add_feature_mut(&mut self, feature: Arc<Mutex<dyn product_os_capabilities::Feature>>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
587 self.registry.add_feature_mut(feature, base_path, router).await;
588 }
589
590 pub async fn remove_feature(&mut self, identifier: &str) {
591 self.registry.remove_feature(identifier).await;
592 }
593
594 pub async fn add_service(&mut self, service: Arc<dyn product_os_capabilities::Service>) {
595 self.registry.add_service(service).await;
596 }
597
598 pub async fn add_service_mut(&mut self, service: Arc<Mutex<dyn product_os_capabilities::Service>>) {
599 self.registry.add_service_mut(service).await;
600 }
601
602 pub async fn set_service_active(&mut self, identifier: String, status: bool) {
603 self.registry.set_service_active(identifier, status).await;
604 }
605
606 pub async fn remove_service(&mut self, identifier: &str) {
607 self.registry.remove_service(identifier).await;
608 }
609
610 pub async fn remove_inactive_services(&mut self, query: BTreeMap<&str, &str>) {
611 self.registry.remove_inactive_services(query).await;
612 }
613
614 pub async fn start_services(&mut self) -> Result<(), ()> {
615 self.registry.start_services().await
616 }
617
618 pub async fn start_service(&mut self, identifier: &str) -> Result<(), ()> {
619 self.registry.start_service(identifier).await
620 }
621
622 pub async fn stop_service(&mut self, identifier: &str) -> Result<(), ()> {
623 self.registry.stop_service(identifier).await
624 }
625
626
627}
628
629
630
631
632
633pub async fn pulse_run(controller_unlocked: Arc<Mutex<ProductOSController>>) {
634 tracing::info!("Starting pulse run...");
635 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
636 match controller_locked {
637 Some(controller) => {
638 #[cfg(feature = "distributed")]
639 controller.discover_nodes().await;
640
641 #[cfg(feature = "distributed")]
642 let self_identifier = controller.registry.get_me().get_identifier();
643 #[cfg(feature = "distributed")]
644 let control_url = controller.registry.get_me().get_address();
645
646 #[cfg(feature = "distributed")]
647 let nodes = controller.registry.get_nodes_endpoints(0, true);
648
649 #[cfg(feature = "distributed")] {
650 match controller.registry.check_me_remote().await {
651 Some(_) => { controller.registry.update_me_status(true); },
652 None => {
653 let alive = controller.registry.update_me_status(false);
654
655 if !alive {
656 tracing::error!("Terminating server due to lost presence on remote registry");
657 std::process::exit(8);
658 };
659 }
660 }
661 }
662
663 let services = controller.get_registry().get_me().get_services().list();
665 for (_, service) in services {
666 match service.status().await {
667 Ok(_) => {}
668 Err(_) => {}
669 }
670 }
671
672 #[cfg(feature = "distributed")]
673 let client = controller.client.clone();
674
675 std::mem::drop(controller);
677
678 #[cfg(feature = "distributed")]
679 for (identifier, (url, key)) in nodes {
680 if url != control_url {
681 match key {
682 Some(verify_key) => {
683 match commands::command(&client, url.clone(), verify_key, "status", "ping", None).await {
684 Ok(response) => {
685 let status = response.status();
686
687 match status {
688 product_os_request::StatusCode::UNAUTHORIZED => {
689 let text = {
690 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
691 match controller_locked {
692 Some(controller) => {
693 controller.client.text(response).await.unwrap_or_else(|e| {
694 tracing::error!("Failed to parse response text: {:?}", e);
695 String::new()
696 })
697 }
698 None => String::new()
699 }
700 };
701
702 let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(text.as_str()) {
703 Ok(auth_error) => auth_error,
704 Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
705 };
706
707 tracing::error!("Error object auth {:?}", auth);
708
709 match auth.error {
710 authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
711 tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
712
713 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
714 match controller_locked {
715 Some(mut controller) => {
716 controller.registry.remove_node(identifier.as_str()).await;
717 }
718 None => tracing::error!("Failed to lock controller")
719 }
720 },
721 authentication::CommandControlAuthenticateErrorState::None => ()
722 };
723 },
724 product_os_request::StatusCode::OK => {
725 let body = {
726 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
727 match controller_locked {
728 Some(controller) => {
729 controller.client.bytes(response).await.unwrap_or_else(|e| {
730 tracing::error!("Failed to parse response bytes: {:?}", e);
731 Bytes::new()
732 })
733 }
734 None => Bytes::new()
735 }
736 };
737
738 tracing::info!("Response received from {}: {} {:?}", url, status, body);
739 }
740 _ => {
741 let body = {
742 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
743 match controller_locked {
744 Some(controller) => {
745 controller.client.bytes(response).await.unwrap_or_else(|e| {
746 tracing::error!("Failed to parse error response bytes: {:?}", e);
747 Bytes::new()
748 })
749 }
750 None => Bytes::new()
751 }
752 };
753 tracing::error!("Error response received from {}: {} {:?}", url, status, body);
754 tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
755
756 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
757 match controller_locked {
758 Some(mut controller) => {
759 controller.registry.update_pulse_status(identifier.as_str(), false).await;
760 }
761 None => tracing::error!("Failed to lock controller")
762 }
763 }
764 }
765 },
766 Err(e) => {
767 tracing::error!("Error encountered {:?} from {}", e, url);
768
769 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
770 match controller_locked {
771 Some(mut controller) => {
772 controller.registry.update_pulse_status(identifier.as_str(), false).await;
773 }
774 None => tracing::error!("Failed to lock controller")
775 }
776 }
777 }
778 },
779 None => ()
780 }
781 } else {
782 if self_identifier != identifier {
783 tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
784 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
785 match controller_locked {
786 Some(mut controller) => {
787 controller.registry.remove_node(identifier.as_str()).await;
788 }
789 None => tracing::error!("Failed to lock controller")
790 }
791 }
792 }
793 }
794
795
796 tracing::info!("Finished pulse run...");
797 }
798 None => tracing::error!("Failed to lock controller")
799 }
800}
801
802
803pub async fn run_controller<X, E>(controller_mutex: Arc<Mutex<ProductOSController>>, executor: Arc<E>)
804where
805 E: product_os_async_executor::Executor<X> + product_os_async_executor::ExecutorPerform<X> + product_os_async_executor::Timer
806{
807 let controller_unlocked = controller_mutex.clone();
808 let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
809 match controller_locked {
810 Some(mut controller) => {
811
812 #[cfg(feature = "distributed")] {
813 authentication::perform_self_trust(&mut controller);
814
815 controller.registry.update_me().await;
816 tracing::info!("Added self...");
817
818 tracing::info!("Command and Control registered for {}", controller.registry.get_me().get_identifier());
819
820 controller.discover_nodes().await;
821 perform_announce(&mut controller).await;
822 }
823
824 match &controller.configuration.command_control {
826 Some(command_control_config) => {
827 if command_control_config.auto_start_services {
828 match controller.start_services().await {
829 Ok(_) => {
830 tracing::info!("Command Control services started");
831 },
832 Err(_) => {
833 tracing::error!("Command Control services failed to start");
834 }
835 }
836 }
837 }
838 None => {
839 tracing::error!("Command Control configuration not enabled to start services");
840 }
841 }
842
843 let pulse_check = controller.pulse_check.clone();
844 let pulse_check_cron = cron::Schedule::from_str(controller.pulse_check_cron.as_str()).unwrap();
845 let pulse_check_next = pulse_check_cron.upcoming(chrono::Utc).nth(0).unwrap();
846 let pulse_check_following = pulse_check_cron.upcoming(chrono::Utc).nth(1).unwrap();
847
848 #[cfg(feature = "monitor")]
849 let monitor = controller.monitor.clone();
850 #[cfg(feature = "monitor")]
851 let monitor_cron = cron::Schedule::from_str(controller.monitor_cron.as_str()).unwrap();
852 #[cfg(feature = "monitor")]
853 let _monitor_next = monitor_cron.upcoming(chrono::Utc).nth(0).unwrap();
854 #[cfg(feature = "monitor")]
855 let _monitor_following = monitor_cron.upcoming(chrono::Utc).nth(1).unwrap();
856
857 std::mem::drop(controller);
859
860 if pulse_check {
861 let _ = E::spawn_from_executor(executor.as_ref(), async move {
862 tracing::info!("Pulse check service starting...");
863
864 let start_duration = match u32::try_from(match pulse_check_next.signed_duration_since(chrono::Utc::now()).to_std() {
865 Ok(d) => d,
866 Err(_) => Duration::new(1, 0)
867 }.as_millis()) {
868 Ok(u) => u,
869 Err(_) => panic!("Period defined is too large for timer")
870 };
871 let interval_duration = match u32::try_from(match pulse_check_following.signed_duration_since(pulse_check_next).to_std() {
872 Ok(d) => d,
873 Err(_) => Duration::new(1, 0)
874 }.as_millis()) {
875 Ok(u) => u,
876 Err(_) => panic!("Period defined is too large for timer")
877 };
878
879 let mut delay = E::once(start_duration).await;
880 let mut interval = E::interval(interval_duration).await;
881
882 delay.tick().await;
883 loop {
884 interval.tick().await;
885 tracing::debug!("Pulse check service running");
886
887 pulse_run(controller_mutex.clone()).await;
888 }
889 });
890 }
891
892 #[cfg(feature = "monitor")]
893 if monitor {
894 let _ = E::spawn_from_executor(executor.as_ref(), async move {
895 tracing::info!("Monitor service starting...");
896
897 let start_duration = match u32::try_from(match pulse_check_next.signed_duration_since(chrono::Utc::now()).to_std() {
898 Ok(d) => d,
899 Err(_) => Duration::new(1, 0)
900 }.as_millis()) {
901 Ok(u) => u,
902 Err(_) => panic!("Period defined is too large for timer")
903 };
904 let interval_duration = match u32::try_from(match pulse_check_following.signed_duration_since(pulse_check_next).to_std() {
905 Ok(d) => d,
906 Err(_) => Duration::new(1, 0)
907 }.as_millis()) {
908 Ok(u) => u,
909 Err(_) => panic!("Period defined is too large for timer")
910 };
911
912 let mut delay = E::once(start_duration).await;
913 let mut interval = E::interval(interval_duration).await;
914
915 delay.tick().await;
916 loop {
917 interval.tick().await;
918 tracing::debug!("Monitor service running");
919
920 product_os_monitoring::process_statistics(None);
921 }
922 }); }
924 }
925 None => tracing::error!("Failed to lock controller")
926 }
927}
928
929
930#[cfg(feature = "distributed")]
931pub async fn perform_announce(controller: &mut MutexGuard<'_, ProductOSController>) {
932 tracing::info!("Announce searching for nodes...");
933
934 authentication::perform_key_exchange(controller).await;
935
936 let self_identifier = controller.registry.get_me().get_identifier();
937 let control_url = controller.registry.get_me().get_address();
938
939 let nodes = controller.registry.get_nodes_endpoints(0, true);
940
941 tracing::info!("Starting announce...");
942 tracing::info!("Nodes data {:?}", nodes);
943
944 for (identifier, (url, key)) in nodes {
945 if url != control_url {
946 match key {
947 Some(verify_key) => {
948 tracing::info!("Announcing {}: {}", identifier, url);
949
950 match commands::command(&controller.client, url.clone(), verify_key, "status", "announce", Some(serde_json::value::to_value(controller.get_registry().get_me()).unwrap())).await {
951 Ok(response) => {
952 let status = response.status();
953
954 match status {
955 product_os_request::StatusCode::UNAUTHORIZED => {
956 let text = controller.client.text(response).await.unwrap();
957 let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(text.as_str()) {
958 Ok(auth_error) => auth_error,
959 Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
960 };
961
962 tracing::error!("Error object auth {:?}", auth);
963
964 match auth.error {
965 authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
966 tracing::info!("Error from remote node - keys don't match {}", identifier);
967 },
968 authentication::CommandControlAuthenticateErrorState::None => ()
969 };
970 },
971 product_os_request::StatusCode::OK => {
972 let body = controller.client.bytes(response).await.unwrap();
973 tracing::info!("Response received from {}: {} {:?}", url, status, body);
974 }
975 _ => {
976 let body = controller.client.bytes(response).await.unwrap();
977 tracing::error!("Error response received from {}: {} {:?}", url, status, body);
978 }
979 }
980 },
981 Err(e) => {
982 tracing::error!("Error encountered {:?} from {}", e, url);
983 }
984 };
985 },
986 None => ()
987 }
988 }
989 else {
990 if identifier != self_identifier {
991 tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
992 controller.registry.remove_node(identifier.as_str()).await;
993 }
994 }
995 }
996
997 tracing::info!("Finished announcing...");
998}