eva_sdk/
service.rs

1use busrt::QoS;
2/// Service basics
3use busrt::client::AsyncClient;
4use busrt::rpc::{Rpc, RpcClient, RpcError, RpcEvent, RpcResult};
5use eva_common::SLEEP_STEP;
6use eva_common::acl::OIDMask;
7use eva_common::events::{
8    ANY_STATE_TOPIC, LOCAL_STATE_TOPIC, REMOTE_STATE_TOPIC, SERVICE_STATUS_TOPIC,
9};
10use eva_common::op::Op;
11use eva_common::payload::{pack, unpack};
12use eva_common::prelude::*;
13use eva_common::services;
14use eva_common::services::SERVICE_PAYLOAD_INITIAL;
15use eva_common::services::SERVICE_PAYLOAD_PING;
16pub use eva_sdk_derive::svc_main;
17use log::error;
18use parking_lot::Mutex;
19use serde::{Deserialize, Deserializer};
20use std::future::Future;
21use std::io::Read;
22use std::path::Path;
23use std::sync::{Arc, LazyLock};
24use std::sync::{OnceLock, atomic};
25use std::time::{Duration, Instant};
26use tokio::io::AsyncReadExt;
27#[cfg(not(target_os = "windows"))]
28use tokio::signal::unix::{SignalKind, signal};
29use tokio::task::futures::TaskLocalFuture;
30use tokio::time::sleep;
31use uuid::Uuid;
32
/// Delay passed to `bmart::process::suicide` by `svc_block`/`svc_block2` when a bus client is
/// found disconnected; a zero duration is used if the value has never been set
static BUS_ERROR_SUICIDE_TIMEOUT: OnceLock<Duration> = OnceLock::new();

