gosh_remote/
worker.rs

1// [[file:../remote.note::4b6cf6fa][4b6cf6fa]]
2use super::*;
3use base::{Job, Node};
4// 4b6cf6fa ends here
5
6// [[file:../remote.note::cfe8b623][cfe8b623]]
7mod model;
8// cfe8b623 ends here
9
10// [[file:../remote.note::0688d573][0688d573]]
11use gosh_model::Computed;
12
13#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
14pub enum ComputationResult {
15    JobCompleted(String),
16    JobFailed(String),
17}
18
19impl ComputationResult {
20    pub(crate) fn parse_from_json(x: &str) -> Result<Self> {
21        let computed = serde_json::from_str(&x).with_context(|| format!("invalid json str: {x:?}"))?;
22        Ok(computed)
23    }
24
25    pub fn get_computed_from_str(s: &str) -> Result<Computed> {
26        match Self::parse_from_json(s)? {
27            Self::JobCompleted(s) => {
28                let computed = s.parse()?;
29                Ok(computed)
30            }
31            Self::JobFailed(s) => {
32                bail!("job failed with error message:\n{s:?}");
33            }
34        }
35    }
36}
37// 0688d573 ends here
38
39// [[file:../remote.note::a2266f5f][a2266f5f]]
40mod handlers {
41    use super::*;
42    use crate::rest::AppError;
43    use axum::Json;
44
45    /// Run `job` locally and return stdout on success.
46    #[axum::debug_handler]
47    pub(super) async fn create_job(Json(job): Json<Job>) -> Result<Json<ComputationResult>, AppError> {
48        match job.submit() {
49            Ok(mut comput) => match comput.wait_for_output().await {
50                Ok(out) => {
51                    let ret = ComputationResult::JobCompleted(out);
52                    debug!("computation done with: {ret:?}");
53                    Ok(Json(ret))
54                }
55                Err(err) => {
56                    let msg = format!("{err:?}");
57                    let ret = ComputationResult::JobFailed(msg);
58                    debug!("computation failed with: {ret:?}");
59                    Ok(Json(ret))
60                }
61            },
62            Err(err) => {
63                let msg = format!("failed to create job: {err:?}");
64                error!("{msg}");
65                let ret = ComputationResult::JobFailed(msg);
66                Ok(Json(ret))
67            }
68        }
69    }
70}
71// a2266f5f ends here
72
73// [[file:../remote.note::57eb060f][57eb060f]]
74use self::handlers::create_job;
75use axum::Router;
76
77fn app() -> Router {
78    use axum::routing::post;
79
80    Router::new().route("/jobs", post(create_job))
81}
82// 57eb060f ends here
83
84// [[file:../remote.note::d6f1b9d7][d6f1b9d7]]
85use crate::Client;
86
87/// Submit job remotely using REST api service
88pub struct RemoteComputation {
89    job: Job,
90    client: Client,
91}
92
93impl RemoteComputation {
94    pub async fn wait_for_output(&self) -> Result<String> {
95        debug!("wait output for job {:?}", self.job);
96        let resp = self.client.post("jobs", &self.job).await?;
97        Ok(resp)
98    }
99}
100
101impl Job {
102    /// Remote submission using RESTful service
103    pub fn submit_remote(self, node: &Node) -> Result<RemoteComputation> {
104        let client = Client::connect(node);
105        let comput = RemoteComputation { job: self, client };
106
107        Ok(comput)
108    }
109}
110// d6f1b9d7 ends here
111
112// [[file:../remote.note::9407c3be][9407c3be]]
113impl server::Server {
114    /// Serve as a worker running on local node.
115    pub async fn serve_as_worker(&self) -> Result<()> {
116        use crate::rest::shutdown_signal;
117
118        let addr = self.address;
119        println!("Start remote process serivce at {addr:?}");
120        let signal = shutdown_signal();
121        let server = axum::Server::bind(&addr).serve(app().into_make_service());
122        let (tx, rx) = tokio::sync::oneshot::channel();
123        tokio::select! {
124            _ = server => {
125                eprintln!("server closed");
126            }
127            _ = signal => {
128                let _ = tx.send(());
129                eprintln!("user interruption");
130            }
131        }
132
133        // FIXME: the below will cause immediate exit
134        // let server = axum::Server::bind(&addr)
135        //     .serve(app().into_make_service())
136        //     .with_graceful_shutdown(shutdown_signal());
137
138        Ok(())
139    }
140}
141// 9407c3be ends here