ya_runtime_sdk/
server.rs

1use std::cell::RefCell;
2use std::rc::Rc;
3
4use futures::channel::oneshot;
5use futures::{FutureExt, TryFutureExt};
6use ya_runtime_api::server::proto::response::create_network::Endpoint;
7use ya_runtime_api::server::{
8    AsyncResponse, CreateNetwork, CreateNetworkResp, KillProcess, RunProcess, RunProcessResp,
9    RuntimeService,
10};
11
12pub use ya_runtime_api::deploy::ContainerEndpoint;
13
14use crate::runtime::RuntimeMode;
15use crate::{Context, Runtime, RuntimeDef};
16
17pub struct Server<R: Runtime> {
18    pub(crate) runtime: Rc<RefCell<R>>,
19    pub(crate) ctx: Rc<RefCell<Context<R>>>,
20}
21
22impl<R: Runtime + 'static> Server<R> {
23    pub fn new(runtime: R, mut ctx: Context<R>) -> Self {
24        let (tx, rx) = oneshot::channel();
25        ctx.set_shutdown_tx(tx);
26
27        let server = Self {
28            runtime: Rc::new(RefCell::new(runtime)),
29            ctx: Rc::new(RefCell::new(ctx)),
30        };
31
32        server.shutdown_on(rx);
33        server
34    }
35
36    pub fn shutdown_on(&self, rx: oneshot::Receiver<()>) {
37        let server = self.clone();
38        tokio::task::spawn_local(rx.then(move |result| async move {
39            if result.is_ok() {
40                let _ = server.shutdown().await;
41                std::process::exit(0);
42            }
43        }));
44    }
45}
46
47impl<R: Runtime> Clone for Server<R> {
48    fn clone(&self) -> Self {
49        Self {
50            runtime: self.runtime.clone(),
51            ctx: self.ctx.clone(),
52        }
53    }
54}
55
56impl<R: Runtime> RuntimeService for Server<R> {
57    fn hello(&self, _version: &str) -> AsyncResponse<'_, String> {
58        async { Ok(<R as RuntimeDef>::VERSION.to_owned()) }.boxed_local()
59    }
60
61    fn run_process(&self, run: RunProcess) -> AsyncResponse<'_, RunProcessResp> {
62        let mut runtime = self.runtime.borrow_mut();
63        let mut ctx = self.ctx.borrow_mut();
64        runtime
65            .run_command(run, RuntimeMode::Server, &mut ctx)
66            .then(|result| async move {
67                match result {
68                    Ok(pid) => Ok(RunProcessResp { pid }),
69                    Err(err) => Err(err.into()),
70                }
71            })
72            .boxed_local()
73    }
74
75    fn kill_process(&self, kill: KillProcess) -> AsyncResponse<'_, ()> {
76        let mut runtime = self.runtime.borrow_mut();
77        let mut ctx = self.ctx.borrow_mut();
78        runtime
79            .kill_command(kill, &mut ctx)
80            .map_err(Into::into)
81            .boxed_local()
82    }
83
84    fn create_network(&self, network: CreateNetwork) -> AsyncResponse<'_, CreateNetworkResp> {
85        let mut runtime = self.runtime.borrow_mut();
86        let mut ctx = self.ctx.borrow_mut();
87        runtime
88            .join_network(network, &mut ctx)
89            .map(|result| {
90                result.map(|e| CreateNetworkResp {
91                    endpoint: match &e {
92                        ContainerEndpoint::UnixStream(_) => {
93                            Some(Endpoint::UnixStream(e.to_string()))
94                        }
95                        ContainerEndpoint::UnixDatagram(_) => {
96                            Some(Endpoint::UnixDatagram(e.to_string()))
97                        }
98                        ContainerEndpoint::UdpDatagram(_) => {
99                            Some(Endpoint::UdpDatagram(e.to_string()))
100                        }
101                        ContainerEndpoint::TcpStream(_) => Some(Endpoint::TcpStream(e.to_string())),
102                        _ => None,
103                    },
104                })
105            })
106            .map_err(Into::into)
107            .boxed_local()
108    }
109
110    fn shutdown(&self) -> AsyncResponse<'_, ()> {
111        let mut runtime = self.runtime.borrow_mut();
112        let mut ctx = self.ctx.borrow_mut();
113        runtime.stop(&mut ctx).map_err(Into::into).boxed_local()
114    }
115}