ya_runtime_api/server/
service.rs1use super::RuntimeService;
2use super::{codec, proto, ErrorResponse};
3use crate::server::RuntimeHandler;
4use futures::future::BoxFuture;
5use futures::lock::Mutex;
6use futures::prelude::*;
7use futures::{FutureExt, SinkExt};
8use std::rc::Rc;
9use tokio::io;
10
11async fn handle_command(
12 service: &impl RuntimeService,
13 command: proto::request::Command,
14) -> Result<proto::response::Command, ErrorResponse> {
15 Ok(match command {
16 proto::request::Command::Hello(hello) => {
17 let version = service.hello(&hello.version).await?;
18 proto::response::Command::Hello(proto::response::Hello { version })
19 }
20 proto::request::Command::Run(run) => {
21 proto::response::Command::Run(service.run_process(run).await?)
22 }
23 proto::request::Command::Kill(kill) => {
24 service.kill_process(kill).await?;
25 proto::response::Command::Kill(Default::default())
26 }
27 proto::request::Command::Network(network) => {
28 proto::response::Command::Network(service.create_network(network).await?)
29 }
30 proto::request::Command::Shutdown(_) => {
31 service.shutdown().await?;
32 proto::response::Command::Shutdown(Default::default())
33 }
34 })
35}
36
37async fn handle(service: &impl RuntimeService, request: proto::Request) -> proto::Response {
38 proto::Response {
39 id: request.id,
40 command: Some(if let Some(command) = request.command {
41 match handle_command(service, command).await {
42 Ok(response) => response,
43 Err(err) => proto::response::Command::Error(err),
44 }
45 } else {
46 proto::response::Command::Error(ErrorResponse::msg("unknown command"))
47 }),
48 ..Default::default()
49 }
50}
51
52pub struct EventEmitter {
53 tx: futures::channel::mpsc::Sender<proto::Response>,
54}
55
56impl RuntimeHandler for EventEmitter {
57 fn on_process_status<'a>(&self, status: proto::response::ProcessStatus) -> BoxFuture<'a, ()> {
58 let mut response = proto::Response::default();
59 response.event = true;
60 response.command = Some(proto::response::Command::Status(status));
61 let mut tx = self.tx.clone();
62 async move {
63 if let Err(e) = tx.send(response).await {
64 log::error!("send event failed: {}", e)
65 }
66 }
67 .boxed()
68 }
69
70 fn on_runtime_status<'a>(&self, status: proto::response::RuntimeStatus) -> BoxFuture<'a, ()> {
71 let mut response = proto::Response::default();
72 response.event = true;
73 response.command = Some(proto::response::Command::RtStatus(status));
74 let mut tx = self.tx.clone();
75 async move {
76 if let Err(e) = tx.send(response).await {
77 log::error!("send event failed: {}", e)
78 }
79 }
80 .boxed()
81 }
82}
83
84pub async fn run_async<Factory, FutureRuntime, Runtime>(factory: Factory)
85where
86 Factory: FnOnce(EventEmitter) -> FutureRuntime,
87 FutureRuntime: Future<Output = Runtime>,
88 Runtime: RuntimeService + 'static,
89{
90 log::debug!("server starting");
91 let stdout = io::stdout();
92 let stdin = io::stdin();
93
94 let mut input = codec::Codec::<proto::Request>::stream(stdin);
95 let output = Rc::new(Mutex::new(codec::Codec::<proto::Response>::sink(stdout)));
96 let (tx, mut rx) = futures::channel::mpsc::channel::<proto::Response>(1);
97 let emitter = EventEmitter { tx };
98 let service = Rc::new(factory(emitter).await);
99
100 let local = tokio::task::LocalSet::new();
101
102 local.spawn_local({
103 let output = output.clone();
104 async move {
105 while let Some(event) = rx.next().await {
106 log::trace!("event: {:?}", event);
107 let mut output = output.lock().await;
108 let r = SinkExt::send(&mut *output, event).await;
109 log::trace!("sending event done: {:?}", r);
110 }
111 }
112 });
113
114 local
115 .run_until(async {
116 while let Some(it) = input.next().await {
117 match it {
118 Ok(request) => {
119 let service = service.clone();
120 let output = output.clone();
121 log::trace!("received request: {:?}", request);
122 let resp = handle(service.as_ref(), request).await;
123 log::trace!("response to send: {:?}", resp);
124 let mut output = output.lock().await;
125 log::trace!("sending");
126 let r = SinkExt::send(&mut *output, resp).await;
127 log::trace!("sending done: {:?}", r);
128 }
129 Err(e) => {
130 log::error!("fail: {}", e);
131 break;
132 }
133 }
134 }
135 })
136 .await;
137
138 log::debug!("server stopped");
139}
140
141pub async fn run<Factory, Runtime>(factory: Factory)
142where
143 Factory: Fn(EventEmitter) -> Runtime,
144 Runtime: RuntimeService + 'static,
145{
146 run_async(|e| async { factory(e) }).await
147}