1use std::cell::RefCell;
2use std::rc::Rc;
3
4use futures::channel::oneshot;
5use futures::{FutureExt, TryFutureExt};
6use ya_runtime_api::server::proto::response::create_network::Endpoint;
7use ya_runtime_api::server::{
8 AsyncResponse, CreateNetwork, CreateNetworkResp, KillProcess, RunProcess, RunProcessResp,
9 RuntimeService,
10};
11
12pub use ya_runtime_api::deploy::ContainerEndpoint;
13
14use crate::runtime::RuntimeMode;
15use crate::{Context, Runtime, RuntimeDef};
16
17pub struct Server<R: Runtime> {
18 pub(crate) runtime: Rc<RefCell<R>>,
19 pub(crate) ctx: Rc<RefCell<Context<R>>>,
20}
21
22impl<R: Runtime + 'static> Server<R> {
23 pub fn new(runtime: R, mut ctx: Context<R>) -> Self {
24 let (tx, rx) = oneshot::channel();
25 ctx.set_shutdown_tx(tx);
26
27 let server = Self {
28 runtime: Rc::new(RefCell::new(runtime)),
29 ctx: Rc::new(RefCell::new(ctx)),
30 };
31
32 server.shutdown_on(rx);
33 server
34 }
35
36 pub fn shutdown_on(&self, rx: oneshot::Receiver<()>) {
37 let server = self.clone();
38 tokio::task::spawn_local(rx.then(move |result| async move {
39 if result.is_ok() {
40 let _ = server.shutdown().await;
41 std::process::exit(0);
42 }
43 }));
44 }
45}
46
47impl<R: Runtime> Clone for Server<R> {
48 fn clone(&self) -> Self {
49 Self {
50 runtime: self.runtime.clone(),
51 ctx: self.ctx.clone(),
52 }
53 }
54}
55
56impl<R: Runtime> RuntimeService for Server<R> {
57 fn hello(&self, _version: &str) -> AsyncResponse<'_, String> {
58 async { Ok(<R as RuntimeDef>::VERSION.to_owned()) }.boxed_local()
59 }
60
61 fn run_process(&self, run: RunProcess) -> AsyncResponse<'_, RunProcessResp> {
62 let mut runtime = self.runtime.borrow_mut();
63 let mut ctx = self.ctx.borrow_mut();
64 runtime
65 .run_command(run, RuntimeMode::Server, &mut ctx)
66 .then(|result| async move {
67 match result {
68 Ok(pid) => Ok(RunProcessResp { pid }),
69 Err(err) => Err(err.into()),
70 }
71 })
72 .boxed_local()
73 }
74
75 fn kill_process(&self, kill: KillProcess) -> AsyncResponse<'_, ()> {
76 let mut runtime = self.runtime.borrow_mut();
77 let mut ctx = self.ctx.borrow_mut();
78 runtime
79 .kill_command(kill, &mut ctx)
80 .map_err(Into::into)
81 .boxed_local()
82 }
83
84 fn create_network(&self, network: CreateNetwork) -> AsyncResponse<'_, CreateNetworkResp> {
85 let mut runtime = self.runtime.borrow_mut();
86 let mut ctx = self.ctx.borrow_mut();
87 runtime
88 .join_network(network, &mut ctx)
89 .map(|result| {
90 result.map(|e| CreateNetworkResp {
91 endpoint: match &e {
92 ContainerEndpoint::UnixStream(_) => {
93 Some(Endpoint::UnixStream(e.to_string()))
94 }
95 ContainerEndpoint::UnixDatagram(_) => {
96 Some(Endpoint::UnixDatagram(e.to_string()))
97 }
98 ContainerEndpoint::UdpDatagram(_) => {
99 Some(Endpoint::UdpDatagram(e.to_string()))
100 }
101 ContainerEndpoint::TcpStream(_) => Some(Endpoint::TcpStream(e.to_string())),
102 _ => None,
103 },
104 })
105 })
106 .map_err(Into::into)
107 .boxed_local()
108 }
109
110 fn shutdown(&self) -> AsyncResponse<'_, ()> {
111 let mut runtime = self.runtime.borrow_mut();
112 let mut ctx = self.ctx.borrow_mut();
113 runtime.stop(&mut ctx).map_err(Into::into).boxed_local()
114 }
115}