gosh_remote/
scheduler.rs

1// [[file:../remote.note::c07df478][c07df478]]
2use super::*;
3
4use base::{Job, Node};
5// c07df478 ends here
6
7// [[file:../remote.note::b1a3ac5f][b1a3ac5f]]
8mod dispatch;
9// b1a3ac5f ends here
10
11// [[file:../remote.note::6730a02b][6730a02b]]
12use crate::Client;
13use std::path::Path;
14
15impl Client {
16    /// Request server to run `cmd` in directory `wrk_dir`.
17    pub async fn run_cmd(&self, cmd: &str, wrk_dir: &Path) -> Result<String> {
18        let wrk_dir = wrk_dir.shell_escape_lossy();
19        #[rustfmt::skip]
20        let script = format!("#! /usr/bin/env bash
21set -x
22cd {wrk_dir}
23{cmd}
24");
25        let job = Job::new(script);
26        let o = self.post("jobs", job).await?;
27
28        Ok(o)
29    }
30
31    /// Request server to add a new node for remote computation.
32    pub async fn add_node(&self, node: impl Into<Node>) -> Result<()> {
33        self.post("nodes", node.into()).await?;
34        Ok(())
35    }
36
37    #[tokio::main()]
38    /// For non-async call
39    pub(crate) async fn run_cmd_block(&self, cmd: &str, wrk_dir: &Path) -> Result<String> {
40        let s = self.run_cmd(cmd, wrk_dir).await?;
41        Ok(s)
42    }
43}
44// 6730a02b ends here
45
46// [[file:../remote.note::dec20ace][dec20ace]]
47mod routes {
48    use super::*;
49    use crate::gchemol::Molecule;
50    use crate::rest::AppError;
51    use crate::worker::ComputationResult;
52    use gosh_model::Computed;
53    use dispatch::TaskClient;
54
55    use axum::extract::State;
56    use axum::Json;
57
58    /// Handle request for adding a new node into `Nodes`
59    #[axum::debug_handler]
60    async fn add_node(State(task): State<TaskClient>, Json(node): Json<Node>) -> Result<(), AppError> {
61        task.add_node(node).await?;
62        Ok(())
63    }
64
65    /// Handle request for adding a new mol
66    #[axum::debug_handler]
67    async fn add_mol(State(task): State<TaskClient>, Json(mol): Json<Molecule>) -> Result<Json<Computed>, AppError> {
68        let o = task.compute_molecule(mol).await?;
69        Ok(Json(o))
70    }
71
72    /// Handle request for adding a new node into `Nodes`
73    #[axum::debug_handler]
74    async fn add_job(
75        State(task): State<TaskClient>,
76        Json(job): Json<Job>,
77    ) -> Result<Json<ComputationResult>, AppError> {
78        let r = task.run_cmd(job).await?;
79        let c = ComputationResult::parse_from_json(&r)?;
80        Ok(Json(c))
81    }
82
83    pub(super) async fn run_restful(addr: impl Into<SocketAddr>, state: TaskClient) -> Result<()> {
84        use axum::routing::post;
85
86        let app = axum::Router::new()
87            .route("/jobs", post(add_job))
88            .with_state(state.clone())
89            .route("/mols", post(add_mol))
90            .with_state(state.clone())
91            .route("/nodes", post(add_node))
92            .with_state(state);
93        let addr = addr.into();
94
95        let x = axum::Server::bind(&addr).serve(app.into_make_service()).await?;
96        Ok(())
97    }
98}
99// dec20ace ends here
100
101// [[file:../remote.note::3ce50110][3ce50110]]
102use gchemol::Molecule;
103
/// Represent any input submitted to a remote node for computation.
///
/// The input is either a `Job` or a `Molecule`.
#[derive(Debug, Clone)]
enum Jobx {
    /// A `Job` to be executed on the remote node.
    Job(Job),
    /// A `Molecule` to be computed on the remote node.
    Mol(Molecule),
}
110
111impl Jobx {
112    fn job_name(&self) -> String {
113        match self {
114            Self::Job(job) => job.name(),
115            Self::Mol(mol) => mol.title(),
116        }
117    }
118
119    async fn run_on(self, node: &Node) -> Result<String> {
120        let client = Client::connect(node);
121        match self {
122            Self::Job(job) => {
123                let o = client.post("jobs", job).await?;
124                Ok(o)
125            }
126            Self::Mol(mol) => {
127                let o = client.post("mols", mol).await?;
128                Ok(o)
129            }
130        }
131    }
132}
133// 3ce50110 ends here
134
135// [[file:../remote.note::63fb876f][63fb876f]]
136use base::Nodes;
137use server::Server;
138
139impl Server {
140    /// Start a server as a scheduler for computational jobs.
141    pub async fn serve_as_scheduler(&self) {
142        println!("scheduler listening on {:?}", self.address);
143
144        // the server side
145        let (mut task_server, task_client) = self::dispatch::new_interactive_task();
146        let nodes: Vec<String> = vec![];
147        let h1 = tokio::spawn(async move {
148            if let Err(e) = task_server.run_and_serve(Nodes::new(nodes)).await {
149                error!("task server: {e:?}");
150            }
151        });
152        tokio::pin!(h1);
153
154        // the client side
155        let address = self.address;
156        let tc = task_client.clone();
157        let h2 = tokio::spawn(async move {
158            self::routes::run_restful(address, tc).await;
159        });
160        tokio::pin!(h2);
161
162        // external interruption using unix/linux signal
163        let h3 = crate::rest::shutdown_signal();
164        tokio::pin!(h3);
165
166        loop {
167            tokio::select! {
168                _res = &mut h1 => {
169                    log_dbg!();
170                }
171                _res = &mut h2 => {
172                    log_dbg!();
173                }
174                _res = &mut h3 => {
175                    info!("User interrupted. Shutting down ...");
176                    let _ = task_client.abort().await;
177                    break;
178                }
179            }
180        }
181        h1.abort();
182        h2.abort();
183    }
184}
185// 63fb876f ends here