/// Message logged by `svc_block`/`svc_block2` when a bus client is found disconnected
const ERR_CRITICAL_BUS: &str = "CRITICAL: bus disconnected";
36
37/// Must be called once
38pub fn set_bus_error_suicide_timeout(bes_timeout: Duration) -> EResult<()> {
39    BUS_ERROR_SUICIDE_TIMEOUT
40        .set(bes_timeout)
41        .map_err(|_| Error::failed("Unable to set BUS_ERROR_SUICIDE_TIMEOUT"))
42}
43
44fn deserialize_opt_uuid<'de, D>(deserializer: D) -> Result<Option<Uuid>, D::Error>
45where
46    D: Deserializer<'de>,
47{
48    let val: Value = Deserialize::deserialize(deserializer)?;
49    if val == Value::Unit {
50        Ok(None)
51    } else {
52        Ok(Some(
53            Uuid::deserialize(val).map_err(serde::de::Error::custom)?,
54        ))
55    }
56}
57
/// Returns true only when the `EVA_SVC_DEBUG` environment variable is exactly `1`
///
/// A missing or unreadable variable counts as "not debug"
fn is_debug() -> bool {
    matches!(std::env::var("EVA_SVC_DEBUG").as_deref(), Ok("1"))
}
61
/// Extended call parameters, unpacked by `process_extended_payload` from the header of an
/// extended payload
#[derive(Deserialize)]
pub struct ExtendedParams {
    /// Call trace id; a unit value in the incoming data is deserialized as `None`
    #[serde(deserialize_with = "deserialize_opt_uuid")]
    call_trace_id: Option<Uuid>,
}
67
68pub fn process_extended_payload(full_payload: &[u8]) -> EResult<(&[u8], Option<ExtendedParams>)> {
69    if full_payload.len() > 4 && full_payload[0] == 0xc1 && full_payload[1] == 0xc1 {
70        let pos = usize::from(u16::from_le_bytes([full_payload[2], full_payload[3]])) + 4;
71        if full_payload.len() < pos {
72            return Err(Error::invalid_data("invalid extended payload"));
73        }
74        let xp = &full_payload[4..pos];
75        return Ok((&full_payload[pos..], Some(unpack(xp)?)));
76    }
77    Ok((full_payload, None))
78}
79
80#[inline]
81pub fn svc_call_scope<F>(xp: Option<ExtendedParams>, f: F) -> TaskLocalFuture<Option<Uuid>, F>
82where
83    F: Future,
84{
85    eva_common::logger::CALL_TRACE_ID.scope(
86        if let Some(x) = xp {
87            x.call_trace_id
88        } else {
89            None
90        },
91        f,
92    )
93}
94
95/// Helper methods for BUS/RT frame topic
96pub trait BusRtEapiEvent {
97    /// Parse OID from the topic
98    fn parse_oid(&self) -> Option<OID>;
99    /// Get the event kind from the topic
100    fn bus_event(&self) -> BusEventKind;
101    /// Returns true for actual state events (local, remote)
102    fn is_actual_state_event(&self) -> bool {
103        matches!(
104            self.bus_event(),
105            BusEventKind::StateLocal | BusEventKind::StateRemote
106        )
107    }
108}
109
110impl BusRtEapiEvent for busrt::Frame {
111    fn parse_oid(&self) -> Option<OID> {
112        let topic = self.topic()?;
113        if let Some(oid_str) = topic.strip_prefix(LOCAL_STATE_TOPIC) {
114            return OID::from_path(oid_str).ok();
115        }
116        if let Some(oid_str) = topic.strip_prefix(REMOTE_STATE_TOPIC) {
117            return OID::from_path(oid_str).ok();
118        }
119        if let Some(oid_str) = topic.strip_prefix(eva_common::events::REMOTE_ARCHIVE_STATE_TOPIC) {
120            return OID::from_path(oid_str).ok();
121        }
122        None
123    }
124    fn bus_event(&self) -> BusEventKind {
125        let Some(topic) = self.topic() else {
126            return BusEventKind::Other;
127        };
128        if topic.starts_with(eva_common::events::RAW_STATE_TOPIC) {
129            return BusEventKind::StateRaw;
130        }
131        if topic == eva_common::events::RAW_STATE_BULK_TOPIC {
132            return BusEventKind::StateRawBulk;
133        }
134        if topic.starts_with(eva_common::events::LOCAL_STATE_TOPIC) {
135            return BusEventKind::StateLocal;
136        }
137        if topic.starts_with(eva_common::events::REMOTE_STATE_TOPIC) {
138            return BusEventKind::StateRemote;
139        }
140        if topic.starts_with(eva_common::events::REMOTE_ARCHIVE_STATE_TOPIC) {
141            return BusEventKind::StateRemoteArchive;
142        }
143        if topic == eva_common::events::AAA_ACL_TOPIC {
144            return BusEventKind::AaaAcl;
145        }
146        if topic == eva_common::events::AAA_KEY_TOPIC {
147            return BusEventKind::AaaKey;
148        }
149        if topic == eva_common::events::AAA_USER_TOPIC {
150            return BusEventKind::AaaUser;
151        }
152        BusEventKind::Other
153    }
154}
155
/// Kind of a bus event, as derived from a frame topic by `BusRtEapiEvent::bus_event`
#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BusEventKind {
    /// The topic starts with the raw state topic prefix
    StateRaw,
    /// The topic is exactly the raw state bulk topic
    StateRawBulk,
    /// The topic starts with the local state topic prefix
    StateLocal,
    /// The topic starts with the remote state topic prefix
    StateRemote,
    /// The topic starts with the remote archive state topic prefix
    StateRemoteArchive,
    /// The topic is exactly the AAA ACL topic
    AaaAcl,
    /// The topic is exactly the AAA key topic
    AaaKey,
    /// The topic is exactly the AAA user topic
    AaaUser,
    /// No topic or a topic which matches none of the above
    Other,
}
170
/// Subscription event kind, selects the state topics used by `subscribe_oids`,
/// `unsubscribe_oids` and `exclude_oids`
#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum EventKind {
    /// Topics under `LOCAL_STATE_TOPIC`
    Local,
    /// Topics under `REMOTE_STATE_TOPIC`
    Remote,
    /// Topics under `ANY_STATE_TOPIC`
    Any,
    /// Both local and remote state topics; has no single topic, so `EventKind::topic` must
    /// not be called for it
    Actual,
}
180
181impl EventKind {
182    #[inline]
183    pub fn topic(&self) -> &str {
184        match self {
185            EventKind::Local => LOCAL_STATE_TOPIC,
186            EventKind::Remote => REMOTE_STATE_TOPIC,
187            EventKind::Any => ANY_STATE_TOPIC,
188            EventKind::Actual => unimplemented!(),
189        }
190    }
191}
192
/// Optional delay configured with `set_poc`; when set, `poc` terminates the service and
/// exits the process after this delay
static NEED_PANIC: LazyLock<Mutex<Option<Duration>>> = LazyLock::new(<_>::default);
194
195/// Will be deprecated soon. Use eva_sdk::eapi instead
196pub async fn subscribe_oids<'a, R, M>(rpc: &R, masks: M, kind: EventKind) -> EResult<()>
197where
198    R: Rpc,
199    M: IntoIterator<Item = &'a OIDMask>,
200{
201    let topics: Vec<String> = if kind == EventKind::Actual {
202        let mut t = Vec::new();
203        for mask in masks {
204            t.push(format!("{}{}", LOCAL_STATE_TOPIC, mask.as_path()));
205            t.push(format!("{}{}", REMOTE_STATE_TOPIC, mask.as_path()));
206        }
207        t
208    } else {
209        masks
210            .into_iter()
211            .map(|mask| format!("{}{}", kind.topic(), mask.as_path()))
212            .collect()
213    };
214    if topics.is_empty() {
215        return Ok(());
216    }
217    rpc.client()
218        .lock()
219        .await
220        .subscribe_bulk(
221            &topics.iter().map(String::as_str).collect::<Vec<&str>>(),
222            QoS::No,
223        )
224        .await?;
225    Ok(())
226}
227
228/// Will be deprecated soon. Use eva_sdk::eapi instead
229pub async fn unsubscribe_oids<'a, R, M>(rpc: &R, masks: M, kind: EventKind) -> EResult<()>
230where
231    R: Rpc,
232    M: IntoIterator<Item = &'a OIDMask>,
233{
234    let topics: Vec<String> = if kind == EventKind::Actual {
235        let mut t = Vec::new();
236        for mask in masks {
237            t.push(format!("{}{}", LOCAL_STATE_TOPIC, mask.as_path()));
238            t.push(format!("{}{}", REMOTE_STATE_TOPIC, mask.as_path()));
239        }
240        t
241    } else {
242        masks
243            .into_iter()
244            .map(|mask| format!("{}{}", kind.topic(), mask.as_path()))
245            .collect()
246    };
247    if topics.is_empty() {
248        return Ok(());
249    }
250    rpc.client()
251        .lock()
252        .await
253        .unsubscribe_bulk(
254            &topics.iter().map(String::as_str).collect::<Vec<&str>>(),
255            QoS::No,
256        )
257        .await?;
258    Ok(())
259}
260
261/// Will be deprecated soon. Use eva_sdk::eapi instead
262pub async fn exclude_oids<'a, R, M>(rpc: &R, masks: M, kind: EventKind) -> EResult<()>
263where
264    R: Rpc,
265    M: IntoIterator<Item = &'a OIDMask>,
266{
267    let topics: Vec<String> = if kind == EventKind::Actual {
268        let mut t = Vec::new();
269        for mask in masks {
270            t.push(format!("{}{}", LOCAL_STATE_TOPIC, mask.as_path()));
271            t.push(format!("{}{}", REMOTE_STATE_TOPIC, mask.as_path()));
272        }
273        t
274    } else {
275        masks
276            .into_iter()
277            .map(|mask| format!("{}{}", kind.topic(), mask.as_path()))
278            .collect()
279    };
280    if topics.is_empty() {
281        return Ok(());
282    }
283    rpc.client()
284        .lock()
285        .await
286        .exclude_bulk(
287            &topics.iter().map(String::as_str).collect::<Vec<&str>>(),
288            QoS::No,
289        )
290        .await?;
291    Ok(())
292}
293
294pub fn set_poc(panic_in: Option<Duration>) {
295    *NEED_PANIC.lock() = panic_in;
296}
297
298pub fn poc() {
299    if let Some(delay) = *NEED_PANIC.lock() {
300        svc_terminate();
301        bmart::process::suicide(delay + Duration::from_secs(1), false);
302        std::thread::spawn(move || {
303            std::thread::sleep(delay);
304            std::process::exit(1);
305        });
306    }
307}
308
309pub fn svc_handle_default_rpc(method: &str, info: &services::ServiceInfo) -> RpcResult {
310    match method {
311        "test" => Ok(None),
312        "info" => Ok(Some(pack(info)?)),
313        "stop" => {
314            svc_terminate();
315            Ok(None)
316        }
317        _ => Err(RpcError::method(None)),
318    }
319}
320
321/// Will be deprecated soon. Use eva_sdk::eapi instead
322#[inline]
323pub async fn safe_rpc_call(
324    rpc: &RpcClient,
325    target: &str,
326    method: &str,
327    params: busrt::borrow::Cow<'_>,
328    qos: QoS,
329    timeout: Duration,
330) -> EResult<RpcEvent> {
331    tokio::time::timeout(timeout, rpc.call(target, method, params, qos))
332        .await?
333        .map_err(Into::into)
334}
335
336pub(crate) async fn svc_is_core_active(rpc: &RpcClient, timeout: Duration) -> bool {
337    #[derive(Deserialize)]
338    struct TR {
339        active: bool,
340    }
341    if let Ok(ev) = safe_rpc_call(
342        rpc,
343        "eva.core",
344        "test",
345        busrt::empty_payload!(),
346        QoS::Processed,
347        timeout,
348    )
349    .await
350        && let Ok(result) = unpack::<TR>(ev.payload())
351        && result.active
352    {
353        return true;
354    }
355    false
356}
357
358/// Will be deprecated soon. Use eva_sdk::eapi instead
359pub async fn svc_wait_core(
360    rpc: &RpcClient,
361    timeout: Duration,
362    wait_forever: bool,
363) -> EResult<bool> {
364    let wait_until = Instant::now() + timeout;
365    let mut core_inactive = false;
366    let mut int = tokio::time::interval(SLEEP_STEP);
367    loop {
368        int.tick().await;
369        if svc_is_terminating() {
370            return Err(Error::failed(
371                "core load wait aborted, the service is not active",
372            ));
373        }
374        if svc_is_core_active(rpc, timeout).await {
375            return Ok(core_inactive);
376        }
377        core_inactive = true;
378        if !wait_forever && wait_until <= Instant::now() {
379            return Err(Error::timeout());
380        }
381    }
382}
383
/// Raised by `svc_mark_ready`, cleared when the service terminates
static ACTIVE: atomic::AtomicBool = atomic::AtomicBool::new(false);
/// Raised when the service termination is requested
static TERMINATING: atomic::AtomicBool = atomic::AtomicBool::new(false);

