product_os_command_control/
lib.rs

1#![no_std]
2extern crate no_std_compat as std;
3
4use bytes::Bytes;
5use std::prelude::v1::*;
6
7pub mod authentication;
8mod registry;
9mod commands;
10
11use std::collections::BTreeMap;
12
13use std::sync::Arc;
14use parking_lot::{ Mutex, MutexGuard };
15use product_os_configuration::{KeyValueKind, KeyValueStore, RelationalKind, RelationalStore, Stores};
16
17use std::str::FromStr;
18use std::time::Duration;
19use serde::{ Deserialize, Serialize };
20use std::num::TryFromIntError;
21
22pub use product_os_request::Method;
23use product_os_request::{ProductOSClient, ProductOSResponse};
24pub use registry::Node;
25
26
27
/// State held by the command-and-control controller: configuration, certificates,
/// the node registry, the outbound request client and handles to the backing stores.
pub struct ProductOSController {
    /// Configuration the controller was constructed with; cloned out by `get_configuration`.
    configuration: product_os_configuration::Configuration,

    /// Certificate bundle (certificates plus private key) supplied to `new`.
    certificates: product_os_security::certificates::Certificates,

    /// Value of `configuration.get_cc_max_servers()` captured in `new`.
    max_servers: u8,

    /// Node registry built in `new` from the configuration, key-value store and certificates.
    registry: registry::Registry,

    /// Value of `configuration.is_pulse_check_enabled()` captured in `new`.
    pulse_check: bool,
    /// Cron expression for the pulse check, taken from `configuration.get_hearbeat_cron()`.
    pulse_check_cron: String,

    /// Value of `configuration.is_monitor_enabled()`; only present with the `monitor` feature.
    #[cfg(feature = "monitor")]
    monitor: bool,
    /// Cron expression from `configuration.get_monitor_cron()`; only present with the `monitor` feature.
    #[cfg(feature = "monitor")]
    monitor_cron: String,

    /// Request builder state (headers, trusted certificates) from which `client` is built.
    requester: product_os_request::ProductOSRequester,
    /// Request client built from `requester`; rebuilt after node discovery.
    client: product_os_request::ProductOSRequestClient,

