1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use futures::future::join_all;
4use tokio::{sync::Mutex, time};
5
6mod utils;
7pub use utils::{Error, XnodeControllerError};
8
9use crate::utils::{XnodeControllerErrorInner, add_user_config, outside};
10
11pub trait XnodeController: Send + Sync {
12 fn get_session(&self) -> &xnode_manager_sdk::utils::Session;
14
15 fn check_controller(&self) -> impl Future<Output = Option<String>> + Send;
17
18 fn controller_config(&self, controller: String) -> String;
20
21 fn xnode_identifier(&self) -> String {
23 self.get_session().base_url.clone()
24 }
25
26 fn set_controller(
28 &self,
29 controller: Option<String>,
30 ) -> impl Future<Output = Result<xnode_manager_sdk::os::SetOutput, Error>> + Send {
31 async {
32 let session = self.get_session();
33 let xnode_id = self.xnode_identifier();
34 let current_os =
35 xnode_manager_sdk::os::get(xnode_manager_sdk::os::GetInput::new(session))
36 .await
37 .map_err(Error::XnodeManagerSDKError)?;
38
39 let new_controller_config = if let Some(controller) = controller {
40 self.controller_config(controller)
41 } else {
42 "".to_string()
43 };
44 let os_block_start = format!("# START XNODE CONTROLLER {}", xnode_id);
45 let os_block_end = format!("# END XNODE CONTROLLER {}", xnode_id);
46 let new_os_config = match outside(¤t_os.flake, &os_block_start, &os_block_end) {
47 Some((before, after)) => [before, &new_controller_config, after].join("\n"),
49 None => add_user_config(
51 ¤t_os.flake,
52 "# START USER CONFIG",
53 &[
54 "",
55 &os_block_start,
56 &new_controller_config,
57 &os_block_end,
58 "",
59 ]
60 .join("\n"),
61 )
62 .ok_or(Error::XnodeControllerError(XnodeControllerError::new(
63 XnodeControllerErrorInner::NoUserConfig,
64 )))?,
65 };
66 let os_update =
67 xnode_manager_sdk::os::set(xnode_manager_sdk::os::SetInput::new_with_data(
68 session,
69 xnode_manager_sdk::os::OSChange {
70 acme_email: None,
71 domain: None,
72 flake: Some(new_os_config),
73 update_inputs: None,
74 user_passwd: None,
75 xnode_owner: None,
76 },
77 ))
78 .await
79 .map_err(Error::XnodeManagerSDKError)?;
80
81 Ok(os_update)
82 }
83 }
84}
85
86struct XnodeControllerData {
87 last_controller: Option<String>,
88}
89pub async fn update_controllers<Xnode: XnodeController + 'static>(
90 xnodes: Arc<Mutex<Vec<Xnode>>>,
91 update_interval: Duration,
92) {
93 let mut interval = time::interval(update_interval);
94 let data = Arc::new(Mutex::new(HashMap::<String, XnodeControllerData>::new()));
95
96 loop {
97 interval.tick().await;
98
99 join_all(xnodes.lock().await.iter().map(async |xnode| {
100 let xnode_id = xnode.xnode_identifier();
101 let new_controller = xnode.check_controller().await;
102 if let Some(data) = data.lock().await.get(&xnode_id) {
103 if new_controller == data.last_controller {
104 return;
105 }
106 }
107
108 log::info!("Setting controller on {xnode_id} to {new_controller:?}");
109 if let Err(e) = xnode.set_controller(new_controller.clone()).await {
110 log::warn!("Error setting controller on {xnode_id} to {new_controller:?}: {e:?}");
111 }
112 data.lock()
113 .await
114 .entry(xnode_id)
115 .and_modify(|data| {
116 data.last_controller = new_controller.clone();
117 })
118 .or_insert(XnodeControllerData {
119 last_controller: new_controller,
120 });
121 }))
122 .await;
123 }
124}