gosh_remote/worker/
model.rs1#![deny(warnings)]
7
8use crate::common::*;
9use gchemol::Molecule;
10use gosh_model::Computed;
11use crate::task::Task;
// Receiving side of the task channel, specialised to `Molecule` requests
// and `Computed` results (consumed by `serve_incoming_task_with`).
15type TaskReceiver = crate::task::TaskReceiver<Molecule, Computed>;
// Output handle used to send a `Computed` result back for one request.
17type TxOutput = crate::task::TxOutput<Computed>;
// Sending side of the task channel; used as the axum handler state in
// `compute_mol`.
18type TaskState = crate::task::TaskSender<Molecule, Computed>;
22use crate::rest::AppError;
26use axum::extract::State;
27use axum::Json;
28use gosh_model::ChemicalModel;
29
30fn compute_mol_and_send_out(mol: &Molecule, model: &mut impl ChemicalModel, tx: TxOutput) -> Result<()> {
31 let mp = model.compute(mol)?;
32 tx.send(mp).map_err(|err| format_err!("send task out error: {err:?}"))?;
33 Ok(())
34}
35
36#[axum::debug_handler]
37pub async fn compute_mol(
39 State(client): State<TaskState>,
40 Json(mol): Json<Molecule>,
41) -> Result<Json<Computed>, AppError> {
42 let computed = client.send(mol).await?;
43 Ok(Json(computed))
44}
45
46pub(self) async fn serve_incoming_task_with(mut task: TaskReceiver, mut model: impl ChemicalModel) {
48 use crate::task::RemoteIO;
49
50 loop {
51 debug!("wait for new molecule to compute ...");
52 if let Some(RemoteIO(mol, tx_out)) = task.recv().await {
53 debug!("ask client to compute molecule {}", mol.title());
54 if let Err(err) = compute_mol_and_send_out(&mol, &mut model, tx_out) {
55 error!("{err:?}");
56 }
57 } else {
58 info!("Task channel closed for some reason");
59 break;
60 }
61 }
62}
// Builds the axum router of the worker service.
//
// `POST /mols` is handled by `compute_mol` and is registered before
// `with_state`, so it is the route that receives the given state.
// NOTE(review): `/jobs` is registered after `with_state`; presumably
// `super::create_job` does not depend on this state — confirm.
macro_rules! build_app_with_routes {
    ($app_state:expr) => {{
        axum::Router::new()
            .route("/mols", axum::routing::post(compute_mol))
            .with_state($app_state)
            .route("/jobs", axum::routing::post(super::create_job))
    }};
}
75use crate::client::Client;
79
80impl Client {
81 pub async fn compute_molecule(&self, mol: &Molecule) -> Result<Computed> {
83 info!("Request server to compute molecule {:?}", mol.title());
84 let out = self.post("mols", mol).await?;
85 let computed = serde_json::from_str(&out).with_context(|| format!("invalid json str: {out:?}"))?;
86 Ok(computed)
87 }
88
89 #[tokio::main]
91 pub async fn compute_molecule_blockly(&self, mol: &Molecule) -> Result<Computed> {
92 let computed = self.compute_molecule(mol).await?;
93 Ok(computed)
94 }
95}
96use crate::Server;
100use std::net::SocketAddr;
101
102pub(self) async fn serve_mol_comput_requests(addr: impl Into<SocketAddr>, state: TaskState) {
109 use crate::rest::shutdown_signal;
110
111 let app = build_app_with_routes!(state);
112 if let Err(err) = axum::Server::bind(&addr.into())
113 .serve(app.into_make_service())
114 .with_graceful_shutdown(shutdown_signal())
115 .await
116 {
117 error!("error in restful serivce: {err:?}");
118 }
119}
120impl Server {
124 pub async fn serve_as_chemical_model(&self, model: impl ChemicalModel + Send + 'static) -> Result<()> {
126 let addr = self.address;
127 println!("chemical model computation server listening on {addr:?}");
128
129 let (task_rx, task_tx) = Task::new().split();
130 let h1 = tokio::spawn(async move { serve_mol_comput_requests(addr, task_tx).await });
132 let h2 = tokio::spawn(async move { serve_incoming_task_with(task_rx, model).await });
134 tokio::try_join!(h1, h2)?;
135 Ok(())
136 }
137}
138