    /// Shared relational store handle; only present with the `relational_store` feature.
    #[cfg(feature = "relational_store")]
    relational_store: Arc<product_os_store::ProductOSRelationalStore>,
    /// Shared key-value store handle (always present).
    key_value_store: Arc<product_os_store::ProductOSKeyValueStore>,
}
52
53impl ProductOSController {
54    pub fn new(configuration: product_os_configuration::Configuration, certificates:
55               product_os_security::certificates::Certificates,
56               key_value_store: Option<Arc<product_os_store::ProductOSKeyValueStore>>,
57               #[cfg(feature = "relational_store")] relational_store: Option<Arc<product_os_store::ProductOSRelationalStore>>) -> Self {
58        let key_value_store: Arc<product_os_store::ProductOSKeyValueStore> = key_value_store.unwrap_or_else(|| Arc::new(product_os_store::ProductOSKeyValueStore::new(&KeyValueStore {
59            enabled: false,
60            kind: KeyValueKind::Sink,
61            host: "".to_string(),
62            port: 0,
63            secure: false,
64            db_number: 0,
65            db_name: None,
66            username: None,
67            password: None,
68            pool_size: 0,
69            default_limit: 0,
70            default_offset: 0,
71            prefix: None,
72        })));
73
74        #[cfg(feature = "relational_store")]
75        let relational_store: Arc<product_os_store::ProductOSRelationalStore> = relational_store.unwrap_or_else(|| Arc::new(product_os_store::ProductOSRelationalStore::new(&RelationalStore {
76            enabled: false,
77            kind: RelationalKind::Sink,
78            host: "".to_string(),
79            port: 0,
80            secure: false,
81            db_name: "".to_string(),
82            username: None,
83            password: None,
84            pool_size: 0,
85            default_limit: 0,
86            default_offset: 0,
87            prefix: None,
88        })));
89
90        let registry = registry::Registry::new(&configuration, key_value_store.clone(), certificates.to_owned());
91
92        let mut requester = product_os_request::ProductOSRequester::new();
93        requester.add_header("x-product-os-command", registry.get_me().get_identifier().as_str(), false);
94        let mut request_client = product_os_request::ProductOSRequestClient::new();
95        request_client.build(&requester);
96
97        Self {
98            certificates,
99
100            max_servers: configuration.get_cc_max_servers(),
101
102            pulse_check: configuration.is_pulse_check_enabled(),
103            pulse_check_cron: configuration.get_hearbeat_cron().to_string(),
104
105            #[cfg(feature = "monitor")]
106            monitor: configuration.is_monitor_enabled(),
107            #[cfg(feature = "monitor")]
108            monitor_cron: configuration.get_monitor_cron().to_string(),
109
110            configuration,
111
112            #[cfg(feature = "relational_store")]
113            relational_store,
114            key_value_store,
115
116            registry,
117
118            requester,
119            client: request_client
120        }
121    }
122
    /// Returns a shared reference to the controller's node registry.
    pub fn get_registry(&self) -> &registry::Registry {
        &self.registry
    }
126
127    pub async fn discover_nodes(&mut self) {
128        self.registry.discover_nodes().await;
129
130        for cert in self.registry.get_nodes_raw_certificates(0, true) {
131            self.requester.add_trusted_certificate_pem(cert);
132        }
133
134        self.client.build(&self.requester);
135    }
136
137    pub fn get_max_servers(&self) -> u8 {
138        self.max_servers.to_owned()
139    }
140
    /// Forwards `identifier` and `node` to the registry's `upsert_node_local`.
    pub fn upsert_node_local(&mut self, identifier: String, node: Node) {
        self.registry.upsert_node_local(identifier, node);
    }
144
145    pub fn get_certificates(&self) -> Vec<&[u8]> {
146        let mut certificates = vec!();
147        for cert in &self.certificates.certificates {
148            certificates.push(cert.as_slice())
149        }
150
151        certificates
152    }
153
    /// Returns the private key bytes held in the controller's certificate bundle.
    pub fn get_private_key(&self) -> &[u8] {
        self.certificates.private.as_slice()
    }
157
158    pub fn get_certificates_and_private_key(&self) -> product_os_security::certificates::Certificates {
159        self.certificates.to_owned()
160    }
161
162    pub fn validate_certificate(&self, certificate: &[u8]) -> bool {
163        tracing::trace!("Checking certificate stored {:?} vs given {:?}", self.certificates.certificates, certificate);
164
165        for cert in self.get_certificates() {
166            if cert.eq(certificate) { return true; }
167        }
168
169        false
170    }
171
172    pub fn validate_verify_tag<T>(&self, query: Option<&BTreeMap<String, String>>, payload: Option<T>,
173                                  headers: Option<&BTreeMap<String, String>>, verify_identifier: &str, verify_tag: &str) -> bool
174    where T: product_os_security::AsByteVector
175    {
176        tracing::trace!("Checking verification provided {} from {}", verify_tag, verify_identifier);
177        let key = self.registry.get_key(verify_identifier);
178        tracing::trace!("Checking verification with key {:?}", key);
179
180        match key {
181            Some(verify_key) => {
182                product_os_security::verify_auth_request(query, false, payload, headers,
183                                                       &["x-product-os-verify", "x-product-os-command", "x-product-os-control"],
184                                                       verify_tag, Some(verify_key.as_slice()))
185            },
186            None => false
187        }
188    }
189
190    pub fn authenticate_command_control<T>(&self, query: Option<&BTreeMap<String, String>>, payload: Option<T>, headers: Option<&BTreeMap<String, String>>,
191                                        verify_identifier: Option<&str>, verify_tag: Option<&str>) -> Result<bool, authentication::CommandControlAuthenticateError>
192    where T: product_os_security::AsByteVector
193    {
194        tracing::trace!("Verify identifier {:?} and tag {:?} verify result", &verify_identifier, &verify_tag);
195        if verify_identifier.is_some() && verify_tag.is_some() && self.validate_verify_tag(query, payload, headers, verify_identifier.unwrap(), verify_tag.unwrap()) {
196            Ok(true)
197        }
198        else {
199            Err(authentication::CommandControlAuthenticateError {
200                error: authentication::CommandControlAuthenticateErrorState::KeyError(String::from("One of the keys provided was not valid"))
201            })
202        }
203    }
204
205    /*
206    async fn search_and_command(&self, query: BTreeMap<String, String>, module: String, instruction: String, data: Option<serde_json::Value>)  -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
207        let matching_node = self.registry.pick_node(query);
208        let client = &self.client;
209
210        match matching_node {
211            Some(node) => {
212                match self.registry.get_key(node.get_identifier()) {
213                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
214                    Some(key) => commands::command_node(client, node, key, module, instruction, data).await
215                }
216            },
217            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
218        }
219    }
220    */
221
222    pub fn search_and_prepare_command(&self, query: BTreeMap<&str, &str>, module: String, instruction: String, data: Option<serde_json::Value>)  -> Result<crate::commands::Command, product_os_request::ProductOSRequestError> {
223        let matching_node = self.registry.pick_node(query);
224        let client = &self.client;
225
226        match matching_node {
227            Some(node) => {
228                match self.registry.get_key(node.get_identifier().as_str()) {
229                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
230                    Some(key) => Ok(commands::Command {
231                        requester: client.clone(),
232                        node_url: node.get_address().to_owned(),
233                        verify_key: key,
234                        module,
235                        instruction,
236                        data
237                    })
238                }
239            },
240            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
241        }
242    }
243
244    /*
245    async fn find_and_command(&self, key: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
246        let query = BTreeMap::from([
247            ("manager.key".to_string(), key),
248            ("manager.kind".to_string(), manager.to_string()),
249            ("manager.enabled".to_string(), true.to_string())
250        ]);
251
252        self.search_and_command(query, manager.to_string(), instruction, data).await
253    }
254    */
255
256    pub fn find_and_prepare_command(&self, key: &str, manager: &str, instruction: String, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
257        let enabled = true.to_string();
258
259        let query = BTreeMap::from([
260            ("manager.key", key),
261            ("manager.kind", manager),
262            ("manager.enabled", enabled.as_str())
263        ]);
264
265        self.search_and_prepare_command(query, manager.to_string(), instruction, data)
266    }
267
268    /*
269    async fn find_any_and_command(&self, capability: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
270        let query = BTreeMap::from([
271            ("capability".to_string(), capability),
272            ("manager.enabled".to_string(), true.to_string())
273        ]);
274
275        self.search_and_command(query, manager.to_string(), instruction, data).await
276    }
277    */
278
279    pub fn find_any_and_prepare_command(&self, capability: &str, manager: &str, instruction: &str, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
280        let enabled = true.to_string();
281
282        let query = BTreeMap::from([
283            ("capability", capability),
284            ("manager.enabled", enabled.as_str())
285        ]);
286
287        self.search_and_prepare_command(query, manager.to_string(), instruction.to_string(), data)
288    }
289
290    /*
291    async fn find_kind_and_command(&self, kind: String, manager: String, instruction: String, data: Option<serde_json::Value>) -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
292        let query = BTreeMap::from([
293            ("kind".to_string(), kind),
294            ("manager.enabled".to_string(), true.to_string())
295        ]);
296
297        self.search_and_command(query, manager.to_string(), instruction, data).await
298    }
299    */
300
301    pub async fn find_kind_and_prepare_command(&self, kind: &str, manager: &str, instruction: &str, data: Option<serde_json::Value>) -> Result<commands::Command, product_os_request::ProductOSRequestError> {
302        let enabled = true.to_string();
303        let query = BTreeMap::from([
304            ("kind", kind),
305            ("manager.enabled", enabled.as_str())
306        ]);
307
308        self.search_and_prepare_command(query, manager.to_string(), instruction.to_string(), data)
309    }
310
311    /*
312    async fn search_and_ask(&self, query: BTreeMap<String, String>, path: String, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
313                          params: BTreeMap<String, String>, method: product_os_request::Method) -> Result<ProductOSResponse, product_os_request::ProductOSRequestError> {
314        let matching_node = self.registry.pick_node(query);
315        let client = &self.client;
316        match matching_node {
317            Some(node) => {
318                match self.registry.get_key(node.get_identifier()) {
319                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
320                    Some(key) => commands::ask_node(client, node, key, path, data, headers, params, method).await
321                }
322            },
323            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
324        }
325    }
326    */
327
328    pub fn search_and_prepare_ask(&self, query: BTreeMap<&str, &str>, path: &str, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
329                                params: BTreeMap<String, String>, method: Method) -> Result<commands::Ask, product_os_request::ProductOSRequestError> {
330        let matching_node = self.registry.pick_node(query);
331        let client = &self.client;
332        match matching_node {
333            Some(node) => {
334                match self.registry.get_key(node.get_identifier().as_str()) {
335                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
336                    Some(key) => Ok(commands::Ask {
337                        requester: client.clone(),
338                        node_url: node.get_address().to_owned(),
339                        verify_key: key,
340                        path: path.to_string(),
341                        data,
342                        headers,
343                        params,
344                        method
345                    })
346                }
347
348            },
349            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
350        }
351    }
352
353    pub async fn find_feature_and_ask(&self, feature: &str, path: &str, data: &Option<serde_json::Value>, headers: &BTreeMap<String, String>,
354                                params: &BTreeMap<String, String>, method: &Method) -> Result<ProductOSResponse<product_os_request::BodyBytes>, product_os_request::ProductOSRequestError> {
355        let matching_node = self.registry.pick_node_for_feature(feature);
356        let client = &self.client;
357        match matching_node {
358            Some(node) => {
359                match self.registry.get_key(node.get_identifier().as_str()) {
360                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
361                    Some(key) => commands::ask_node(client, node, key.as_slice(), path, data, headers, params, method).await
362                }
363
364            },
365            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
366        }
367    }
368
369    pub fn find_feature_and_prepare_ask(&self, feature: &str, path: &str, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
370                                      params: BTreeMap<String, String>, method: Method) -> Result<commands::Ask, product_os_request::ProductOSRequestError> {
371        let matching_node = self.registry.pick_node_for_feature(feature);
372        let client = &self.client;
373        match matching_node {
374            Some(node) => {
375                match self.registry.get_key(node.get_identifier().as_str()) {
376                    None => Err(product_os_request::ProductOSRequestError::Error(format!("No key found for matching node"))),
377                    Some(key) => Ok(commands::Ask {
378                        requester: client.clone(),
379                        node_url: node.get_address().to_owned(),
380                        verify_key: key,
381                        path: path.to_string(),
382                        data,
383                        headers,
384                        params,
385                        method
386                    })
387                }
388
389            },
390            None => Err(product_os_request::ProductOSRequestError::Error(format!("No matching node found")))
391        }
392    }
393
394    /*
395    pub fn find_feature_and_ask_sync(&mut self, feature: String, path: String, data: Option<serde_json::Value>, headers: BTreeMap<String, String>,
396                                      params: BTreeMap<String, String>, method: product_os_request::Method) -> Result<product_os_request::ProductOSResponse, product_os_request::ProductOSRequestSyncError> {
397        let matching_node = self.registry.pick_node_for_feature(feature);
398        let client = &mut self.client;
399        match matching_node {
400            Some(node) => {
401                match self.registry.get_key(node.get_identifier()) {
402                    None => Err(product_os_request::ProductOSRequestSyncError {
403                        error: product_os_request::ProductOSRequestError::Error("No key found for matching node".to_string()),
404                        generated_error: None
405                    }),
406                    Some(key) => commands::ask_node_sync(client, node, key, path, data, headers, params, method)
407                }
408
409            },
410            None => Err(product_os_request::ProductOSRequestSyncError {
411                error: product_os_request::ProductOSRequestError::Error("No matching node found".to_string()),
412                generated_error: None
413            })
414        }
415    }
416    */
417
    /// Returns a clone of the configuration held by the controller.
    pub fn get_configuration(&self) -> product_os_configuration::Configuration {
        self.configuration.clone()
    }
421
    /// Returns whatever the registry's `get_key` yields for `identifier`.
    pub fn get_key(&self, identifier: &str) -> Option<Vec<u8>> {
        self.registry.get_key(identifier)
    }
425
    /// Delegates to the registry's `create_key_session` and returns its result.
    // NOTE(review): the tuple is presumably (session identifier, 32-byte public key) —
    // confirm against `registry::Registry::create_key_session`.
    pub fn create_key_session(&mut self) -> (String, [u8; 32]) {
        self.registry.create_key_session()
    }
429
    /// Forwards all arguments unchanged to the registry's `generate_key`.
    // NOTE(review): presumably derives a shared key for `association` from the session and
    // the remote public key — confirm against `registry::Registry::generate_key`.
    pub fn generate_key(&mut self, session_identifier: &str, remote_public_key: &[u8], association: String, remote_session_identifier: Option<String>) {
        self.registry.generate_key(session_identifier, remote_public_key, association, remote_session_identifier);
    }
433
434    #[cfg(feature = "relational_store")]
435    pub fn get_relational_store(&mut self) -> Arc<product_os_store::ProductOSRelationalStore> {
436        self.relational_store.clone()
437    }
438
439    pub fn get_key_value_store(&mut self) -> Arc<product_os_store::ProductOSKeyValueStore> {
440        self.key_value_store.clone()
441    }
442
    /// Forwards `feature`, `base_path` and `router` to the registry's `add_feature` and awaits it.
    pub async fn add_feature(&mut self, feature: Arc<dyn product_os_capabilities::Feature>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
        self.registry.add_feature(feature, base_path, router).await;
    }
446
    /// Same as `add_feature`, but for a feature wrapped in a `Mutex`; forwards to the
    /// registry's `add_feature_mut` and awaits it.
    pub async fn add_feature_mut(&mut self, feature: Arc<Mutex<dyn product_os_capabilities::Feature>>, base_path: String, router: &mut product_os_router::ProductOSRouter) {
        self.registry.add_feature_mut(feature, base_path, router).await;
    }
450
    /// Forwards `identifier` to the registry's `remove_feature` and awaits it.
    pub async fn remove_feature(&mut self, identifier: &str) {
        self.registry.remove_feature(identifier).await;
    }
454
    /// Forwards `service` to the registry's `add_service` and awaits it.
    pub async fn add_service(&mut self, service: Arc<dyn product_os_capabilities::Service>) {
        self.registry.add_service(service).await;
    }
458
    /// Same as `add_service`, but for a service wrapped in a `Mutex`; forwards to the
    /// registry's `add_service_mut` and awaits it.
    pub async fn add_service_mut(&mut self, service: Arc<Mutex<dyn product_os_capabilities::Service>>) {
        self.registry.add_service_mut(service).await;
    }
462
    /// Forwards `identifier` and `status` to the registry's `set_service_active` and awaits it.
    pub async fn set_service_active(&mut self, identifier: String, status: bool) {
        self.registry.set_service_active(identifier, status).await;
    }
466
    /// Forwards `identifier` to the registry's `remove_service` and awaits it.
    pub async fn remove_service(&mut self, identifier: &str) {
        self.registry.remove_service(identifier).await;
    }
470
    /// Forwards `query` to the registry's `remove_inactive_services` and awaits it.
    pub async fn remove_inactive_services(&mut self, query: BTreeMap<&str, &str>) {
        self.registry.remove_inactive_services(query).await;
    }
474
    /// Awaits the registry's `start_services`.
    pub async fn start_services(&mut self) {
        self.registry.start_services().await;
    }
478}
479
480
481
482
483
484pub async fn pulse_run(controller_unlocked: Arc<Mutex<ProductOSController>>) {
485    tracing::info!("Starting pulse run...");
486    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
487    match controller_locked {
488        Some(mut controller) => {
489            #[cfg(feature = "distributed")]
490            controller.discover_nodes().await;
491
492            #[cfg(feature = "distributed")]
493            let self_identifier = controller.registry.get_me().get_identifier();
494            #[cfg(feature = "distributed")]
495            let control_url = controller.registry.get_me().get_address();
496
497            #[cfg(feature = "distributed")]
498            let nodes = controller.registry.get_nodes_endpoints(0, true);
499
500            #[cfg(feature = "distributed")] {
501                match controller.registry.check_me_remote().await {
502                    Some(_) => { controller.registry.update_me_status(true); },
503                    None => {
504                        let alive = controller.registry.update_me_status(false);
505
506                        if !alive {
507                            tracing::error!("Terminating server due to lost presence on remote registry");
508                            std::process::exit(8);
509                        };
510                    }
511                }
512            }
513
514            // Check status of services
515            let services = controller.get_registry().get_me().get_services().list();
516            for (_, service) in services {
517                match service.status().await {
518                    Ok(_) => {}
519                    Err(_) => {}
520                }
521            }
522
523            #[cfg(feature = "distributed")]
524            let client = controller.client.clone();
525
526            // Explicitly drop controller
527            std::mem::drop(controller);
528
529            #[cfg(feature = "distributed")]
530            for (identifier, (url, key)) in nodes {
531                if url != control_url {
532                    match key {
533                        Some(verify_key) => {
534                            match commands::command(&client, url.clone(), verify_key, "status", "ping", None).await {
535                                Ok(response) => {
536                                    let status = response.status();
537
538                                    match status {
539                                        product_os_request::StatusCode::UNAUTHORIZED => {
540                                            let text = {
541                                                let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
542                                                match controller_locked {
543                                                    Some(mut controller) => {
544                                                        controller.client.text(response).await.unwrap()
545                                                    }
546                                                    None => String::new()
547                                                }
548                                            };
549
550                                            let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(text.as_str()) {
551                                                Ok(auth_error) => auth_error,
552                                                Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
553                                            };
554
555                                            tracing::error!("Error object auth {:?}", auth);
556
557                                            match auth.error {
558                                                authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
559                                                    tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
560
561                                                    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
562                                                    match controller_locked {
563                                                        Some(mut controller) => {
564                                                            controller.registry.remove_node(identifier.as_str()).await;
565                                                        }
566                                                        None => tracing::error!("Failed to lock controller")
567                                                    }
568                                                },
569                                                authentication::CommandControlAuthenticateErrorState::None => ()
570                                            };
571                                        },
572                                        product_os_request::StatusCode::OK => {
573                                            let body = {
574                                                let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
575                                                match controller_locked {
576                                                    Some(mut controller) => {
577                                                        controller.client.bytes(response).await.unwrap()
578                                                    }
579                                                    None => Bytes::new()
580                                                }
581                                            };
582
583                                            tracing::info!("Response received from {}: {} {:?}", url, status, body);
584                                        }
585                                        _ => {
586                                            let body = {
587                                                let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
588                                                match controller_locked {
589                                                    Some(mut controller) => {
590                                                        controller.client.bytes(response).await.unwrap()
591                                                    }
592                                                    None => Bytes::new()
593                                                }
594                                            };
595                                            tracing::error!("Error response received from {}: {} {:?}", url, status, body);
596                                            tracing::info!("Attempting to purge remote node - keys don't match {}", identifier);
597
598                                            let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
599                                            match controller_locked {
600                                                Some(mut controller) => {
601                                                    controller.registry.update_pulse_status(identifier.as_str(), false).await;
602                                                }
603                                                None => tracing::error!("Failed to lock controller")
604                                            }
605                                        }
606                                    }
607                                },
608                                Err(e) => {
609                                    tracing::error!("Error encountered {:?} from {}", e, url);
610
611                                    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
612                                    match controller_locked {
613                                        Some(mut controller) => {
614                                            controller.registry.update_pulse_status(identifier.as_str(), false).await;
615                                        }
616                                        None => tracing::error!("Failed to lock controller")
617                                    }
618                                }
619                            }
620                        },
621                        None => ()
622                    }
623                } else {
624                    if self_identifier != identifier {
625                        tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
626                        let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
627                        match controller_locked {
628                            Some(mut controller) => {
629                                controller.registry.remove_node(identifier.as_str()).await;
630                            }
631                            None => tracing::error!("Failed to lock controller")
632                        }
633                    }
634                }
635            }
636
637
638            tracing::info!("Finished pulse run...");
639        }
640        None => tracing::error!("Failed to lock controller")
641    }
642}
643
644
645pub async fn run_controller<X, E>(controller_mutex: Arc<Mutex<ProductOSController>>, executor: Arc<E>)
646where
647    E: product_os_async_executor::Executor<X> + product_os_async_executor::ExecutorPerform<X> + product_os_async_executor::Timer
648{
649    let controller_unlocked = controller_mutex.clone();
650    let controller_locked = controller_unlocked.try_lock_for(core::time::Duration::new(10, 0));
651    match controller_locked {
652        Some(mut controller) => {
653
654            #[cfg(feature = "distributed")] {
655                authentication::perform_self_trust(&mut controller);
656
657                controller.registry.update_me().await;
658                tracing::info!("Added self...");
659
660                tracing::info!("Command and Control registered for {}", controller.registry.get_me().get_identifier());
661
662                controller.discover_nodes().await;
663                perform_announce(&mut controller).await;
664
665                // Start up services
666                controller.start_services().await;
667            }
668
669            let pulse_check = controller.pulse_check.clone();
670            let pulse_check_cron = cron::Schedule::from_str(controller.pulse_check_cron.as_str()).unwrap();
671            let pulse_check_next = pulse_check_cron.upcoming(chrono::Utc).nth(0).unwrap();
672            let pulse_check_following = pulse_check_cron.upcoming(chrono::Utc).nth(1).unwrap();
673
674            #[cfg(feature = "monitor")]
675            let monitor = controller.monitor.clone();
676            #[cfg(feature = "monitor")]
677            let monitor_cron = cron::Schedule::from_str(controller.monitor_cron.as_str()).unwrap();
678            #[cfg(feature = "monitor")]
679            let monitor_next = monitor_cron.upcoming(chrono::Utc).nth(0).unwrap();
680            #[cfg(feature = "monitor")]
681            let monitor_following = monitor_cron.upcoming(chrono::Utc).nth(1).unwrap();
682
683            // Explicitly drop controller
684            std::mem::drop(controller);
685
686            if pulse_check {
687                let _ = E::spawn_from_executor(executor.as_ref(), async move {
688                    tracing::info!("Pulse check service starting...");
689
690                    let start_duration = match u32::try_from(match pulse_check_next.signed_duration_since(chrono::Utc::now()).to_std() {
691                        Ok(d) => d,
692                        Err(_) => Duration::new(1, 0)
693                    }.as_millis()) {
694                        Ok(u) => u,
695                        Err(_) => panic!("Period defined is too large for timer")
696                    };
697                    let interval_duration = match u32::try_from(match pulse_check_following.signed_duration_since(pulse_check_next).to_std() {
698                        Ok(d) => d,
699                        Err(_) => Duration::new(1, 0)
700                    }.as_millis()) {
701                        Ok(u) => u,
702                        Err(_) => panic!("Period defined is too large for timer")
703                    };
704
705                    let mut delay = E::once(start_duration).await;
706                    let mut interval = E::interval(interval_duration).await;
707
708                    delay.tick().await;
709                    loop {
710                        interval.tick().await;
711                        tracing::debug!("Pulse check service running");
712
713                        pulse_run(controller_mutex.clone()).await;
714                    }
715                });
716            }
717
718            #[cfg(feature = "monitor")]
719            if monitor {
720                let _ = E::spawn_from_executor(executor.as_ref(), async move {
721                    tracing::info!("Monitor service starting...");
722
723                    let start_duration = match u32::try_from(match pulse_check_next.signed_duration_since(chrono::Utc::now()).to_std() {
724                        Ok(d) => d,
725                        Err(_) => Duration::new(1, 0)
726                    }.as_millis()) {
727                        Ok(u) => u,
728                        Err(_) => panic!("Period defined is too large for timer")
729                    };
730                    let interval_duration = match u32::try_from(match pulse_check_following.signed_duration_since(pulse_check_next).to_std() {
731                        Ok(d) => d,
732                        Err(_) => Duration::new(1, 0)
733                    }.as_millis()) {
734                        Ok(u) => u,
735                        Err(_) => panic!("Period defined is too large for timer")
736                    };
737
738                    let mut delay = E::once(start_duration).await;
739                    let mut interval = E::interval(interval_duration).await;
740
741                    delay.tick().await;
742                    loop {
743                        interval.tick().await;
744                        tracing::debug!("Monitor service running");
745
746                        product_os_monitoring::process_statistics(None);
747                    }
748                }); // await on the last loop
749            }
750        }
751        None => tracing::error!("Failed to lock controller")
752    }
753}
754
755
/// Announces this node to every other node known to the registry.
///
/// A key exchange is performed first. Then, for each endpoint returned by the registry:
///
/// * an entry whose address differs from this node's address and that has a verify key
///   is sent a `status` / `announce` command carrying this node's registry entry, and
///   the response is logged according to its status code;
/// * an entry without a verify key is skipped;
/// * an entry that shares this node's address under a different identifier is treated
///   as a stale duplicate of this node and removed from the registry.
///
/// A failure to send the command or to read a response body is logged for that node and
/// the announce continues with the next one.
///
/// # Panics
///
/// Panics when this node's registry entry cannot be serialised to JSON.
#[cfg(feature = "distributed")]
pub async fn perform_announce(controller: &mut MutexGuard<'_, ProductOSController>) {
    tracing::info!("Announce searching for nodes...");

    authentication::perform_key_exchange(controller).await;

    let self_identifier = controller.registry.get_me().get_identifier();
    let control_url = controller.registry.get_me().get_address();

    let nodes = controller.registry.get_nodes_endpoints(0, true);

    tracing::info!("Starting announce...");
    tracing::info!("Nodes data {:?}", nodes);

    for (identifier, (url, key)) in nodes {
        if url == control_url {
            // Same address as this node: any other identifier is a leftover duplicate.
            if identifier != self_identifier {
                tracing::info!("Attempting to purge self duplicate node - identifier mismatch {}", identifier);
                controller.registry.remove_node(identifier.as_str()).await;
            }
            continue;
        }

        // Nodes without a verify key cannot be sent an authenticated command.
        let verify_key = match key {
            Some(verify_key) => verify_key,
            None => continue
        };

        tracing::info!("Announcing {}: {}", identifier, url);

        let payload = serde_json::value::to_value(controller.get_registry().get_me())
            .expect("own registry node must serialise to JSON");

        let response = match commands::command(&controller.client, url.clone(), verify_key, "status", "announce", Some(payload)).await {
            Ok(response) => response,
            Err(e) => {
                tracing::error!("Error encountered {:?} from {}", e, url);
                continue;
            }
        };

        let status = response.status();

        match status {
            product_os_request::StatusCode::UNAUTHORIZED => {
                match controller.client.text(response).await {
                    Ok(text) => {
                        // An unparseable body is treated as "no specific authentication error".
                        let auth: authentication::CommandControlAuthenticateError = match serde_json::from_str(text.as_str()) {
                            Ok(auth_error) => auth_error,
                            Err(_) => authentication::CommandControlAuthenticateError { error: authentication::CommandControlAuthenticateErrorState::None }
                        };

                        tracing::error!("Error object auth {:?}", auth);

                        match auth.error {
                            authentication::CommandControlAuthenticateErrorState::KeyError(_) => {
                                tracing::info!("Error from remote node - keys don't match {}", identifier);
                            },
                            authentication::CommandControlAuthenticateErrorState::None => ()
                        };
                    },
                    Err(e) => {
                        tracing::error!("Failed to read unauthorized response from {}: {:?}", url, e);
                    }
                }
            },
            product_os_request::StatusCode::OK => {
                match controller.client.bytes(response).await {
                    Ok(body) => {
                        tracing::info!("Response received from {}: {} {:?}", url, status, body);
                    },
                    Err(e) => {
                        tracing::error!("Failed to read response body from {}: {} {:?}", url, status, e);
                    }
                }
            },
            _ => {
                match controller.client.bytes(response).await {
                    Ok(body) => {
                        tracing::error!("Error response received from {}: {} {:?}", url, status, body);
                    },
                    Err(e) => {
                        tracing::error!("Failed to read error response body from {}: {} {:?}", url, status, e);
                    }
                }
            }
        }
    }

    tracing::info!("Finished announcing...");
}