1use super::*;
3
4use base::{Job, Node};
5mod dispatch;
9use crate::Client;
13use std::path::Path;
14
15impl Client {
16 pub async fn run_cmd(&self, cmd: &str, wrk_dir: &Path) -> Result<String> {
18 let wrk_dir = wrk_dir.shell_escape_lossy();
19 #[rustfmt::skip]
20 let script = format!("#! /usr/bin/env bash
21set -x
22cd {wrk_dir}
23{cmd}
24");
25 let job = Job::new(script);
26 let o = self.post("jobs", job).await?;
27
28 Ok(o)
29 }
30
31 pub async fn add_node(&self, node: impl Into<Node>) -> Result<()> {
33 self.post("nodes", node.into()).await?;
34 Ok(())
35 }
36
37 #[tokio::main()]
38 pub(crate) async fn run_cmd_block(&self, cmd: &str, wrk_dir: &Path) -> Result<String> {
40 let s = self.run_cmd(cmd, wrk_dir).await?;
41 Ok(s)
42 }
43}
44mod routes {
48 use super::*;
49 use crate::gchemol::Molecule;
50 use crate::rest::AppError;
51 use crate::worker::ComputationResult;
52 use gosh_model::Computed;
53 use dispatch::TaskClient;
54
55 use axum::extract::State;
56 use axum::Json;
57
58 #[axum::debug_handler]
60 async fn add_node(State(task): State<TaskClient>, Json(node): Json<Node>) -> Result<(), AppError> {
61 task.add_node(node).await?;
62 Ok(())
63 }
64
65 #[axum::debug_handler]
67 async fn add_mol(State(task): State<TaskClient>, Json(mol): Json<Molecule>) -> Result<Json<Computed>, AppError> {
68 let o = task.compute_molecule(mol).await?;
69 Ok(Json(o))
70 }
71
72 #[axum::debug_handler]
74 async fn add_job(
75 State(task): State<TaskClient>,
76 Json(job): Json<Job>,
77 ) -> Result<Json<ComputationResult>, AppError> {
78 let r = task.run_cmd(job).await?;
79 let c = ComputationResult::parse_from_json(&r)?;
80 Ok(Json(c))
81 }
82
83 pub(super) async fn run_restful(addr: impl Into<SocketAddr>, state: TaskClient) -> Result<()> {
84 use axum::routing::post;
85
86 let app = axum::Router::new()
87 .route("/jobs", post(add_job))
88 .with_state(state.clone())
89 .route("/mols", post(add_mol))
90 .with_state(state.clone())
91 .route("/nodes", post(add_node))
92 .with_state(state);
93 let addr = addr.into();
94
95 let x = axum::Server::bind(&addr).serve(app.into_make_service()).await?;
96 Ok(())
97 }
98}
99use gchemol::Molecule;
103
104#[derive(Debug, Clone)]
106enum Jobx {
107 Job(Job),
108 Mol(Molecule),
109}
110
111impl Jobx {
112 fn job_name(&self) -> String {
113 match self {
114 Self::Job(job) => job.name(),
115 Self::Mol(mol) => mol.title(),
116 }
117 }
118
119 async fn run_on(self, node: &Node) -> Result<String> {
120 let client = Client::connect(node);
121 match self {
122 Self::Job(job) => {
123 let o = client.post("jobs", job).await?;
124 Ok(o)
125 }
126 Self::Mol(mol) => {
127 let o = client.post("mols", mol).await?;
128 Ok(o)
129 }
130 }
131 }
132}
133use base::Nodes;
137use server::Server;
138
139impl Server {
140 pub async fn serve_as_scheduler(&self) {
142 println!("scheduler listening on {:?}", self.address);
143
144 let (mut task_server, task_client) = self::dispatch::new_interactive_task();
146 let nodes: Vec<String> = vec![];
147 let h1 = tokio::spawn(async move {
148 if let Err(e) = task_server.run_and_serve(Nodes::new(nodes)).await {
149 error!("task server: {e:?}");
150 }
151 });
152 tokio::pin!(h1);
153
154 let address = self.address;
156 let tc = task_client.clone();
157 let h2 = tokio::spawn(async move {
158 self::routes::run_restful(address, tc).await;
159 });
160 tokio::pin!(h2);
161
162 let h3 = crate::rest::shutdown_signal();
164 tokio::pin!(h3);
165
166 loop {
167 tokio::select! {
168 _res = &mut h1 => {
169 log_dbg!();
170 }
171 _res = &mut h2 => {
172 log_dbg!();
173 }
174 _res = &mut h3 => {
175 info!("User interrupted. Shutting down ...");
176 let _ = task_client.abort().await;
177 break;
178 }
179 }
180 }
181 h1.abort();
182 h2.abort();
183 }
184}
185