xnode_controller/
lib.rs

1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use futures::future::join_all;
4use tokio::{sync::Mutex, time};
5
6mod utils;
7pub use utils::{Error, XnodeControllerError};
8
9use crate::utils::{XnodeControllerErrorInner, add_user_config, outside};
10
11pub trait XnodeController: Send + Sync {
12    /// Get session to update Xnode
13    fn get_session(&self) -> &xnode_manager_sdk::utils::Session;
14
15    /// Decide who should be the current controller based on external data
16    fn check_controller(&self) -> impl Future<Output = Option<String>> + Send;
17
18    // What OS config block should be set for the controller
19    fn controller_config(&self, controller: String) -> String;
20
21    // How to uniquely identify each Xnode
22    fn xnode_identifier(&self) -> String {
23        self.get_session().base_url.clone()
24    }
25
26    /// Set a new owner
27    fn set_controller(
28        &self,
29        controller: Option<String>,
30    ) -> impl Future<Output = Result<xnode_manager_sdk::os::SetOutput, Error>> + Send {
31        async {
32            let session = self.get_session();
33            let xnode_id = self.xnode_identifier();
34            let current_os =
35                xnode_manager_sdk::os::get(xnode_manager_sdk::os::GetInput::new(session))
36                    .await
37                    .map_err(Error::XnodeManagerSDKError)?;
38
39            let new_controller_config = if let Some(controller) = controller {
40                self.controller_config(controller)
41            } else {
42                "".to_string()
43            };
44            let os_block_start = format!("# START XNODE CONTROLLER {}", xnode_id);
45            let os_block_end = format!("# END XNODE CONTROLLER {}", xnode_id);
46            let new_os_config = match outside(&current_os.flake, &os_block_start, &os_block_end) {
47                // Update existing xnode controller block
48                Some((before, after)) => [before, &new_controller_config, after].join("\n"),
49                // No xnode controller block, insert at start of user config
50                None => add_user_config(
51                    &current_os.flake,
52                    "# START USER CONFIG",
53                    &[
54                        "",
55                        &os_block_start,
56                        &new_controller_config,
57                        &os_block_end,
58                        "",
59                    ]
60                    .join("\n"),
61                )
62                .ok_or(Error::XnodeControllerError(XnodeControllerError::new(
63                    XnodeControllerErrorInner::NoUserConfig,
64                )))?,
65            };
66            let os_update =
67                xnode_manager_sdk::os::set(xnode_manager_sdk::os::SetInput::new_with_data(
68                    session,
69                    xnode_manager_sdk::os::OSChange {
70                        acme_email: None,
71                        domain: None,
72                        flake: Some(new_os_config),
73                        update_inputs: None,
74                        user_passwd: None,
75                        xnode_owner: None,
76                    },
77                ))
78                .await
79                .map_err(Error::XnodeManagerSDKError)?;
80
81            Ok(os_update)
82        }
83    }
84}
85
/// Per-Xnode state that `update_controllers` keeps between update rounds, keyed by the
/// Xnode identifier.
struct XnodeControllerData {
    /// Controller value recorded for this Xnode (`None` = no controller). It is compared
    /// against the next `check_controller` result to skip redundant updates.
    last_controller: Option<String>,
}
89pub async fn update_controllers<Xnode: XnodeController + 'static>(
90    xnodes: Arc<Mutex<Vec<Xnode>>>,
91    update_interval: Duration,
92) {
93    let mut interval = time::interval(update_interval);
94    let data = Arc::new(Mutex::new(HashMap::<String, XnodeControllerData>::new()));
95
96    loop {
97        interval.tick().await;
98
99        join_all(xnodes.lock().await.iter().map(async |xnode| {
100            let xnode_id = xnode.xnode_identifier();
101            let new_controller = xnode.check_controller().await;
102            if let Some(data) = data.lock().await.get(&xnode_id) {
103                if new_controller == data.last_controller {
104                    return;
105                }
106            }
107
108            log::info!("Setting controller on {xnode_id} to {new_controller:?}");
109            if let Err(e) = xnode.set_controller(new_controller.clone()).await {
110                log::warn!("Error setting controller on {xnode_id} to {new_controller:?}: {e:?}");
111            }
112            data.lock()
113                .await
114                .entry(xnode_id)
115                .and_modify(|data| {
116                    data.last_controller = new_controller.clone();
117                })
118                .or_insert(XnodeControllerData {
119                    last_controller: new_controller,
120                });
121        }))
122        .await;
123    }
124}