ya_runtime_api/server/
service.rs

1use super::RuntimeService;
2use super::{codec, proto, ErrorResponse};
3use crate::server::RuntimeHandler;
4use futures::future::BoxFuture;
5use futures::lock::Mutex;
6use futures::prelude::*;
7use futures::{FutureExt, SinkExt};
8use std::rc::Rc;
9use tokio::io;
10
11async fn handle_command(
12    service: &impl RuntimeService,
13    command: proto::request::Command,
14) -> Result<proto::response::Command, ErrorResponse> {
15    Ok(match command {
16        proto::request::Command::Hello(hello) => {
17            let version = service.hello(&hello.version).await?;
18            proto::response::Command::Hello(proto::response::Hello { version })
19        }
20        proto::request::Command::Run(run) => {
21            proto::response::Command::Run(service.run_process(run).await?)
22        }
23        proto::request::Command::Kill(kill) => {
24            service.kill_process(kill).await?;
25            proto::response::Command::Kill(Default::default())
26        }
27        proto::request::Command::Network(network) => {
28            proto::response::Command::Network(service.create_network(network).await?)
29        }
30        proto::request::Command::Shutdown(_) => {
31            service.shutdown().await?;
32            proto::response::Command::Shutdown(Default::default())
33        }
34    })
35}
36
37async fn handle(service: &impl RuntimeService, request: proto::Request) -> proto::Response {
38    proto::Response {
39        id: request.id,
40        command: Some(if let Some(command) = request.command {
41            match handle_command(service, command).await {
42                Ok(response) => response,
43                Err(err) => proto::response::Command::Error(err),
44            }
45        } else {
46            proto::response::Command::Error(ErrorResponse::msg("unknown command"))
47        }),
48        ..Default::default()
49    }
50}
51
52pub struct EventEmitter {
53    tx: futures::channel::mpsc::Sender<proto::Response>,
54}
55
56impl RuntimeHandler for EventEmitter {
57    fn on_process_status<'a>(&self, status: proto::response::ProcessStatus) -> BoxFuture<'a, ()> {
58        let mut response = proto::Response::default();
59        response.event = true;
60        response.command = Some(proto::response::Command::Status(status));
61        let mut tx = self.tx.clone();
62        async move {
63            if let Err(e) = tx.send(response).await {
64                log::error!("send event failed: {}", e)
65            }
66        }
67        .boxed()
68    }
69
70    fn on_runtime_status<'a>(&self, status: proto::response::RuntimeStatus) -> BoxFuture<'a, ()> {
71        let mut response = proto::Response::default();
72        response.event = true;
73        response.command = Some(proto::response::Command::RtStatus(status));
74        let mut tx = self.tx.clone();
75        async move {
76            if let Err(e) = tx.send(response).await {
77                log::error!("send event failed: {}", e)
78            }
79        }
80        .boxed()
81    }
82}
83
84pub async fn run_async<Factory, FutureRuntime, Runtime>(factory: Factory)
85where
86    Factory: FnOnce(EventEmitter) -> FutureRuntime,
87    FutureRuntime: Future<Output = Runtime>,
88    Runtime: RuntimeService + 'static,
89{
90    log::debug!("server starting");
91    let stdout = io::stdout();
92    let stdin = io::stdin();
93
94    let mut input = codec::Codec::<proto::Request>::stream(stdin);
95    let output = Rc::new(Mutex::new(codec::Codec::<proto::Response>::sink(stdout)));
96    let (tx, mut rx) = futures::channel::mpsc::channel::<proto::Response>(1);
97    let emitter = EventEmitter { tx };
98    let service = Rc::new(factory(emitter).await);
99
100    let local = tokio::task::LocalSet::new();
101
102    local.spawn_local({
103        let output = output.clone();
104        async move {
105            while let Some(event) = rx.next().await {
106                log::trace!("event: {:?}", event);
107                let mut output = output.lock().await;
108                let r = SinkExt::send(&mut *output, event).await;
109                log::trace!("sending event done: {:?}", r);
110            }
111        }
112    });
113
114    local
115        .run_until(async {
116            while let Some(it) = input.next().await {
117                match it {
118                    Ok(request) => {
119                        let service = service.clone();
120                        let output = output.clone();
121                        log::trace!("received request: {:?}", request);
122                        let resp = handle(service.as_ref(), request).await;
123                        log::trace!("response to send: {:?}", resp);
124                        let mut output = output.lock().await;
125                        log::trace!("sending");
126                        let r = SinkExt::send(&mut *output, resp).await;
127                        log::trace!("sending done: {:?}", r);
128                    }
129                    Err(e) => {
130                        log::error!("fail: {}", e);
131                        break;
132                    }
133                }
134            }
135        })
136        .await;
137
138    log::debug!("server stopped");
139}
140
141pub async fn run<Factory, Runtime>(factory: Factory)
142where
143    Factory: Fn(EventEmitter) -> Runtime,
144    Runtime: RuntimeService + 'static,
145{
146    run_async(|e| async { factory(e) }).await
147}