gosh_remote/worker/
model.rs

1// [[file:../../remote.note::6d0794c1][6d0794c1]]
2//! For remote computation of molecules with RESTful web services
3// 6d0794c1 ends here
4
5// [[file:../../remote.note::3d2c01c2][3d2c01c2]]
6#![deny(warnings)]
7
8use crate::common::*;
9use gchemol::Molecule;
10use gosh_model::Computed;
11// 3d2c01c2 ends here
12
13// [[file:../../remote.note::ccbf3ca9][ccbf3ca9]]
14use crate::task::Task;
15// type Task = crate::task::Task<Molecule, Computed>;
16type TaskReceiver = crate::task::TaskReceiver<Molecule, Computed>;
17type TxOutput = crate::task::TxOutput<Computed>;
18// ccbf3ca9 ends here
19
20// [[file:../../remote.note::ad35d99c][ad35d99c]]
21type TaskState = crate::task::TaskSender<Molecule, Computed>;
22// ad35d99c ends here
23
24// [[file:../../remote.note::7157f9ad][7157f9ad]]
25use crate::rest::AppError;
26use axum::extract::State;
27use axum::Json;
28use gosh_model::ChemicalModel;
29
30fn compute_mol_and_send_out(mol: &Molecule, model: &mut impl ChemicalModel, tx: TxOutput) -> Result<()> {
31    let mp = model.compute(mol)?;
32    tx.send(mp).map_err(|err| format_err!("send task out error: {err:?}"))?;
33    Ok(())
34}
35
36#[axum::debug_handler]
37/// Handle compute molecule request from client side
38pub async fn compute_mol(
39    State(client): State<TaskState>,
40    Json(mol): Json<Molecule>,
41) -> Result<Json<Computed>, AppError> {
42    let computed = client.send(mol).await?;
43    Ok(Json(computed))
44}
45
46/// Wait for incoming task and compute received Molecule using ChemicalModel
47pub(self) async fn serve_incoming_task_with(mut task: TaskReceiver, mut model: impl ChemicalModel) {
48    use crate::task::RemoteIO;
49
50    loop {
51        debug!("wait for new molecule to compute ...");
52        if let Some(RemoteIO(mol, tx_out)) = task.recv().await {
53            debug!("ask client to compute molecule {}", mol.title());
54            if let Err(err) = compute_mol_and_send_out(&mol, &mut model, tx_out) {
55                error!("{err:?}");
56            }
57        } else {
58            info!("Task channel closed for some reason");
59            break;
60        }
61    }
62}
63// 7157f9ad ends here
64
65// [[file:../../remote.note::59c3364a][59c3364a]]
66macro_rules! build_app_with_routes {
67    ($state: expr) => {{
68        use axum::routing::post;
69        axum::Router::new()
70            .route("/mols", post(compute_mol))
71            .with_state($state)
72            .route("/jobs", post(super::create_job))
73    }};
74}
75// 59c3364a ends here
76
77// [[file:../../remote.note::285a8db0][285a8db0]]
78use crate::client::Client;
79
80impl Client {
81    /// Request remote server compute `mol` and return computed results.
82    pub async fn compute_molecule(&self, mol: &Molecule) -> Result<Computed> {
83        info!("Request server to compute molecule {:?}", mol.title());
84        let out = self.post("mols", mol).await?;
85        let computed = serde_json::from_str(&out).with_context(|| format!("invalid json str: {out:?}"))?;
86        Ok(computed)
87    }
88
89    /// Request remote server compute `mol` and return computed results.
90    #[tokio::main]
91    pub async fn compute_molecule_blockly(&self, mol: &Molecule) -> Result<Computed> {
92        let computed = self.compute_molecule(mol).await?;
93        Ok(computed)
94    }
95}
96// 285a8db0 ends here
97
98// [[file:../../remote.note::389c909a][389c909a]]
99use crate::Server;
100use std::net::SocketAddr;
101
102/// Start restful service for molecule computation
103///
104/// # Parameters
105///
106/// * addr: socket address to bind
107/// * state: shared state between route handlers
108pub(self) async fn serve_mol_comput_requests(addr: impl Into<SocketAddr>, state: TaskState) {
109    use crate::rest::shutdown_signal;
110
111    let app = build_app_with_routes!(state);
112    if let Err(err) = axum::Server::bind(&addr.into())
113        .serve(app.into_make_service())
114        .with_graceful_shutdown(shutdown_signal())
115        .await
116    {
117        error!("error in restful serivce: {err:?}");
118    }
119}
120// 389c909a ends here
121
122// [[file:../../remote.note::f4a1566d][f4a1566d]]
123impl Server {
124    /// Serve as a computation server for chemical model.
125    pub async fn serve_as_chemical_model(&self, model: impl ChemicalModel + Send + 'static) -> Result<()> {
126        let addr = self.address;
127        println!("chemical model computation server listening on {addr:?}");
128
129        let (task_rx, task_tx) = Task::new().split();
130        // serve incoming requests for computation of mol
131        let h1 = tokio::spawn(async move { serve_mol_comput_requests(addr, task_tx).await });
132        // handle real computation using chemical model
133        let h2 = tokio::spawn(async move { serve_incoming_task_with(task_rx, model).await });
134        tokio::try_join!(h1, h2)?;
135        Ok(())
136    }
137}
138// f4a1566d ends here