ya_runtime_sdk/
context.rs

1use futures::channel::oneshot;
2use futures::future::BoxFuture;
3use futures::FutureExt;
4use serde::Serialize;
5use std::cell::RefCell;
6use std::future::Future;
7use std::io::Write;
8use std::path::{Path, PathBuf};
9use std::rc::Rc;
10use std::sync::atomic::AtomicU64;
11use std::sync::atomic::Ordering::Relaxed;
12
13use ya_runtime_api::server::{RuntimeCounter, RuntimeHandler, RuntimeState};
14
15use crate::common::{write_output, IntoVec};
16use crate::env::{DefaultEnv, Env};
17use crate::error::Error;
18use crate::event::EventEmitter;
19use crate::runtime::{ProcessId, ProcessIdResponse};
20use crate::runtime::{Runtime, RuntimeControl, RuntimeDef};
21use crate::serialize::json;
22use crate::RuntimeMode;
23
24/// Runtime execution context
25pub struct Context<R: Runtime + ?Sized> {
26    /// Command line parameters
27    pub cli: <R as RuntimeDef>::Cli,
28    /// Configuration read from the configuration file
29    pub conf: <R as RuntimeDef>::Conf,
30    /// Configuration file path
31    pub conf_path: PathBuf,
32    /// Environment instance
33    pub env: Box<dyn Env<<R as RuntimeDef>::Cli>>,
34    /// Event emitter, available when
35    /// `Runtime::MODE == RuntimeMode::Server`
36    /// and
37    /// `command != Command::Deploy`
38    pub emitter: Option<EventEmitter>,
39    /// Process ID sequence
40    pid_seq: AtomicU64,
41    /// Runtime control
42    pub(crate) control: RuntimeControl,
43}
44
45impl<R> Context<R>
46where
47    R: Runtime + ?Sized,
48    <R as RuntimeDef>::Cli: 'static,
49{
50    const CONF_EXTENSIONS: [&'static str; 4] = ["toml", "yaml", "yml", "json"];
51
52    /// Create a new instance with a default environment configuration
53    pub fn try_new() -> anyhow::Result<Self> {
54        Self::try_with(DefaultEnv::default())
55    }
56
57    /// Create a new instance with provided environment configuration
58    pub fn try_with<E>(mut env: E) -> anyhow::Result<Self>
59    where
60        E: Env<<R as RuntimeDef>::Cli> + 'static,
61    {
62        let cli = env.cli(R::NAME, R::VERSION)?;
63        let name = env.runtime_name().unwrap_or_else(|| R::NAME.to_string());
64        let conf_dir = env.data_directory(name.as_str())?;
65        let conf_path = Self::config_path(conf_dir, name.as_str())?;
66
67        let conf = if conf_path.exists() {
68            Self::read_config(&conf_path)?
69        } else {
70            Default::default()
71        };
72
73        Ok(Self {
74            cli,
75            conf,
76            conf_path,
77            env: Box::new(env),
78            emitter: None,
79            pid_seq: Default::default(),
80            control: Default::default(),
81        })
82    }
83
84    /// Read configuration from file
85    pub fn read_config<P: AsRef<Path>>(path: P) -> anyhow::Result<<R as RuntimeDef>::Conf> {
86        use anyhow::Context;
87
88        let path = path.as_ref();
89        let extension = file_extension(path)?;
90        let err = || format!("Unable to read the configuration file: {}", path.display());
91
92        let contents = std::fs::read_to_string(path).with_context(err)?;
93        let conf: <R as RuntimeDef>::Conf = match extension.as_str() {
94            "toml" => toml::de::from_str(&contents).with_context(err)?,
95            "yaml" | "yml" => serde_yaml::from_str(&contents).with_context(err)?,
96            "json" => serde_json::from_str(&contents).with_context(err)?,
97            _ => anyhow::bail!("Unsupported extension: {}", extension),
98        };
99
100        Ok(conf)
101    }
102
103    /// Write configuration to file
104    pub fn write_config<P: AsRef<Path>>(
105        conf: &<R as RuntimeDef>::Conf,
106        path: P,
107    ) -> anyhow::Result<()> {
108        use anyhow::Context;
109
110        let path = path.as_ref();
111        let extension = file_extension(path)?;
112        let err = || format!("Unable to write configuration: {}", path.display());
113
114        let parent_dir = path.parent().ok_or_else(|| {
115            anyhow::anyhow!("Unable to resolve parent directory of {}", path.display())
116        })?;
117        if !parent_dir.exists() {
118            std::fs::create_dir_all(&parent_dir).with_context(err)?;
119        }
120
121        let contents = match extension.as_str() {
122            "toml" => toml::ser::to_string_pretty(conf).with_context(err)?,
123            "yaml" | "yml" => serde_yaml::to_string(conf).with_context(err)?,
124            "json" => serde_json::to_string_pretty(conf).with_context(err)?,
125            _ => anyhow::bail!("Unsupported extension: {}", extension),
126        };
127        std::fs::write(path, contents).with_context(err)?;
128
129        Ok(())
130    }
131
132    /// Return a runtime control object
133    pub fn control(&self) -> RuntimeControl {
134        self.control.clone()
135    }
136
137    fn config_path<P: AsRef<Path>>(dir: P, name: &str) -> anyhow::Result<PathBuf> {
138        let dir = dir.as_ref();
139        let candidates = Self::CONF_EXTENSIONS
140            .iter()
141            .map(|ext| dir.join(format!("{}.{}", name, ext)))
142            .collect::<Vec<_>>();
143        let conf_path = candidates
144            .iter()
145            .find(|path| path.exists())
146            .unwrap_or_else(|| candidates.last().unwrap())
147            .clone();
148
149        Ok(conf_path)
150    }
151
152    pub(crate) fn next_run_ctx(&self) -> RunCommandContext {
153        let id = self.pid_seq.fetch_add(1, Relaxed);
154        RunCommandContext {
155            id,
156            emitter: self.emitter.clone(),
157            control: self.control.clone(),
158        }
159    }
160
161    pub(crate) fn set_emitter(&mut self, emitter: impl RuntimeHandler + Send + Sync + 'static) {
162        self.emitter.replace(EventEmitter::spawn(emitter));
163    }
164
165    pub(crate) fn set_shutdown_tx(&mut self, tx: oneshot::Sender<()>) {
166        self.control.shutdown_tx = Rc::new(RefCell::new(Some(tx)));
167    }
168}
169
170impl<R> Context<R>
171where
172    R: Runtime + ?Sized,
173    <R as RuntimeDef>::Cli: 'static,
174{
175    pub fn command<'a, H, T, Fut>(&mut self, handler: H) -> ProcessIdResponse<'a>
176    where
177        H: (FnOnce(RunCommandContext) -> Fut) + 'static,
178        T: Serialize,
179        Fut: Future<Output = Result<T, Error>> + 'a,
180    {
181        let run_ctx = self.next_run_ctx();
182        run_command(run_ctx, move |run_ctx| {
183            async move {
184                let id = run_ctx.id;
185                let emitter = run_ctx.emitter.clone();
186                let output = handler(run_ctx).await?;
187                let value = json::to_value(&output).map_err(Error::from_string)?;
188
189                if value.is_null() {
190                    return Ok(());
191                }
192
193                match R::MODE {
194                    RuntimeMode::Command => {
195                        let _ = write_output(value).await;
196                    }
197                    RuntimeMode::Server if emitter.is_some() => {
198                        emitter.unwrap().command_stdout(id, value.to_string()).await;
199                    }
200                    RuntimeMode::Server => (),
201                }
202                Ok(())
203            }
204            .boxed_local()
205        })
206    }
207}
208
209/// Command execution handler
210#[derive(Clone)]
211pub struct RunCommandContext {
212    pub(crate) id: ProcessId,
213    pub(crate) emitter: Option<EventEmitter>,
214    pub(crate) control: RuntimeControl,
215}
216
217impl RunCommandContext {
218    /// Get command ID
219    pub fn id(&self) -> &ProcessId {
220        &self.id
221    }
222
223    pub(crate) fn started(&mut self) -> BoxFuture<()> {
224        let id = self.id;
225        self.emitter
226            .as_mut()
227            .map(|e| e.command_started(id))
228            .unwrap_or_else(|| futures::future::ready(()).boxed())
229    }
230
231    pub(crate) fn stopped(&mut self, return_code: i32) -> BoxFuture<()> {
232        let id = self.id;
233        self.emitter
234            .as_mut()
235            .map(|e| e.command_stopped(id, return_code))
236            .unwrap_or_else(|| futures::future::ready(()).boxed())
237    }
238
239    /// Emit a RUN command output event (stdout)
240    pub fn stdout(&mut self, output: impl IntoVec<u8>) -> BoxFuture<()> {
241        let id = self.id;
242        let output = output.into_vec();
243        match self.emitter {
244            Some(ref mut e) => e.command_stdout(id, output),
245            None => Self::print_output(output),
246        }
247    }
248
249    /// Emit a RUN command output event (stderr)
250    pub fn stderr(&mut self, output: impl IntoVec<u8>) -> BoxFuture<()> {
251        let id = self.id;
252        let output = output.into_vec();
253        match self.emitter {
254            Some(ref mut e) => e.command_stderr(id, output),
255            None => Self::print_output(output),
256        }
257    }
258
259    /// Emit a STATE event
260    pub fn state(&mut self, name: String, value: json::Value) -> BoxFuture<Result<(), Error>> {
261        match self.emitter {
262            Some(ref mut e) => async move {
263                let json_str = json::to_string(&value)
264                    .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?;
265                let json_bytes = json_str.into_bytes();
266
267                e.state(RuntimeState {
268                    name,
269                    value: json_bytes,
270                })
271                .await;
272
273                Ok(())
274            }
275            .boxed(),
276            None => futures::future::ok(()).boxed(),
277        }
278    }
279
280    /// Emit a COUNTER event
281    pub fn counter(&mut self, name: String, value: f64) -> BoxFuture<()> {
282        match self.emitter {
283            Some(ref mut e) => e.counter(RuntimeCounter { name, value }),
284            None => futures::future::ready(()).boxed(),
285        }
286    }
287
288    /// Return runtime control object
289    pub fn control(&self) -> RuntimeControl {
290        self.control.clone()
291    }
292
293    fn print_output<'a>(output: impl IntoVec<u8>) -> BoxFuture<'a, ()> {
294        let mut stdout = std::io::stdout();
295        let _ = stdout.write_all(output.into_vec().as_slice());
296        let _ = stdout.flush();
297        futures::future::ready(()).boxed()
298    }
299}
300
301/// Wraps command lifecycle in the following manner:
302/// - manages command sequence numbers
303/// - emits command start & stop events
304/// - provides a RunCommandContext object for easier output event emission
305pub trait RunCommandExt<R: Runtime + ?Sized> {
306    type Item: 'static;
307
308    #[allow(clippy::wrong_self_convention)]
309    /// Wrap `self` in `run_command`
310    fn as_command<'a, H, Fh>(self, ctx: &mut Context<R>, handler: H) -> ProcessIdResponse<'a>
311    where
312        H: (FnOnce(Self::Item, RunCommandContext) -> Fh) + 'static,
313        Fh: Future<Output = Result<(), Error>> + 'static;
314}
315
316/// Implements `RunCommandExt` for `Future`s outputting `Result`s.
317/// The output result is checked prior to emitting any command lifecycle events.
318impl<R, F, Rt, Re> RunCommandExt<R> for F
319where
320    R: Runtime + ?Sized,
321    <R as RuntimeDef>::Cli: 'static,
322    F: Future<Output = Result<Rt, Re>> + 'static,
323    Rt: 'static,
324    Re: 'static,
325    Error: From<Re>,
326{
327    type Item = Rt;
328
329    fn as_command<'a, H, Fh>(self, ctx: &mut Context<R>, handler: H) -> ProcessIdResponse<'a>
330    where
331        H: (FnOnce(Self::Item, RunCommandContext) -> Fh) + 'static,
332        Fh: Future<Output = Result<(), Error>> + 'static,
333    {
334        let run_ctx = ctx.next_run_ctx();
335        async move {
336            let value = self.await?;
337            run_command(run_ctx, move |run_ctx| async move {
338                handler(value, run_ctx).await
339            })
340            .await
341        }
342        .boxed_local()
343    }
344}
345
346fn run_command<'a, H, F>(mut run_ctx: RunCommandContext, handler: H) -> ProcessIdResponse<'a>
347where
348    H: (FnOnce(RunCommandContext) -> F) + 'static,
349    F: Future<Output = Result<(), Error>> + 'static,
350{
351    async move {
352        let pid = run_ctx.id;
353        run_ctx.started().await;
354
355        let fut = handler(run_ctx.clone());
356        tokio::task::spawn_local(async move {
357            let return_code = fut.await.is_err() as i32;
358            run_ctx.stopped(return_code).await;
359        });
360
361        Ok(pid)
362    }
363    .boxed_local()
364}
365
366fn file_extension<P: AsRef<Path>>(path: P) -> anyhow::Result<String> {
367    Ok(path
368        .as_ref()
369        .extension()
370        .ok_or_else(|| anyhow::anyhow!("Invalid config path"))?
371        .to_string_lossy()
372        .to_lowercase())
373}