ya_runtime_sdk/
runner.rs

1use futures::future::LocalBoxFuture;
2use futures::FutureExt;
3use std::future::Future;
4
5use ya_runtime_api::server::proto::{output::Type, request::RunProcess, Output};
6
7use crate::cli::{Command, CommandCli};
8use crate::common::write_output;
9use crate::context::Context;
10use crate::env::{DefaultEnv, Env};
11use crate::runtime::{Runtime, RuntimeDef, RuntimeMode};
12use crate::server::Server;
13
14/// Starts the runtime within a new `tokio::task::LocalSet`
15#[inline]
16pub async fn run<R>() -> anyhow::Result<()>
17where
18    R: Runtime + Default + 'static,
19{
20    run_with::<R, _>(DefaultEnv::<<R as RuntimeDef>::Cli>::default()).await
21}
22
23/// Starts the runtime within a new `tokio::task::LocalSet`,
24/// using a custom environment configuration provider
25pub async fn run_with<R, E>(env: E) -> anyhow::Result<()>
26where
27    R: Runtime + Default + 'static,
28    E: Env<<R as RuntimeDef>::Cli> + Send + 'static,
29{
30    build(env, move |_| async move { Ok(R::default()) }).await
31}
32
33/// Creates a new runtime execution future within a new `tokio::task::LocalSet`,
34/// using a custom environment configuration provider and a runtime factory
35pub fn build<R, E, F, Fut>(env: E, factory: F) -> LocalBoxFuture<'static, anyhow::Result<()>>
36where
37    R: Runtime + 'static,
38    E: Env<<R as RuntimeDef>::Cli> + Send + 'static,
39    F: FnOnce(&mut Context<R>) -> Fut + 'static,
40    Fut: Future<Output = anyhow::Result<R>> + 'static,
41{
42    async move {
43        let set = tokio::task::LocalSet::new();
44        set.run_until(inner::<R, E, _>(env, |ctx| {
45            async move { factory(ctx).await }.boxed_local()
46        }))
47        .await
48    }
49    .boxed_local()
50}
51
52async fn inner<R, E, F>(env: E, factory: F) -> anyhow::Result<()>
53where
54    R: Runtime + 'static,
55    E: Env<<R as RuntimeDef>::Cli> + Send + 'static,
56    F: FnOnce(&mut Context<R>) -> LocalBoxFuture<anyhow::Result<R>>,
57{
58    #[cfg(feature = "logger")]
59    {
60        if let Err(error) = crate::logger::start_file_logger() {
61            crate::logger::start_logger().expect("Failed to start logging");
62            log::warn!("Using fallback logging due to an error: {:?}", error);
63        };
64
65        std::panic::set_hook(Box::new(|e| {
66            log::error!("Runtime panic: {e}");
67        }));
68    }
69
70    let mut ctx = Context::<R>::try_with(env)?;
71    let mut runtime = factory(&mut ctx).await?;
72
73    match ctx.cli.command() {
74        Command::Deploy { .. } => {
75            let deployment = match runtime.deploy(&mut ctx).await? {
76                Some(deployment) => deployment,
77                None => {
78                    crate::serialize::json::json!({
79                        "startMode": match R::MODE {
80                            RuntimeMode::Server => "blocking",
81                            RuntimeMode::Command => "empty",
82                        },
83                        "valid": {"Ok": ""},
84                        "vols": []
85                    })
86                }
87            };
88            write_output(deployment).await?;
89        }
90        Command::Start { .. } => match R::MODE {
91            RuntimeMode::Command => {
92                if let Some(started) = runtime.start(&mut ctx).await? {
93                    write_output(started).await?;
94                }
95            }
96            RuntimeMode::Server => {
97                ya_runtime_api::server::run_async(|emitter| async move {
98                    let start = {
99                        ctx.set_emitter(emitter);
100                        runtime.start(&mut ctx)
101                    };
102
103                    match start.await {
104                        Ok(Some(out)) => {
105                            ctx.next_run_ctx().stdout(out.to_string()).await;
106                        }
107                        Err(err) => {
108                            panic!("Failed to start the runtime: {}", err);
109                        }
110                        _ => (),
111                    }
112
113                    Server::new(runtime, ctx)
114                })
115                .await;
116            }
117        },
118        Command::Run { args } => {
119            if args.is_empty() {
120                anyhow::bail!("not enough arguments");
121            }
122
123            let mut args = args.clone();
124            let bin = args.remove(0);
125            let capture = Some(Output {
126                r#type: Some(Type::AtEnd(40960)),
127            });
128            let command = RunProcess {
129                bin,
130                args,
131                work_dir: ctx
132                    .cli
133                    .workdir()
134                    .unwrap_or_default()
135                    .to_string_lossy()
136                    .to_string(),
137                stdout: capture.clone(),
138                stderr: capture,
139            };
140
141            let pid = runtime
142                .run_command(command, RuntimeMode::Command, &mut ctx)
143                .await?;
144
145            if let RuntimeMode::Server = R::MODE {
146                write_output(serde_json::json!(pid)).await?;
147            }
148        }
149        Command::OfferTemplate { .. } => {
150            if let Some(template) = runtime.offer(&mut ctx).await? {
151                write_output(template).await?;
152            }
153        }
154        Command::Test { .. } => runtime.test(&mut ctx).await?,
155    }
156
157    Ok(())
158}