1use futures::future::LocalBoxFuture;
2use futures::FutureExt;
3use std::future::Future;
4
5use ya_runtime_api::server::proto::{output::Type, request::RunProcess, Output};
6
7use crate::cli::{Command, CommandCli};
8use crate::common::write_output;
9use crate::context::Context;
10use crate::env::{DefaultEnv, Env};
11use crate::runtime::{Runtime, RuntimeDef, RuntimeMode};
12use crate::server::Server;
13
14#[inline]
16pub async fn run<R>() -> anyhow::Result<()>
17where
18 R: Runtime + Default + 'static,
19{
20 run_with::<R, _>(DefaultEnv::<<R as RuntimeDef>::Cli>::default()).await
21}
22
23pub async fn run_with<R, E>(env: E) -> anyhow::Result<()>
26where
27 R: Runtime + Default + 'static,
28 E: Env<<R as RuntimeDef>::Cli> + Send + 'static,
29{
30 build(env, move |_| async move { Ok(R::default()) }).await
31}
32
33pub fn build<R, E, F, Fut>(env: E, factory: F) -> LocalBoxFuture<'static, anyhow::Result<()>>
36where
37 R: Runtime + 'static,
38 E: Env<<R as RuntimeDef>::Cli> + Send + 'static,
39 F: FnOnce(&mut Context<R>) -> Fut + 'static,
40 Fut: Future<Output = anyhow::Result<R>> + 'static,
41{
42 async move {
43 let set = tokio::task::LocalSet::new();
44 set.run_until(inner::<R, E, _>(env, |ctx| {
45 async move { factory(ctx).await }.boxed_local()
46 }))
47 .await
48 }
49 .boxed_local()
50}
51
52async fn inner<R, E, F>(env: E, factory: F) -> anyhow::Result<()>
53where
54 R: Runtime + 'static,
55 E: Env<<R as RuntimeDef>::Cli> + Send + 'static,
56 F: FnOnce(&mut Context<R>) -> LocalBoxFuture<anyhow::Result<R>>,
57{
58 #[cfg(feature = "logger")]
59 {
60 if let Err(error) = crate::logger::start_file_logger() {
61 crate::logger::start_logger().expect("Failed to start logging");
62 log::warn!("Using fallback logging due to an error: {:?}", error);
63 };
64
65 std::panic::set_hook(Box::new(|e| {
66 log::error!("Runtime panic: {e}");
67 }));
68 }
69
70 let mut ctx = Context::<R>::try_with(env)?;
71 let mut runtime = factory(&mut ctx).await?;
72
73 match ctx.cli.command() {
74 Command::Deploy { .. } => {
75 let deployment = match runtime.deploy(&mut ctx).await? {
76 Some(deployment) => deployment,
77 None => {
78 crate::serialize::json::json!({
79 "startMode": match R::MODE {
80 RuntimeMode::Server => "blocking",
81 RuntimeMode::Command => "empty",
82 },
83 "valid": {"Ok": ""},
84 "vols": []
85 })
86 }
87 };
88 write_output(deployment).await?;
89 }
90 Command::Start { .. } => match R::MODE {
91 RuntimeMode::Command => {
92 if let Some(started) = runtime.start(&mut ctx).await? {
93 write_output(started).await?;
94 }
95 }
96 RuntimeMode::Server => {
97 ya_runtime_api::server::run_async(|emitter| async move {
98 let start = {
99 ctx.set_emitter(emitter);
100 runtime.start(&mut ctx)
101 };
102
103 match start.await {
104 Ok(Some(out)) => {
105 ctx.next_run_ctx().stdout(out.to_string()).await;
106 }
107 Err(err) => {
108 panic!("Failed to start the runtime: {}", err);
109 }
110 _ => (),
111 }
112
113 Server::new(runtime, ctx)
114 })
115 .await;
116 }
117 },
118 Command::Run { args } => {
119 if args.is_empty() {
120 anyhow::bail!("not enough arguments");
121 }
122
123 let mut args = args.clone();
124 let bin = args.remove(0);
125 let capture = Some(Output {
126 r#type: Some(Type::AtEnd(40960)),
127 });
128 let command = RunProcess {
129 bin,
130 args,
131 work_dir: ctx
132 .cli
133 .workdir()
134 .unwrap_or_default()
135 .to_string_lossy()
136 .to_string(),
137 stdout: capture.clone(),
138 stderr: capture,
139 };
140
141 let pid = runtime
142 .run_command(command, RuntimeMode::Command, &mut ctx)
143 .await?;
144
145 if let RuntimeMode::Server = R::MODE {
146 write_output(serde_json::json!(pid)).await?;
147 }
148 }
149 Command::OfferTemplate { .. } => {
150 if let Some(template) = runtime.offer(&mut ctx).await? {
151 write_output(template).await?;
152 }
153 }
154 Command::Test { .. } => runtime.test(&mut ctx).await?,
155 }
156
157 Ok(())
158}