1use super::*;
3use base::{Job, Node};
4mod model;
8use gosh_model::Computed;
12
13#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
14pub enum ComputationResult {
15 JobCompleted(String),
16 JobFailed(String),
17}
18
19impl ComputationResult {
20 pub(crate) fn parse_from_json(x: &str) -> Result<Self> {
21 let computed = serde_json::from_str(&x).with_context(|| format!("invalid json str: {x:?}"))?;
22 Ok(computed)
23 }
24
25 pub fn get_computed_from_str(s: &str) -> Result<Computed> {
26 match Self::parse_from_json(s)? {
27 Self::JobCompleted(s) => {
28 let computed = s.parse()?;
29 Ok(computed)
30 }
31 Self::JobFailed(s) => {
32 bail!("job failed with error message:\n{s:?}");
33 }
34 }
35 }
36}
37mod handlers {
41 use super::*;
42 use crate::rest::AppError;
43 use axum::Json;
44
45 #[axum::debug_handler]
47 pub(super) async fn create_job(Json(job): Json<Job>) -> Result<Json<ComputationResult>, AppError> {
48 match job.submit() {
49 Ok(mut comput) => match comput.wait_for_output().await {
50 Ok(out) => {
51 let ret = ComputationResult::JobCompleted(out);
52 debug!("computation done with: {ret:?}");
53 Ok(Json(ret))
54 }
55 Err(err) => {
56 let msg = format!("{err:?}");
57 let ret = ComputationResult::JobFailed(msg);
58 debug!("computation failed with: {ret:?}");
59 Ok(Json(ret))
60 }
61 },
62 Err(err) => {
63 let msg = format!("failed to create job: {err:?}");
64 error!("{msg}");
65 let ret = ComputationResult::JobFailed(msg);
66 Ok(Json(ret))
67 }
68 }
69 }
70}
71use self::handlers::create_job;
75use axum::Router;
76
77fn app() -> Router {
78 use axum::routing::post;
79
80 Router::new().route("/jobs", post(create_job))
81}
82use crate::Client;
86
87pub struct RemoteComputation {
89 job: Job,
90 client: Client,
91}
92
93impl RemoteComputation {
94 pub async fn wait_for_output(&self) -> Result<String> {
95 debug!("wait output for job {:?}", self.job);
96 let resp = self.client.post("jobs", &self.job).await?;
97 Ok(resp)
98 }
99}
100
101impl Job {
102 pub fn submit_remote(self, node: &Node) -> Result<RemoteComputation> {
104 let client = Client::connect(node);
105 let comput = RemoteComputation { job: self, client };
106
107 Ok(comput)
108 }
109}
110impl server::Server {
114 pub async fn serve_as_worker(&self) -> Result<()> {
116 use crate::rest::shutdown_signal;
117
118 let addr = self.address;
119 println!("Start remote process serivce at {addr:?}");
120 let signal = shutdown_signal();
121 let server = axum::Server::bind(&addr).serve(app().into_make_service());
122 let (tx, rx) = tokio::sync::oneshot::channel();
123 tokio::select! {
124 _ = server => {
125 eprintln!("server closed");
126 }
127 _ = signal => {
128 let _ = tx.send(());
129 eprintln!("user interruption");
130 }
131 }
132
133 Ok(())
139 }
140}
141