Skip to main content

product_os_command_control/
lib.rs

1//! # Product OS Command and Control
2//!
3//! This crate provides a distributed command and control system for coordinating
4//! multiple Product OS server instances. It enables secure communication, service
5//! discovery, and workload distribution across a cluster of nodes.
6//!
7//! ## Features
8//!
9//! - **Secure Communication**: Authentication framework using Diffie-Hellman key exchange
10//! - **Service Discovery**: Automatic node registration and discovery
11//! - **Load Balancing**: Smart routing to available nodes based on capabilities
12//! - **Health Monitoring**: Pulse checks and automatic failure detection
13//! - **Feature Management**: Dynamic feature and service registration
14//!
15//! ## Basic Usage
16//!
17//! ```no_run
18//! use product_os_command_control::ProductOSController;
19//! use product_os_configuration::Configuration;
20//! use product_os_security::certificates::Certificates;
21//!
22//! # async fn example() {
23//! // Create configuration and certificates
24//! let config = Configuration::new();
25//! let certs = Certificates::new_self_signed(
26//!     vec![("CN".to_string(), "localhost".to_string())],
27//!     None, None, None, None, None,
28//! );
29//!
30//! // Initialize the controller
31//! let controller = ProductOSController::new(
32//!     config,
33//!     certs,
34//!     None, // Optional key-value store
35//! );
36//!
37//! // Access the registry
38//! let registry = controller.get_registry();
39//! let my_node = registry.get_me();
40//! println!("Node ID: {}", my_node.get_identifier());
41//! # }
42//! ```
43//!
44//! ## Architecture
45//!
46//! The command and control system consists of several key components:
47//!
48//! - **ProductOSController**: Main coordinator that manages nodes and services
49//! - **Registry**: Tracks available nodes and their capabilities
50//! - **Node**: Represents a single server instance in the cluster
51//! - **Commands**: Structured way to send instructions to remote nodes
52//!
53//! ## Security
54//!
55//! All node-to-node communication is secured using:
56//! - TLS/SSL certificates for transport security
57//! - Diffie-Hellman key exchange for establishing shared secrets
58//! - Message authentication using HMAC
59
60#![no_std]
61#![warn(rust_2018_idioms)]
62#![warn(clippy::all)]
63#![allow(clippy::module_name_repetitions)]
64#![allow(clippy::must_use_candidate)]
65#![allow(clippy::too_many_arguments)]
66#![allow(clippy::missing_errors_doc)]
67#![allow(clippy::missing_panics_doc)]
68#![allow(missing_docs)]
69
70extern crate no_std_compat as std;
71
72use std::prelude::v1::*;
73
74pub mod authentication;
75mod registry;
76pub mod commands;
77
78use std::collections::BTreeMap;
79
80use std::sync::Arc;
81use parking_lot::Mutex;
82#[cfg(feature = "relational_store")]
83use product_os_configuration::{KeyValueKind, KeyValueStore, RelationalKind, RelationalStore};
84#[cfg(not(feature = "relational_store"))]
85use product_os_configuration::{KeyValueKind, KeyValueStore};
86
87use std::str::FromStr;
88use std::time::Duration;
89
90pub use product_os_request::Method;
91use product_os_request::{ProductOSClient, ProductOSResponse};
92pub use registry::Node;
93
94
95
96/// Main controller for managing a distributed command and control system.
97///
98/// `ProductOSController` is the primary interface for coordinating multiple
99/// Product OS server instances. It handles node discovery, service management,
100/// secure communication, and workload distribution.
101///
102/// # Examples
103///
104/// ```no_run
105/// use product_os_command_control::ProductOSController;
106/// use product_os_configuration::Configuration;
107/// use product_os_security::certificates::Certificates;
108///
109/// let config = Configuration::new();
110/// let certs = Certificates::new_self_signed(
111///     vec![("CN".to_string(), "localhost".to_string())],
112///     None, None, None, None, None,
113/// );
114///
115/// let controller = ProductOSController::new(config, certs, None);
116/// ```
117pub struct ProductOSController {
118    configuration: product_os_configuration::Configuration,
119
120    certificates: product_os_security::certificates::Certificates,
121
122    max_servers: u8,
123
124    registry: registry::Registry,
125
126    pulse_check: bool,
127    pulse_check_cron: String,
128
129    #[cfg(feature = "monitor")]
130    monitor: bool,
131    #[cfg(feature = "monitor")]
132    monitor_cron: String,
133
134    requester: product_os_request::ProductOSRequester,
135    client: product_os_request::ProductOSRequestClient,
136
137    #[cfg(feature = "relational_store")]
138    relational_store: Arc<product_os_store::ProductOSRelationalStore>,
139    key_value_store: Arc<product_os_store::ProductOSKeyValueStore>,
140}
141
142impl ProductOSController {
143    /// Creates a new `ProductOSController` instance.
144    ///
145    /// Initializes the command and control system with the provided configuration,
146    /// certificates, and optional data stores.
147    ///
148    /// # Arguments
149    ///
150    /// * `configuration` - System configuration including network and service settings
151    /// * `certificates` - TLS/SSL certificates for secure communication
152    /// * `key_value_store` - Optional key-value store for node registry persistence
153    /// * `relational_store` - Optional relational database store (requires `relational_store` feature)
154    ///
155    /// # Examples
156    ///
157    /// ```no_run
158    /// use product_os_command_control::ProductOSController;
159    /// use product_os_configuration::Configuration;
160    /// use product_os_security::certificates::Certificates;
161    ///
162    /// let config = Configuration::new();
163    /// let certs = Certificates::new_self_signed(
164    ///     vec![("CN".to_string(), "localhost".to_string())],
165    ///     None, None, None, None, None,
166    /// );
167    ///
168    /// let controller = ProductOSController::new(config, certs, None);
169    /// ```
170    pub fn new(configuration: product_os_configuration::Configuration, certificates:
171               product_os_security::certificates::Certificates,
172               key_value_store: Option<Arc<product_os_store::ProductOSKeyValueStore>>,
173               #[cfg(feature = "relational_store")] relational_store: Option<Arc<product_os_store::ProductOSRelationalStore>>) -> Self {
174        let key_value_store: Arc<product_os_store::ProductOSKeyValueStore> = key_value_store.unwrap_or_else(|| Arc::new(product_os_store::ProductOSKeyValueStore::new(&KeyValueStore {
175            enabled: false,
176            kind: KeyValueKind::Sink,
177            host: "".to_string(),
178            port: 0,
179            secure: false,
180            db_number: 0,
181            db_name: None,
182            username: None,
183            password: None,
184            pool_size: 0,
185            default_limit: 0,
186            default_offset: 0,
187            prefix: None,
188        })));
189
190        #[cfg(feature = "relational_store")]
191        let relational_store: Arc<product_os_store::ProductOSRelationalStore> = relational_store.unwrap_or_else(|| Arc::new(product_os_store::ProductOSRelationalStore::new(&RelationalStore {
192            enabled: false,
193            kind: RelationalKind::Sink,
194            host: "".to_string(),
195            port: 0,
196            secure: false,
197            db_name: "".to_string(),
198            username: None,
199            password: None,
200            pool_size: 0,
201            default_limit: 0,
202            default_offset: 0,
203            prefix: None,
204        })));
205
206        let registry = registry::Registry::new(&configuration, key_value_store.clone(), certificates.to_owned());
207
208        let mut requester = product_os_request::ProductOSRequester::new();
209        requester.add_header("x-product-os-command", registry.get_me().get_identifier().as_str(), false);
210        let mut request_client = product_os_request::ProductOSRequestClient::new();
211        request_client.build(&requester);
212
213        Self {
214            certificates,
215
216            max_servers: configuration.get_cc_max_servers(),
217
218            pulse_check: configuration.is_pulse_check_enabled(),
219            pulse_check_cron: configuration.get_hearbeat_cron().to_string(),
220
221            #[cfg(feature = "monitor")]
222            monitor: configuration.is_monitor_enabled(),
223            #[cfg(feature = "monitor")]
224            monitor_cron: configuration.get_monitor_cron().to_string(),
225
226            configuration,
227
228            #[cfg(feature = "relational_store")]
229            relational_store,
230            key_value_store,
231
232            registry,
233
234            requester,
235            client: request_client
236        }
237    }
238
239    /// Returns a reference to the internal node registry.
240    ///
241    /// The registry contains information about all known nodes in the cluster,
242    /// including their capabilities, services, and features.
243    ///
244    /// # Examples
245    ///
246    /// ```no_run
247    /// # use product_os_command_control::ProductOSController;
248    /// # use product_os_configuration::Configuration;
249    /// # use product_os_security::certificates::Certificates;
250    /// # let controller = ProductOSController::new(
251    /// #     Configuration::new(),
252    /// #     Certificates::new_self_signed(
253    /// #         vec![("CN".to_string(), "localhost".to_string())],
254    /// #         None, None, None, None, None,
255    /// #     ),
256    /// #     None,
257    /// # );
258    /// let registry = controller.get_registry();
259    /// let my_node = registry.get_me();
260    /// println!("My node ID: {}", my_node.get_identifier());
261    /// ```
262    pub fn get_registry(&self) -> &registry::Registry {
263        &self.registry
264    }
265
266    pub async fn discover_nodes(&mut self) {
267        self.registry.discover_nodes().await;
268
269        for cert in self.registry.get_nodes_raw_certificates(0, true) {
270            self.requester.add_trusted_certificate_pem(cert);
271        }
272
273        self.client.build(&self.requester);
274    }
275
276    pub fn get_max_servers(&self) -> u8 {
277        self.max_servers.to_owned()
278    }
279
280    pub fn upsert_node_local(&mut self, identifier: String, node: Node) {
281        self.registry.upsert_node_local(identifier, node);
282    }
283
284    pub fn get_certificates(&self) -> Vec<&[u8]> {
285        let mut certificates = vec!();
286        for cert in &self.certificates.certificates {
287            certificates.push(cert.as_slice())
288        }
289
290        certificates
291    }
292
293    pub fn get_private_key(&self) -> &[u8] {
294        self.certificates.private.as_slice()
295    }
296
297    pub fn get_certificates_and_private_key(&self) -> product_os_security::certificates::Certificates {
298        self.certificates.to_owned()
299    }
300
301    pub fn validate_certificate(&self, certificate: &[u8]) -> bool {
302        tracing::trace!("Checking certificate stored {:?} vs given {:?}", self.certificates.certificates, certificate);
303
304        for cert in self.get_certificates() {
305            if cert.eq(certificate) { return true; }
306        }
307
308        false
309    }
310
311    pub fn validate_verify_tag<T>(&self, query: Option<&BTreeMap<String, String>>, payload: Option<T>,
312                                  headers: Option<&BTreeMap<String, String>>, verify_identifier: &str, verify_tag: &str) -> bool
313    where T: product_os_security::AsByteVector
314    {
315        tracing::trace!("Checking verification provided {} from {}", verify_tag, verify_identifier);
316        let key = self.registry.get_key(verify_identifier);
317        tracing::trace!("Checking verification with key {:?}", key);
318
319        match key {
320            Some(verify_key) => {
321                product_os_security::verify_auth_request(query, false, payload, headers,
322                                                       &["x-product-os-verify", "x-product-os-command", "x-product-os-control"],
323                                                       verify_tag, Some(verify_key.as_slice()))
324            },
325            None => false
326        }
327    }
328
329    pub fn authenticate_command_control<T>(&self, query: Option<&BTreeMap<String, String>>, payload: Option<T>, headers: Option<&BTreeMap<String, String>>,
330                                        verify_identifier: Option<&str>, verify_tag: Option<&str>) -> Result<bool, authentication::CommandControlAuthenticateError>
331    where T: product_os_security::AsByteVector
332    {
333        tracing::trace!("Verify identifier {:?} and tag {:?} verify result", &verify_identifier, &verify_tag);
334        if verify_identifier.is_some() && verify_tag.is_some() && self.validate_verify_tag(query, payload, headers, verify_identifier.unwrap(), verify_tag.unwrap()) {
335            Ok(true)
336        }
337        else {
338            Err(authentication::CommandControlAuthenticateError {
339                error: authentication::CommandControlAuthenticateErrorState::KeyError(String::from("One of the keys provided was not valid"))
340            })
341        }
342    }
343
344    /*
345    async fn search_and_command(&self, query: BTreeMap<String, String>, module: String, instruction: String, data: Option<serde_json::Value>)  -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
346        let matching_node = self.registry.pick_node(query);
347        let client = &self.client;
348
349        match matching_node {
350            Some(node) => {
351                match self.registry.get_key(node.get_identifier()) {
352                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
353                    Some(key) => commands::command_node(client, node, key, module, instruction, data).await
354                }
355            },
356            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
357        }
358    }
359    */
360
361    pub fn search_and_prepare_command(&self, query: BTreeMap<&str, &str>, module: String, instruction: String, data: Option<serde_json::Value>)  -> Result<crate::commands::Command, product_os_request::ProductOSRequestError> {
362        let matching_node = self.registry.pick_node(query);
363        let client = &self.client;
364
365        match matching_node {
366            Some(node) => {
367                match self.registry.get_key(node.get_identifier().as_str()) {
368                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
369                    Some(key) => Ok(commands::Command {
370                        requester: client.clone(),
371                        node_url: node.get_address().to_owned(),
372                        verify_key: key,
373                        module,
374                        instruction,
375                        data
376                    })
377                }
378            },
379            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
380        }
381    }
382
383    /*
384    async fn find_and_command(&self, key: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
385        let query = BTreeMap::from([
386            ("manager.key".to_string(), key),
387            ("manager.kind".to_string(), manager.to_string()),
388            ("manager.enabled".to_string(), true.to_string())
389        ]);
390
391        self.search_and_command(query, manager.to_string(), instruction, data).await
392    }
393    */
394
395    pub fn find_and_prepare_command(&self, key: &str, manager: &str, instruction: String, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
396        let enabled = true.to_string();
397
398        let query = BTreeMap::from([
399            ("manager.key", key),
400            ("manager.kind", manager),
401            ("manager.enabled", enabled.as_str())
402        ]);
403
404        self.search_and_prepare_command(query, manager.to_string(), instruction, data)
405    }
406
407    /*
408    async fn find_any_and_command(&self, capability: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
409        let query = BTreeMap::from([
410            ("capability".to_string(), capability),
411            ("manager.enabled".to_string(), true.to_string())
412        ]);
413
414        self.search_and_command(query, manager.to_string(), instruction, data).await
415    }
416    */
417
418    pub fn find_any_and_prepare_command(&self, capability: &str, manager: &str, instruction: &str, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
419        let enabled = true.to_string();
420
421        let query = BTreeMap::from([
422            ("capability", capability),
423            ("manager.enabled", enabled.as_str())
424        ]);
425
426        self.search_and_prepare_command(query, manager.to_string(), instruction.to_string(), data)
427    }
428
429    /*
430    async fn find_kind_and_command(&self, kind: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
431        let query = BTreeMap::from([
432            ("kind".to_string(), kind),
433            ("manager.enabled".to_string(), true.to_string())
434        ]);
435
436        self.search_and_command(query, manager.to_string(), instruction, data).await
437    }
438    */
439
440    pub async fn find_kind_and_prepare_command(&self, kind: &str, manager: &str, instruction: &str, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
441        let enabled = true.to_string();
442        let query = BTreeMap::from([
443            ("kind", kind),
444            ("manager.enabled", enabled.as_str())
445        ]);
446
447        self.search_and_prepare_command(query, manager.to_string(), instruction.to_string(), data)
448    }
449
450    /*
451    async fn search_and_ask(&self, query: BTreeMap<String, String>, path: String, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
452                          params: BTreeMap<String, String>, method: product_os_request::Method) -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
453        let matching_node = self.registry.pick_node(query);
454        let client = &self.client;
455        match matching_node {
456            Some(node) => {
457                match self.registry.get_key(node.get_identifier()) {
458                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
459                    Some(key) => commands::ask_node(client, node, key, path, data, headers, params, method).await
460                }
461            },
462            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
463        }
464    }
465    */
466
467    pub fn search_and_prepare_ask(&self, query: BTreeMap<&str, &str>, path: &str, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
468                                params: BTreeMap<String, String>, method: Method) -> Result<commands::Ask, product_os_request::ProductOSRequestError> {
469        let matching_node = self.registry.pick_node(query);
470        let client = &self.client;
471        match matching_node {
472            Some(node) => {
473                match self.registry.get_key(node.get_identifier().as_str()) {
474                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
475                    Some(key) => Ok(commands::Ask {
476                        requester: client.clone(),
477                        node_url: node.get_address().to_owned(),
478                        verify_key: key,
479                        path: path.to_string(),
480                        data,
481                        headers,
482                        params,
483                        method
484                    })
485                }
486
487            },
488            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
489        }
490    }
491
492    pub async fn find_feature_and_ask(&self, feature: &str, path: &str, data: &Option<serde_json::Value>, headers: &BTreeMap<String, String>,
493                                params: &BTreeMap<String, String>, method: &Method) -> Result<ProductOSResponse<product_os_request::BodyBytes>, product_os_request::ProductOSRequestError> {
494        let matching_node = self.registry.pick_node_for_feature(feature);
495        let client = &self.client;
496        match matching_node {
497            Some(node) => {
498                match self.registry.get_key(node.get_identifier().as_str()) {
499                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
500                    Some(key) => commands::ask_node(client, node, key.as_slice(), path, data, headers, params, method).await
501                }
502
503            },
504            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
505        }
506    }
507
508    pub fn find_feature_and_prepare_ask(&self, feature: &str, path: &str, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
509                                      params: BTreeMap<String, String>, method: Method) -> Result<commands::Ask, product_os_request::ProductOSRequestError> {
510        let matching_node = self.registry.pick_node_for_feature(feature);
511        let client = &self.client;
512        match matching_node {
513            Some(node) => {
514                match self.registry.get_key(node.get_identifier().as_str()) {
515                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
516                    Some(key) => Ok(commands::Ask {
517                        requester: client.clone(),
518                        node_url: node.get_address().to_owned(),
519                        verify_key: key,
520                        path: path.to_string(),
521                        data,
522                        headers,
523                        params,
524                        method
525                    })
526                }
527
528            },
529            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
530        }
531    }
532
533    /*
534    pub fn find_feature_and_ask_sync(&mut self, feature: String, path: String, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
535                                      params: BTreeMap<String, String>, method: product_os_request::Method) -> Result<product_os_request::ProductOSResponse, product_os_request::ProductOSRequestSyncError> {
536        let matching_node = self.registry.pick_node_for_feature(feature);
537        let client = &mut self.client;
538        match matching_node {
539            Some(node) => {
540                match self.registry.get_key(node.get_identifier()) {
541                    None => Err(product_os_request::ProductOSRequestSyncError {
542                        error: product_os_request::ProductOSRequestError::Error("No key found for matching node".to_string()),
543                        generated_error: None
544                    }),
545                    Some(key) => commands::ask_node_sync(client, node, key, path, data, headers, params, method)
546                }
547
548            },
549            None => Err(product_os_request::ProductOSRequestSyncError {
550                error: product_os_request::ProductOSRequestError::Error("No matching node found".to_string()),
551                generated_error: None
552            })
553        }
554    }
555    */
556
557    pub fn get_configuration(&self) -> product_os_configuration::Configuration {
558        self.configuration.clone()
559    }
560
561    pub fn get_key(&self, identifier: &str) -> Option<Vec<u8>> {
562        self.registry.get_key(identifier)
563    }
564
565    pub fn create_key_session(&mut self) -> (String, [u8; 32]) {
566        self.registry.create_key_session()
567    }
568
569    pub fn generate_key(&mut self, session_identifier: &str, remote_public_key: &[u8], association: String, remote_session_identifier: Option<String>) {
570        self.registry.generate_key(session_identifier, remote_public_key, association, remote_session_identifier);
571    }
572
573    #[cfg(feature = "relational_store")]
574    pub fn get_relational_store(&mut self) -> Arc<product_os_store::ProductOSRelationalStore> {
575        self.relational_store.clone()
576    }
577
578    pub fn get_key_value_store(&mut self) -> Arc<product_os_store::ProductOSKeyValueStore> {
579        self.key_value_store.clone()
580    }
581
582    pub async fn add_feature(&mut self, feature: Arc<dyn product_os_capabilities::Feature>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
583        self.registry.add_feature(feature, base_path, router).await;
584    }
585
586    pub async fn add_feature_mut(&mut self, feature: Arc<Mutex<dyn product_os_capabilities::Feature>>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
587        self.registry.add_feature_mut(feature, base_path, router).await;
588    }
589
590    pub async fn remove_feature(&mut self, identifier: &str) {
591        self.registry.remove_feature(identifier).await;
592    }
593
594    pub async fn add_service(&mut self, service: Arc<dyn product_os_capabilities::Service>) {
595        self.registry.add_service(service).await;
596    }
597
598    pub async fn add_service_mut(&mut self, service: Arc<Mutex<dyn product_os_capabilities::Service>>) {
599        self.registry.add_service_mut(service).await;
600    }
601
602    pub async fn set_service_active(&mut self, identifier: String, status: bool) {
603        self.registry.set_service_active(identifier, status).await;
604    }
605
606    pub async fn remove_service(&mut self, identifier: &str) {
607        self.registry.remove_service(identifier).await;
608    }
609
610    pub async fn remove_inactive_services(&mut self, query: BTreeMap<&str, &str>) {
611        self.registry.remove_inactive_services(query).await;
612    }
613
614    pub async fn start_services(&mut self) -> Result<(), ()> {
615        self.registry.start_services().await
616    }
617
618    pub async fn start_service(&mut self, identifier: &str) -> Result<(), ()> {
619        self.registry.start_service(identifier).await
620    }
621
622    pub async fn stop_service(&mut self, identifier: &str) -> Result<(), ()> {
623        self.registry.stop_service(identifier).await
624    }
625
626
627}
628
629
630
631
632
633pub async fn pulse_run(controller_unlocked: Arc<Mutex<ProductOSController>>) {
634    tracing::info!("Starting pulse run...");
635    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
636    match controller_locked {
637        Some(controller) => {
638            #[cfg(feature = "distributed")]
639            controller.discover_nodes().await;
640
641            #[cfg(feature = "distributed")]
642            let self_identifier = controller.registry.get_me().get_identifier();
643            #[cfg(feature = "distributed")]
644            let control_url = controller.registry.get_me().get_address();
645
646            #[cfg(feature = "distributed")]
647            let nodes = controller.registry.get_nodes_endpoints(0, true);
648
649            #[cfg(feature = "distributed")] {
650                match controller.registry.check_me_remote().await {
651                    Some(_) => { controller.registry.update_me_status(true); },
652                    None => {
653                        let alive = controller.registry.update_me_status(false);
654
655                        if !alive {
656                            tracing::error!("Terminating server due to lost presence on remote registry");
657                            std::process::exit(8);
658                        };
659                    }
660                }
661            }
662
663            // Check status of services
664            let services = controller.get_registry().get_me().get_services().list();
665            for (_, service) in services {
666                match service.status().await {
667                    Ok(_) => {}
668                    Err(_) => {}
669                }
670            }
671
672            #[cfg(feature = "distributed")]
673            let client = controller.client.clone();
674
675            // Explicitly drop controller
676            std::mem::drop(controller);
677
678            #[cfg(feature = "distributed")]
679            for (identifier, (url, key)) in nodes {
680                if url != control_url {
681                    match key {
682                        Some(verify_key) => {
683                            match commands::command(&client, url.clone(), verify_key, "status", "ping", None).await {
684                                Ok(response) => {
685                                    let status = response.status();
686
687                                    match status {
688                                        product_os_request::StatusCode::UNAUTHORIZED => {
689                                            let text = {
690                                                let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
691                                                match controller_locked {
692                                                    Some(controller) => {
693                                                        controller.client.text(response).await.unwrap_or_else(|e| {
694                                                            tracing::error!("Failed to parse response text: {:?}", e);
695                                                            String::new()
696                                                        })
697                                                    }
698                                                    None => String::new()
699                                                }
700                                            };
701
702                                            let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(text.as_str()) {
703                                                Ok(auth_error) => auth_error,
704                                                Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
705                                            };
706
707                                            tracing::error!("Error object auth {:?}", auth);
708
709                                            match auth.error {
710                                                authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
711                                                    tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
712
713                                                    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
714                                                    match controller_locked {
715                                                        Some(mut controller) => {
716                                                            controller.registry.remove_node(identifier.as_str()).await;
717                                                        }
718                                                        None => tracing::error!("Failed to lock controller")
719                                                    }
720                                                },
721                                                authentication::CommandControlAuthenticateErrorState::None => ()
722                                            };
723                                        },
724                                        product_os_request::StatusCode::OK => {
725                                            let body = {
726                                                let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
727                                                match controller_locked {
728                                                    Some(controller) => {
729                                                        controller.client.bytes(response).await.unwrap_or_else(|e| {
730                                                            tracing::error!("Failed to parse response bytes: {:?}", e);
731                                                            Bytes::new()
732                                                        })
733                                                    }
734                                                    None => Bytes::new()
735                                                }
736                                            };
737
738                                            tracing::info!("Response received from {}: {} {:?}", url, status, body);
739                                        }
740                                        _ => {
741                                            let body = {
742                                                let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
743                                                match controller_locked {
744                                                    Some(controller) => {
745                                                        controller.client.bytes(response).await.unwrap_or_else(|e| {
746                                                            tracing::error!("Failed to parse error response bytes: {:?}", e);
747                                                            Bytes::new()
748                                                        })
749                                                    }
750                                                    None => Bytes::new()
751                                                }
752                                            };
753                                            tracing::error!("Error response received from {}: {} {:?}", url, status, body);
754                                            tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
755
756                                            let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
757                                            match controller_locked {
758                                                Some(mut controller) => {
759                                                    controller.registry.update_pulse_status(identifier.as_str(), false).await;
760                                                }
761                                                None => tracing::error!("Failed to lock controller")
762                                            }
763                                        }
764                                    }
765                                },
766                                Err(e) => {
767                                    tracing::error!("Error encountered {:?} from {}", e, url);
768
769                                    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
770                                    match controller_locked {
771                                        Some(mut controller) => {
772                                            controller.registry.update_pulse_status(identifier.as_str(), false).await;
773                                        }
774                                        None => tracing::error!("Failed to lock controller")
775                                    }
776                                }
777                            }
778                        },
779                        None => ()
780                    }
781                } else {
782                    if self_identifier != identifier {
783                        tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
784                        let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
785                        match controller_locked {
786                            Some(mut controller) => {
787                                controller.registry.remove_node(identifier.as_str()).await;
788                            }
789                            None => tracing::error!("Failed to lock controller")
790                        }
791                    }
792                }
793            }
794
795
796            tracing::info!("Finished pulse run...");
797        }
798        None => tracing::error!("Failed to lock controller")
799    }
800}
801
802
803pub async fn run_controller<X, E>(controller_mutex: Arc<Mutex<ProductOSController>>, executor: Arc<E>)
804where
805    E: product_os_async_executor::Executor<X> + product_os_async_executor::ExecutorPerform<X> + product_os_async_executor::Timer
806{
807    let controller_unlocked = controller_mutex.clone();
808    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
809    match controller_locked {
810        Some(mut controller) => {
811
812            #[cfg(feature = "distributed")] {
813                authentication::perform_self_trust(&mut controller);
814
815                controller.registry.update_me().await;
816                tracing::info!("Added self...");
817
818                tracing::info!("Command and Control registered for {}", controller.registry.get_me().get_identifier());
819
820                controller.discover_nodes().await;
821                perform_announce(&mut controller).await;
822            }
823
824            // Start up services
825            match &controller.configuration.command_control {
826                Some(command_control_config) => {
827                    if command_control_config.auto_start_services {
828                        match controller.start_services().await {
829                            Ok(_) => {
830                                tracing::info!("Command Control services started");
831                            },
832                            Err(_) => {
833                                tracing::error!("Command Control services failed to start");
834                            }
835                        }
836                    }
837                }
838                None => {
839                    tracing::error!("Command Control configuration not enabled to start services");
840                }
841            }
842
843            let pulse_check = controller.pulse_check.clone();
844            let pulse_check_cron = cron::Schedule::from_str(controller.pulse_check_cron.as_str()).unwrap();
845            let pulse_check_next = pulse_check_cron.upcoming(chrono::Utc).nth(0).unwrap();
846            let pulse_check_following = pulse_check_cron.upcoming(chrono::Utc).nth(1).unwrap();
847
848            #[cfg(feature = "monitor")]
849            let monitor = controller.monitor.clone();
850            #[cfg(feature = "monitor")]
851            let monitor_cron = cron::Schedule::from_str(controller.monitor_cron.as_str()).unwrap();
852            #[cfg(feature = "monitor")]
853            let _monitor_next = monitor_cron.upcoming(chrono::Utc).nth(0).unwrap();
854            #[cfg(feature = "monitor")]
855            let _monitor_following = monitor_cron.upcoming(chrono::Utc).nth(1).unwrap();
856
857            // Explicitly drop controller
858            std::mem::drop(controller);
859
860            if pulse_check {
861                let _ = E::spawn_from_executor(executor.as_ref(), async move {
862                    tracing::info!("Pulse check service starting...");
863
864                    let start_duration = match u32::try_from(match pulse_check_next.signed_duration_since(chrono::Utc::now()).to_std() {
865                        Ok(d) => d,
866                        Err(_) => Duration::new(1, 0)
867                    }.as_millis()) {
868                        Ok(u) => u,
869                        Err(_) => panic!("Period defined is too large for timer")
870                    };
871                    let interval_duration = match u32::try_from(match pulse_check_following.signed_duration_since(pulse_check_next).to_std() {
872                        Ok(d) => d,
873                        Err(_) => Duration::new(1, 0)
874                    }.as_millis()) {
875                        Ok(u) => u,
876                        Err(_) => panic!("Period defined is too large for timer")
877                    };
878
879                    let mut delay = E::once(start_duration).await;
880                    let mut interval = E::interval(interval_duration).await;
881
882                    delay.tick().await;
883                    loop {
884                        interval.tick().await;
885                        tracing::debug!("Pulse check service running");
886
887                        pulse_run(controller_mutex.clone()).await;
888                    }
889                });
890            }
891
892            #[cfg(feature = "monitor")]
893            if monitor {
894                let _ = E::spawn_from_executor(executor.as_ref(), async move {
895                    tracing::info!("Monitor service starting...");
896
897                    let start_duration = match u32::try_from(match pulse_check_next.signed_duration_since(chrono::Utc::now()).to_std() {
898                        Ok(d) => d,
899                        Err(_) => Duration::new(1, 0)
900                    }.as_millis()) {
901                        Ok(u) => u,
902                        Err(_) => panic!("Period defined is too large for timer")
903                    };
904                    let interval_duration = match u32::try_from(match pulse_check_following.signed_duration_since(pulse_check_next).to_std() {
905                        Ok(d) => d,
906                        Err(_) => Duration::new(1, 0)
907                    }.as_millis()) {
908                        Ok(u) => u,
909                        Err(_) => panic!("Period defined is too large for timer")
910                    };
911
912                    let mut delay = E::once(start_duration).await;
913                    let mut interval = E::interval(interval_duration).await;
914
915                    delay.tick().await;
916                    loop {
917                        interval.tick().await;
918                        tracing::debug!("Monitor service running");
919
920                        product_os_monitoring::process_statistics(None);
921                    }
922                }); // await on the last loop
923            }
924        }
925        None => tracing::error!("Failed to lock controller")
926    }
927}
928
929
930#[cfg(feature = "distributed")]
931pub async fn perform_announce(controller: &mut MutexGuard<'_, ProductOSController>) {
932    tracing::info!("Announce searching for nodes...");
933
934    authentication::perform_key_exchange(controller).await;
935
936    let self_identifier = controller.registry.get_me().get_identifier();
937    let control_url = controller.registry.get_me().get_address();
938
939    let nodes = controller.registry.get_nodes_endpoints(0, true);
940
941    tracing::info!("Starting announce...");
942    tracing::info!("Nodes data {:?}", nodes);
943
944    for (identifier, (url, key)) in nodes {
945        if url != control_url {
946            match key {
947                Some(verify_key) => {
948                    tracing::info!("Announcing {}: {}", identifier, url);
949
950                    match commands::command(&controller.client, url.clone(), verify_key, "status", "announce", Some(serde_json::value::to_value(controller.get_registry().get_me()).unwrap())).await {
951                        Ok(response) => {
952                            let status = response.status();
953
954                            match status {
955                                product_os_request::StatusCode::UNAUTHORIZED => {
956                                    let text = controller.client.text(response).await.unwrap();
957                                    let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(text.as_str()) {
958                                        Ok(auth_error) => auth_error,
959                                        Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
960                                    };
961
962                                    tracing::error!("Error object auth {:?}", auth);
963
964                                    match auth.error {
965                                        authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
966                                            tracing::info!("Error from remote node - keys don't match {}", identifier);
967                                        },
968                                        authentication::CommandControlAuthenticateErrorState::None => ()
969                                    };
970                                },
971                                product_os_request::StatusCode::OK => {
972                                    let body = controller.client.bytes(response).await.unwrap();
973                                    tracing::info!("Response received from {}: {} {:?}", url, status, body);
974                                }
975                                _ => {
976                                    let body = controller.client.bytes(response).await.unwrap();
977                                    tracing::error!("Error response received from {}: {} {:?}", url, status, body);
978                                }
979                            }
980                        },
981                        Err(e) => {
982                            tracing::error!("Error encountered {:?} from {}", e, url);
983                        }
984                    };
985                },
986                None => ()
987            }
988        }
989        else {
990            if identifier != self_identifier {
991                tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
992                controller.registry.remove_node(identifier.as_str()).await;
993            }
994        }
995    }
996
997    tracing::info!("Finished announcing...");
998}