1use futures::channel::oneshot;
2use futures::future::BoxFuture;
3use futures::FutureExt;
4use serde::Serialize;
5use std::cell::RefCell;
6use std::future::Future;
7use std::io::Write;
8use std::path::{Path, PathBuf};
9use std::rc::Rc;
10use std::sync::atomic::AtomicU64;
11use std::sync::atomic::Ordering::Relaxed;
12
13use ya_runtime_api::server::{RuntimeCounter, RuntimeHandler, RuntimeState};
14
15use crate::common::{write_output, IntoVec};
16use crate::env::{DefaultEnv, Env};
17use crate::error::Error;
18use crate::event::EventEmitter;
19use crate::runtime::{ProcessId, ProcessIdResponse};
20use crate::runtime::{Runtime, RuntimeControl, RuntimeDef};
21use crate::serialize::json;
22use crate::RuntimeMode;
23
24pub struct Context<R: Runtime + ?Sized> {
26 pub cli: <R as RuntimeDef>::Cli,
28 pub conf: <R as RuntimeDef>::Conf,
30 pub conf_path: PathBuf,
32 pub env: Box<dyn Env<<R as RuntimeDef>::Cli>>,
34 pub emitter: Option<EventEmitter>,
39 pid_seq: AtomicU64,
41 pub(crate) control: RuntimeControl,
43}
44
45impl<R> Context<R>
46where
47 R: Runtime + ?Sized,
48 <R as RuntimeDef>::Cli: 'static,
49{
50 const CONF_EXTENSIONS: [&'static str; 4] = ["toml", "yaml", "yml", "json"];
51
52 pub fn try_new() -> anyhow::Result<Self> {
54 Self::try_with(DefaultEnv::default())
55 }
56
57 pub fn try_with<E>(mut env: E) -> anyhow::Result<Self>
59 where
60 E: Env<<R as RuntimeDef>::Cli> + 'static,
61 {
62 let cli = env.cli(R::NAME, R::VERSION)?;
63 let name = env.runtime_name().unwrap_or_else(|| R::NAME.to_string());
64 let conf_dir = env.data_directory(name.as_str())?;
65 let conf_path = Self::config_path(conf_dir, name.as_str())?;
66
67 let conf = if conf_path.exists() {
68 Self::read_config(&conf_path)?
69 } else {
70 Default::default()
71 };
72
73 Ok(Self {
74 cli,
75 conf,
76 conf_path,
77 env: Box::new(env),
78 emitter: None,
79 pid_seq: Default::default(),
80 control: Default::default(),
81 })
82 }
83
84 pub fn read_config<P: AsRef<Path>>(path: P) -> anyhow::Result<<R as RuntimeDef>::Conf> {
86 use anyhow::Context;
87
88 let path = path.as_ref();
89 let extension = file_extension(path)?;
90 let err = || format!("Unable to read the configuration file: {}", path.display());
91
92 let contents = std::fs::read_to_string(path).with_context(err)?;
93 let conf: <R as RuntimeDef>::Conf = match extension.as_str() {
94 "toml" => toml::de::from_str(&contents).with_context(err)?,
95 "yaml" | "yml" => serde_yaml::from_str(&contents).with_context(err)?,
96 "json" => serde_json::from_str(&contents).with_context(err)?,
97 _ => anyhow::bail!("Unsupported extension: {}", extension),
98 };
99
100 Ok(conf)
101 }
102
103 pub fn write_config<P: AsRef<Path>>(
105 conf: &<R as RuntimeDef>::Conf,
106 path: P,
107 ) -> anyhow::Result<()> {
108 use anyhow::Context;
109
110 let path = path.as_ref();
111 let extension = file_extension(path)?;
112 let err = || format!("Unable to write configuration: {}", path.display());
113
114 let parent_dir = path.parent().ok_or_else(|| {
115 anyhow::anyhow!("Unable to resolve parent directory of {}", path.display())
116 })?;
117 if !parent_dir.exists() {
118 std::fs::create_dir_all(&parent_dir).with_context(err)?;
119 }
120
121 let contents = match extension.as_str() {
122 "toml" => toml::ser::to_string_pretty(conf).with_context(err)?,
123 "yaml" | "yml" => serde_yaml::to_string(conf).with_context(err)?,
124 "json" => serde_json::to_string_pretty(conf).with_context(err)?,
125 _ => anyhow::bail!("Unsupported extension: {}", extension),
126 };
127 std::fs::write(path, contents).with_context(err)?;
128
129 Ok(())
130 }
131
132 pub fn control(&self) -> RuntimeControl {
134 self.control.clone()
135 }
136
137 fn config_path<P: AsRef<Path>>(dir: P, name: &str) -> anyhow::Result<PathBuf> {
138 let dir = dir.as_ref();
139 let candidates = Self::CONF_EXTENSIONS
140 .iter()
141 .map(|ext| dir.join(format!("{}.{}", name, ext)))
142 .collect::<Vec<_>>();
143 let conf_path = candidates
144 .iter()
145 .find(|path| path.exists())
146 .unwrap_or_else(|| candidates.last().unwrap())
147 .clone();
148
149 Ok(conf_path)
150 }
151
152 pub(crate) fn next_run_ctx(&self) -> RunCommandContext {
153 let id = self.pid_seq.fetch_add(1, Relaxed);
154 RunCommandContext {
155 id,
156 emitter: self.emitter.clone(),
157 control: self.control.clone(),
158 }
159 }
160
161 pub(crate) fn set_emitter(&mut self, emitter: impl RuntimeHandler + Send + Sync + 'static) {
162 self.emitter.replace(EventEmitter::spawn(emitter));
163 }
164
165 pub(crate) fn set_shutdown_tx(&mut self, tx: oneshot::Sender<()>) {
166 self.control.shutdown_tx = Rc::new(RefCell::new(Some(tx)));
167 }
168}
169
170impl<R> Context<R>
171where
172 R: Runtime + ?Sized,
173 <R as RuntimeDef>::Cli: 'static,
174{
175 pub fn command<'a, H, T, Fut>(&mut self, handler: H) -> ProcessIdResponse<'a>
176 where
177 H: (FnOnce(RunCommandContext) -> Fut) + 'static,
178 T: Serialize,
179 Fut: Future<Output = Result<T, Error>> + 'a,
180 {
181 let run_ctx = self.next_run_ctx();
182 run_command(run_ctx, move |run_ctx| {
183 async move {
184 let id = run_ctx.id;
185 let emitter = run_ctx.emitter.clone();
186 let output = handler(run_ctx).await?;
187 let value = json::to_value(&output).map_err(Error::from_string)?;
188
189 if value.is_null() {
190 return Ok(());
191 }
192
193 match R::MODE {
194 RuntimeMode::Command => {
195 let _ = write_output(value).await;
196 }
197 RuntimeMode::Server if emitter.is_some() => {
198 emitter.unwrap().command_stdout(id, value.to_string()).await;
199 }
200 RuntimeMode::Server => (),
201 }
202 Ok(())
203 }
204 .boxed_local()
205 })
206 }
207}
208
209#[derive(Clone)]
211pub struct RunCommandContext {
212 pub(crate) id: ProcessId,
213 pub(crate) emitter: Option<EventEmitter>,
214 pub(crate) control: RuntimeControl,
215}
216
217impl RunCommandContext {
218 pub fn id(&self) -> &ProcessId {
220 &self.id
221 }
222
223 pub(crate) fn started(&mut self) -> BoxFuture<()> {
224 let id = self.id;
225 self.emitter
226 .as_mut()
227 .map(|e| e.command_started(id))
228 .unwrap_or_else(|| futures::future::ready(()).boxed())
229 }
230
231 pub(crate) fn stopped(&mut self, return_code: i32) -> BoxFuture<()> {
232 let id = self.id;
233 self.emitter
234 .as_mut()
235 .map(|e| e.command_stopped(id, return_code))
236 .unwrap_or_else(|| futures::future::ready(()).boxed())
237 }
238
239 pub fn stdout(&mut self, output: impl IntoVec<u8>) -> BoxFuture<()> {
241 let id = self.id;
242 let output = output.into_vec();
243 match self.emitter {
244 Some(ref mut e) => e.command_stdout(id, output),
245 None => Self::print_output(output),
246 }
247 }
248
249 pub fn stderr(&mut self, output: impl IntoVec<u8>) -> BoxFuture<()> {
251 let id = self.id;
252 let output = output.into_vec();
253 match self.emitter {
254 Some(ref mut e) => e.command_stderr(id, output),
255 None => Self::print_output(output),
256 }
257 }
258
259 pub fn state(&mut self, name: String, value: json::Value) -> BoxFuture<Result<(), Error>> {
261 match self.emitter {
262 Some(ref mut e) => async move {
263 let json_str = json::to_string(&value)
264 .map_err(|e| anyhow::anyhow!("Serialization error: {}", e))?;
265 let json_bytes = json_str.into_bytes();
266
267 e.state(RuntimeState {
268 name,
269 value: json_bytes,
270 })
271 .await;
272
273 Ok(())
274 }
275 .boxed(),
276 None => futures::future::ok(()).boxed(),
277 }
278 }
279
280 pub fn counter(&mut self, name: String, value: f64) -> BoxFuture<()> {
282 match self.emitter {
283 Some(ref mut e) => e.counter(RuntimeCounter { name, value }),
284 None => futures::future::ready(()).boxed(),
285 }
286 }
287
288 pub fn control(&self) -> RuntimeControl {
290 self.control.clone()
291 }
292
293 fn print_output<'a>(output: impl IntoVec<u8>) -> BoxFuture<'a, ()> {
294 let mut stdout = std::io::stdout();
295 let _ = stdout.write_all(output.into_vec().as_slice());
296 let _ = stdout.flush();
297 futures::future::ready(()).boxed()
298 }
299}
300
301pub trait RunCommandExt<R: Runtime + ?Sized> {
306 type Item: 'static;
307
308 #[allow(clippy::wrong_self_convention)]
309 fn as_command<'a, H, Fh>(self, ctx: &mut Context<R>, handler: H) -> ProcessIdResponse<'a>
311 where
312 H: (FnOnce(Self::Item, RunCommandContext) -> Fh) + 'static,
313 Fh: Future<Output = Result<(), Error>> + 'static;
314}
315
316impl<R, F, Rt, Re> RunCommandExt<R> for F
319where
320 R: Runtime + ?Sized,
321 <R as RuntimeDef>::Cli: 'static,
322 F: Future<Output = Result<Rt, Re>> + 'static,
323 Rt: 'static,
324 Re: 'static,
325 Error: From<Re>,
326{
327 type Item = Rt;
328
329 fn as_command<'a, H, Fh>(self, ctx: &mut Context<R>, handler: H) -> ProcessIdResponse<'a>
330 where
331 H: (FnOnce(Self::Item, RunCommandContext) -> Fh) + 'static,
332 Fh: Future<Output = Result<(), Error>> + 'static,
333 {
334 let run_ctx = ctx.next_run_ctx();
335 async move {
336 let value = self.await?;
337 run_command(run_ctx, move |run_ctx| async move {
338 handler(value, run_ctx).await
339 })
340 .await
341 }
342 .boxed_local()
343 }
344}
345
346fn run_command<'a, H, F>(mut run_ctx: RunCommandContext, handler: H) -> ProcessIdResponse<'a>
347where
348 H: (FnOnce(RunCommandContext) -> F) + 'static,
349 F: Future<Output = Result<(), Error>> + 'static,
350{
351 async move {
352 let pid = run_ctx.id;
353 run_ctx.started().await;
354
355 let fut = handler(run_ctx.clone());
356 tokio::task::spawn_local(async move {
357 let return_code = fut.await.is_err() as i32;
358 run_ctx.stopped(return_code).await;
359 });
360
361 Ok(pid)
362 }
363 .boxed_local()
364}
365
366fn file_extension<P: AsRef<Path>>(path: P) -> anyhow::Result<String> {
367 Ok(path
368 .as_ref()
369 .extension()
370 .ok_or_else(|| anyhow::anyhow!("Invalid config path"))?
371 .to_string_lossy()
372 .to_lowercase())
373}