/// Returns true while the service is marked as ready
#[inline]
pub fn svc_is_active() -> bool {
    use std::sync::atomic::Ordering::SeqCst;
    ACTIVE.load(SeqCst)
}

/// Returns true after the service termination has been requested
#[inline]
pub fn svc_is_terminating() -> bool {
    use std::sync::atomic::Ordering::SeqCst;
    TERMINATING.load(SeqCst)
}
396
/// Returns from the enclosing function (without a value) unless the service is active
///
/// The expansion calls `svc_is_active()` by its bare name, so the function must be in scope
/// at the place where the macro is used
#[macro_export]
macro_rules! svc_need_ready {
    () => {
        if !svc_is_active() {
            return;
        }
    };
}
405
/// Returns a "not ready" error from the enclosing function unless the service is active
///
/// The expansion uses `svc_is_active()` and `Error` by their bare names, so both must be in
/// scope at the place where the macro is used; the error is converted into the function
/// error type with `.into()`
#[macro_export]
macro_rules! svc_rpc_need_ready {
    () => {
        if !svc_is_active() {
            return Err(Error::not_ready("service is not ready").into());
        }
    };
}
414
415/// Terminate the service (canceling block)
416#[inline]
417pub fn svc_terminate() {
418    ACTIVE.store(false, atomic::Ordering::SeqCst);
419    TERMINATING.store(true, atomic::Ordering::SeqCst);
420}
421
422/// Block the service until terminate is called
423///
424/// Will be deprecated soon. Use eva_sdk::eapi instead
425#[inline]
426pub async fn svc_block(rpc: &RpcClient) {
427    while svc_is_active() {
428        if !rpc.is_connected() {
429            error!("{}", ERR_CRITICAL_BUS);
430            bmart::process::suicide(
431                BUS_ERROR_SUICIDE_TIMEOUT
432                    .get()
433                    .map_or_else(|| Duration::from_secs(0), |v| *v),
434                false,
435            );
436            break;
437        }
438        sleep(SLEEP_STEP).await;
439    }
440}
441
442/// Block the service until terminate is called, checking both primary and secondary RPC
443///
444/// Will be deprecated soon. Use eva_sdk::eapi instead
445#[inline]
446pub async fn svc_block2(rpc: &RpcClient, secondary: &RpcClient) {
447    while svc_is_active() {
448        if !rpc.is_connected() || !secondary.is_connected() {
449            error!("{}", ERR_CRITICAL_BUS);
450            bmart::process::suicide(
451                BUS_ERROR_SUICIDE_TIMEOUT
452                    .get()
453                    .map_or_else(|| Duration::from_secs(0), |v| *v),
454                false,
455            );
456            break;
457        }
458        sleep(SLEEP_STEP).await;
459    }
460}
461
462/// Initializing service logs
463///
464/// Will be deprecated soon. Use eva_sdk::eapi instead
465///
466/// After calling, log macros can be used, all records are transferred to bus LOG/IN/ topics
467///
468/// # Panics
469///
470/// Will panic if the mutex is poisoned
471#[inline]
472pub fn svc_init_logs<C>(
473    initial: &services::Initial,
474    client: Arc<tokio::sync::Mutex<C>>,
475) -> EResult<()>
476where
477    C: ?Sized + AsyncClient + 'static,
478{
479    if is_debug() {
480        env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
481        Ok(())
482    } else {
483        eva_common::logger::init_bus(
484            client,
485            initial.bus_queue_size(),
486            initial.eva_log_level_filter(),
487            initial.call_tracing(),
488        )
489    }
490}
491
492/// Sends a broadcast event to mark the service ready at launcher and announce neighbors
493///
494/// Will be deprecated soon. Use eva_sdk::eapi instead
495///
496/// # Panics
497///
498/// Will panic only if payload::pack is broken
499pub async fn svc_mark_ready<C>(client: &tokio::sync::Mutex<C>) -> EResult<()>
500where
501    C: ?Sized + AsyncClient + 'static,
502{
503    client
504        .lock()
505        .await
506        .publish(
507            SERVICE_STATUS_TOPIC,
508            pack(&services::ServiceStatusBroadcastEvent::ready())
509                .unwrap()
510                .into(),
511            QoS::Processed,
512        )
513        .await?
514        .unwrap()
515        .await??;
516    ACTIVE.store(true, atomic::Ordering::SeqCst);
517    Ok(())
518}
519
520/// Sends a broadcast event to mark the service terminating at launcher and announce neighbors
521///
522/// Will be deprecated soon. Use eva_sdk::eapi instead
523///
524/// # Panics
525///
526/// Will panic only if payload::pack is broken
527pub async fn svc_mark_terminating<C>(client: &tokio::sync::Mutex<C>) -> EResult<()>
528where
529    C: ?Sized + AsyncClient + 'static,
530{
531    ACTIVE.store(false, atomic::Ordering::SeqCst);
532    TERMINATING.store(true, atomic::Ordering::SeqCst);
533    client
534        .lock()
535        .await
536        .publish(
537            SERVICE_STATUS_TOPIC,
538            pack(&services::ServiceStatusBroadcastEvent::terminating())
539                .unwrap()
540                .into(),
541            QoS::Processed,
542        )
543        .await?
544        .unwrap()
545        .await??;
546    Ok(())
547}
548
549/// Start service signal handlers (SIGTERM and SIGINT)
550///
551/// Calls the terminate method when received
552#[cfg(not(target_os = "windows"))]
553pub fn svc_start_signal_handlers() {
554    macro_rules! handle_signal {
555        ($signal: expr) => {{
556            tokio::spawn(async move {
557                signal($signal).unwrap().recv().await;
558                svc_terminate();
559            });
560        }};
561    }
562    macro_rules! ignore_signal {
563        ($signal: expr) => {{
564            tokio::spawn(async move {
565                loop {
566                    signal($signal).unwrap().recv().await;
567                }
568            });
569        }};
570    }
571    handle_signal!(SignalKind::terminate());
572    handle_signal!(SignalKind::hangup());
573    ignore_signal!(SignalKind::interrupt());
574}
575
576fn process_initial(buf: &[u8]) -> EResult<services::Initial> {
577    let initial: services::Initial = unpack(buf)?;
578    if initial.config_version() != services::SERVICE_CONFIG_VERSION {
579        return Err(Error::not_implemented(format!(
580            "config version not supported: {}",
581            initial.config_version()
582        )));
583    }
584    if initial.eapi_version() != crate::EAPI_VERSION {
585        return Err(Error::not_implemented(format!(
586            "EAPI version not supported: {}",
587            initial.config_version(),
588        )));
589    }
590    Ok(initial)
591}
592
593pub async fn read_initial() -> EResult<services::Initial> {
594    let op = Op::new(eva_common::DEFAULT_TIMEOUT);
595    let mut stdin = tokio::io::stdin();
596    let mut buf = [0_u8; 1];
597    tokio::time::timeout(op.timeout()?, stdin.read_exact(&mut buf)).await??;
598    if buf[0] != SERVICE_PAYLOAD_INITIAL {
599        return Err(Error::invalid_data("invalid payload"));
600    }
601    let mut buf = [0_u8; 4];
602    tokio::time::timeout(op.timeout()?, stdin.read_exact(&mut buf)).await??;
603    let len: usize = u32::from_le_bytes(buf).try_into().map_err(Error::failed)?;
604    let mut buf = vec![0_u8; len];
605    tokio::time::timeout(op.timeout()?, stdin.read_exact(&mut buf)).await??;
606    process_initial(&buf)
607}
608
609pub fn read_initial_sync() -> EResult<services::Initial> {
610    let mut buf = [0_u8; 1];
611    std::io::stdin().read_exact(&mut buf)?;
612    if buf[0] != SERVICE_PAYLOAD_INITIAL {
613        return Err(Error::invalid_data("invalid payload"));
614    }
615    let mut buf = [0_u8; 4];
616    std::io::stdin().read_exact(&mut buf)?;
617    let len: usize = u32::from_le_bytes(buf).try_into().map_err(Error::failed)?;
618    let mut buf = vec![0_u8; len];
619    std::io::stdin().read_exact(&mut buf)?;
620    process_initial(&buf)
621}
622
623#[cfg(target_os = "linux")]
624fn apply_current_thread_params(params: &services::RealtimeConfig) -> EResult<()> {
625    let mut rt_params = rtsc::thread_rt::Params::new().with_cpu_ids(&params.cpu_ids);
626    if let Some(priority) = params.priority {
627        rt_params = rt_params.with_priority(Some(priority));
628        if priority > 0 {
629            rt_params = rt_params.with_scheduling(rtsc::thread_rt::Scheduling::FIFO);
630        }
631    }
632    if let Err(e) = rtsc::thread_rt::apply_for_current(&rt_params) {
633        if matches!(e, rtsc::Error::AccessDenied) {
634            eprintln!("Real-time parameters are not set, the service is not launched as root");
635        } else {
636            return Err(Error::failed(format!(
637                "Real-time priority set error: {}",
638                e
639            )));
640        }
641    }
642    if let Some(prealloc_heap) = params.prealloc_heap {
643        #[cfg(target_env = "gnu")]
644        if let Err(e) = rtsc::thread_rt::preallocate_heap(prealloc_heap) {
645            if matches!(e, rtsc::Error::AccessDenied) {
646                eprintln!("Heap preallocation failed, the service is not launched as root");
647            } else {
648                return Err(Error::failed(format!("Heap preallocation error: {}", e)));
649            }
650        }
651        #[cfg(not(target_env = "gnu"))]
652        if prealloc_heap > 0 {
653            eprintln!("Heap preallocation is supported in native builds only");
654        }
655    }
656    Ok(())
657}
658
659pub fn svc_launch<L, LFut>(launcher: L) -> EResult<()>
660where
661    L: FnMut(services::Initial) -> LFut,
662    LFut: std::future::Future<Output = EResult<()>>,
663{
664    let initial = read_initial_sync()?;
665    #[cfg(target_os = "linux")]
666    apply_current_thread_params(initial.realtime())?;
667    let rt = tokio::runtime::Builder::new_multi_thread()
668        .worker_threads(initial.workers() as usize)
669        .enable_all()
670        .build()?;
671    rt.block_on(launch(launcher, initial))?;
672    Ok(())
673}
674
675fn panic_handler(info: &std::panic::PanicHookInfo) {
676    eprintln!("PANIC: {}", info);
677    // call panic on critical to gracefully terminate the service
678    poc();
679    // park the thread
680    std::thread::park();
681    // in case if the thread has been externally unparked, will block forever
682    loop {
683        std::thread::sleep(Duration::from_secs(1));
684    }
685}
686
/// Runs the common start-up sequence and then hands the initial payload over to `launcher`
///
/// Steps, in order:
///
/// * runs the `eva_common` self-test and installs `panic_handler` as the panic hook
/// * initializes the initial payload
/// * in normal mode: starts a stdin watcher task and runs the optional prepare command
/// * in any other mode: refuses to start unless react-to-fail is allowed for the service
/// * extends the configuration and awaits the launcher
///
/// # Errors
///
/// Returns an error if any of the steps fails, the prepare command exits unsuccessfully or
/// the launcher itself returns an error
#[cfg(not(target_os = "windows"))]
async fn launch<L, LFut>(mut launcher: L, mut initial: services::Initial) -> EResult<()>
where
    L: FnMut(services::Initial) -> LFut,
    LFut: std::future::Future<Output = EResult<()>>,
{
    eva_common::self_test();
    std::panic::set_hook(Box::new(panic_handler));
    // the timeout of every timed step below is queried from this start-up operation
    let op = Op::new(initial.startup_timeout());
    let eva_dir = initial.eva_dir().to_owned();
    initial.init()?;
    if initial.is_mode_normal() {
        let shutdown_timeout = initial.shutdown_timeout();
        let mut stdin = tokio_fd::AsyncFd::try_from(libc::STDIN_FILENO)?;
        // stdin watcher: reads stdin byte by byte for the whole service lifetime
        tokio::spawn(async move {
            let mut buf = [0_u8; 1];
            let pid = std::process::id();
            // kills the own process tree in background, marks the service as terminating
            // and leaves the watcher loop
            macro_rules! kill {
                () => {
                    tokio::spawn(async move {
                        bmart::process::kill_pstree(pid, Some(shutdown_timeout), true).await;
                    });
                    svc_terminate();
                    break;
                };
            }
            loop {
                if stdin.read_exact(&mut buf).await.is_ok() {
                    // any byte other than a ping is treated as a shutdown request
                    if buf[0] != SERVICE_PAYLOAD_PING {
                        kill!();
                    }
                } else {
                    // stdin read failure
                    kill!();
                }
                tokio::time::sleep(SLEEP_STEP).await;
            }
        });
        if let Some(prepare_command) = initial.prepare_command() {
            // the prepare command is executed by `sh` from the EVA directory
            let cmd = format!("cd \"{}\" && {}", eva_dir, prepare_command);
            let t_o = op.timeout()?.as_secs_f64().to_string();
            let opts = bmart::process::Options::new()
                .env("EVA_SYSTEM_NAME", initial.system_name())
                .env("EVA_DIR", initial.eva_dir())
                .env("EVA_SVC_ID", initial.id())
                .env("EVA_SVC_DATA_PATH", initial.data_path().unwrap_or_default())
                .env("EVA_TIMEOUT", t_o.as_str());
            let res = bmart::process::command("sh", ["-c", &cmd], op.timeout()?, opts).await?;
            if !res.ok() {
                return Err(Error::failed(format!(
                    "prepare command failed: {}",
                    res.err.join("\n")
                )));
            }
            // on success the command output is forwarded to the service stdout/stderr
            for r in res.out {
                println!("{}", r);
            }
            for r in res.err {
                eprintln!("{}", r);
            }
        }
    } else if !initial.can_rtf() {
        return Err(Error::failed(
            "the service is started in react-to-fail mode, but rtf disabled for the service",
        ));
    }
    initial
        .extend_config(op.timeout()?, Path::new(&eva_dir))
        .await?;
    launcher(initial).await
}