1use busrt::QoS;
2use busrt::client::AsyncClient;
4use busrt::rpc::{Rpc, RpcClient, RpcError, RpcEvent, RpcResult};
5use eva_common::SLEEP_STEP;
6use eva_common::acl::OIDMask;
7use eva_common::events::{
8 ANY_STATE_TOPIC, LOCAL_STATE_TOPIC, REMOTE_STATE_TOPIC, SERVICE_STATUS_TOPIC,
9};
10use eva_common::op::Op;
11use eva_common::payload::{pack, unpack};
12use eva_common::prelude::*;
13use eva_common::services;
14use eva_common::services::SERVICE_PAYLOAD_INITIAL;
15use eva_common::services::SERVICE_PAYLOAD_PING;
16pub use eva_sdk_derive::svc_main;
17use log::error;
18use parking_lot::Mutex;
19use serde::{Deserialize, Deserializer};
20use std::future::Future;
21use std::io::Read;
22use std::path::Path;
23use std::sync::{Arc, LazyLock};
24use std::sync::{OnceLock, atomic};
25use std::time::{Duration, Instant};
26use tokio::io::AsyncReadExt;
27#[cfg(not(target_os = "windows"))]
28use tokio::signal::unix::{SignalKind, signal};
29use tokio::task::futures::TaskLocalFuture;
30use tokio::time::sleep;
31use uuid::Uuid;
32
/// Delay handed to `bmart::process::suicide` by `svc_block` / `svc_block2` when a bus client
/// is found disconnected. Write-once: set via `set_bus_error_suicide_timeout`; a zero
/// duration is used while it is unset.
static BUS_ERROR_SUICIDE_TIMEOUT: OnceLock<Duration> = OnceLock::new();

/// Message logged when a bus client is found disconnected.
const ERR_CRITICAL_BUS: &str = "CRITICAL: bus disconnected";
36
37pub fn set_bus_error_suicide_timeout(bes_timeout: Duration) -> EResult<()> {
39 BUS_ERROR_SUICIDE_TIMEOUT
40 .set(bes_timeout)
41 .map_err(|_| Error::failed("Unable to set BUS_ERROR_SUICIDE_TIMEOUT"))
42}
43
44fn deserialize_opt_uuid<'de, D>(deserializer: D) -> Result<Option<Uuid>, D::Error>
45where
46 D: Deserializer<'de>,
47{
48 let val: Value = Deserialize::deserialize(deserializer)?;
49 if val == Value::Unit {
50 Ok(None)
51 } else {
52 Ok(Some(
53 Uuid::deserialize(val).map_err(serde::de::Error::custom)?,
54 ))
55 }
56}
57
/// Returns `true` only when the `EVA_SVC_DEBUG` environment variable is exactly `"1"`.
///
/// An unset or non-Unicode variable counts as "not debug".
fn is_debug() -> bool {
    matches!(std::env::var("EVA_SVC_DEBUG").as_deref(), Ok("1"))
}
61
/// Extended call parameters decoded from the optional header of a payload
/// (see `process_extended_payload`).
#[derive(Deserialize)]
pub struct ExtendedParams {
    // Call trace id; a unit value in the encoded data deserializes to `None`.
    #[serde(deserialize_with = "deserialize_opt_uuid")]
    call_trace_id: Option<Uuid>,
}
67
68pub fn process_extended_payload(full_payload: &[u8]) -> EResult<(&[u8], Option<ExtendedParams>)> {
69 if full_payload.len() > 4 && full_payload[0] == 0xc1 && full_payload[1] == 0xc1 {
70 let pos = usize::from(u16::from_le_bytes([full_payload[2], full_payload[3]])) + 4;
71 if full_payload.len() < pos {
72 return Err(Error::invalid_data("invalid extended payload"));
73 }
74 let xp = &full_payload[4..pos];
75 return Ok((&full_payload[pos..], Some(unpack(xp)?)));
76 }
77 Ok((full_payload, None))
78}
79
80#[inline]
81pub fn svc_call_scope<F>(xp: Option<ExtendedParams>, f: F) -> TaskLocalFuture<Option<Uuid>, F>
82where
83 F: Future,
84{
85 eva_common::logger::CALL_TRACE_ID.scope(
86 if let Some(x) = xp {
87 x.call_trace_id
88 } else {
89 None
90 },
91 f,
92 )
93}
94
95pub trait BusRtEapiEvent {
97 fn parse_oid(&self) -> Option<OID>;
99 fn bus_event(&self) -> BusEventKind;
101 fn is_actual_state_event(&self) -> bool {
103 matches!(
104 self.bus_event(),
105 BusEventKind::StateLocal | BusEventKind::StateRemote
106 )
107 }
108}
109
110impl BusRtEapiEvent for busrt::Frame {
111 fn parse_oid(&self) -> Option<OID> {
112 let topic = self.topic()?;
113 if let Some(oid_str) = topic.strip_prefix(LOCAL_STATE_TOPIC) {
114 return OID::from_path(oid_str).ok();
115 }
116 if let Some(oid_str) = topic.strip_prefix(REMOTE_STATE_TOPIC) {
117 return OID::from_path(oid_str).ok();
118 }
119 if let Some(oid_str) = topic.strip_prefix(eva_common::events::REMOTE_ARCHIVE_STATE_TOPIC) {
120 return OID::from_path(oid_str).ok();
121 }
122 None
123 }
124 fn bus_event(&self) -> BusEventKind {
125 let Some(topic) = self.topic() else {
126 return BusEventKind::Other;
127 };
128 if topic.starts_with(eva_common::events::RAW_STATE_TOPIC) {
129 return BusEventKind::StateRaw;
130 }
131 if topic == eva_common::events::RAW_STATE_BULK_TOPIC {
132 return BusEventKind::StateRawBulk;
133 }
134 if topic.starts_with(eva_common::events::LOCAL_STATE_TOPIC) {
135 return BusEventKind::StateLocal;
136 }
137 if topic.starts_with(eva_common::events::REMOTE_STATE_TOPIC) {
138 return BusEventKind::StateRemote;
139 }
140 if topic.starts_with(eva_common::events::REMOTE_ARCHIVE_STATE_TOPIC) {
141 return BusEventKind::StateRemoteArchive;
142 }
143 if topic == eva_common::events::AAA_ACL_TOPIC {
144 return BusEventKind::AaaAcl;
145 }
146 if topic == eva_common::events::AAA_KEY_TOPIC {
147 return BusEventKind::AaaKey;
148 }
149 if topic == eva_common::events::AAA_USER_TOPIC {
150 return BusEventKind::AaaUser;
151 }
152 BusEventKind::Other
153 }
154}
155
/// Kind of a bus event, as derived from the frame topic by `BusRtEapiEvent::bus_event`.
///
/// Deserializes from lowercase variant names.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BusEventKind {
    // Topic starts with `RAW_STATE_TOPIC`
    StateRaw,
    // Topic equals `RAW_STATE_BULK_TOPIC`
    StateRawBulk,
    // Topic starts with `LOCAL_STATE_TOPIC`
    StateLocal,
    // Topic starts with `REMOTE_STATE_TOPIC`
    StateRemote,
    // Topic starts with `REMOTE_ARCHIVE_STATE_TOPIC`
    StateRemoteArchive,
    // Topic equals `AAA_ACL_TOPIC`
    AaaAcl,
    // Topic equals `AAA_KEY_TOPIC`
    AaaKey,
    // Topic equals `AAA_USER_TOPIC`
    AaaUser,
    // No topic, or a topic not matched by any of the above
    Other,
}
170
/// Selects which state topics the OID (un)subscription helpers operate on.
///
/// Deserializes from lowercase variant names.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum EventKind {
    // Topics under `LOCAL_STATE_TOPIC`
    Local,
    // Topics under `REMOTE_STATE_TOPIC`
    Remote,
    // Topics under `ANY_STATE_TOPIC`
    Any,
    // Both local and remote state topics; has no single topic prefix of its own
    Actual,
}
180
181impl EventKind {
182 #[inline]
183 pub fn topic(&self) -> &str {
184 match self {
185 EventKind::Local => LOCAL_STATE_TOPIC,
186 EventKind::Remote => REMOTE_STATE_TOPIC,
187 EventKind::Any => ANY_STATE_TOPIC,
188 EventKind::Actual => unimplemented!(),
189 }
190 }
191}
192
/// Optional delay for the "panic on critical" behavior: written by `set_poc`, read by `poc`.
/// `None` (the default) makes `poc` a no-op.
static NEED_PANIC: LazyLock<Mutex<Option<Duration>>> = LazyLock::new(<_>::default);
194
195pub async fn subscribe_oids<'a, R, M>(rpc: &R, masks: M, kind: EventKind) -> EResult<()>
197where
198 R: Rpc,
199 M: IntoIterator<Item = &'a OIDMask>,
200{
201 let topics: Vec<String> = if kind == EventKind::Actual {
202 let mut t = Vec::new();
203 for mask in masks {
204 t.push(format!("{}{}", LOCAL_STATE_TOPIC, mask.as_path()));
205 t.push(format!("{}{}", REMOTE_STATE_TOPIC, mask.as_path()));
206 }
207 t
208 } else {
209 masks
210 .into_iter()
211 .map(|mask| format!("{}{}", kind.topic(), mask.as_path()))
212 .collect()
213 };
214 if topics.is_empty() {
215 return Ok(());
216 }
217 rpc.client()
218 .lock()
219 .await
220 .subscribe_bulk(
221 &topics.iter().map(String::as_str).collect::<Vec<&str>>(),
222 QoS::No,
223 )
224 .await?;
225 Ok(())
226}
227
228pub async fn unsubscribe_oids<'a, R, M>(rpc: &R, masks: M, kind: EventKind) -> EResult<()>
230where
231 R: Rpc,
232 M: IntoIterator<Item = &'a OIDMask>,
233{
234 let topics: Vec<String> = if kind == EventKind::Actual {
235 let mut t = Vec::new();
236 for mask in masks {
237 t.push(format!("{}{}", LOCAL_STATE_TOPIC, mask.as_path()));
238 t.push(format!("{}{}", REMOTE_STATE_TOPIC, mask.as_path()));
239 }
240 t
241 } else {
242 masks
243 .into_iter()
244 .map(|mask| format!("{}{}", kind.topic(), mask.as_path()))
245 .collect()
246 };
247 if topics.is_empty() {
248 return Ok(());
249 }
250 rpc.client()
251 .lock()
252 .await
253 .unsubscribe_bulk(
254 &topics.iter().map(String::as_str).collect::<Vec<&str>>(),
255 QoS::No,
256 )
257 .await?;
258 Ok(())
259}
260
261pub async fn exclude_oids<'a, R, M>(rpc: &R, masks: M, kind: EventKind) -> EResult<()>
263where
264 R: Rpc,
265 M: IntoIterator<Item = &'a OIDMask>,
266{
267 let topics: Vec<String> = if kind == EventKind::Actual {
268 let mut t = Vec::new();
269 for mask in masks {
270 t.push(format!("{}{}", LOCAL_STATE_TOPIC, mask.as_path()));
271 t.push(format!("{}{}", REMOTE_STATE_TOPIC, mask.as_path()));
272 }
273 t
274 } else {
275 masks
276 .into_iter()
277 .map(|mask| format!("{}{}", kind.topic(), mask.as_path()))
278 .collect()
279 };
280 if topics.is_empty() {
281 return Ok(());
282 }
283 rpc.client()
284 .lock()
285 .await
286 .exclude_bulk(
287 &topics.iter().map(String::as_str).collect::<Vec<&str>>(),
288 QoS::No,
289 )
290 .await?;
291 Ok(())
292}
293
294pub fn set_poc(panic_in: Option<Duration>) {
295 *NEED_PANIC.lock() = panic_in;
296}
297
298pub fn poc() {
299 if let Some(delay) = *NEED_PANIC.lock() {
300 svc_terminate();
301 bmart::process::suicide(delay + Duration::from_secs(1), false);
302 std::thread::spawn(move || {
303 std::thread::sleep(delay);
304 std::process::exit(1);
305 });
306 }
307}
308
309pub fn svc_handle_default_rpc(method: &str, info: &services::ServiceInfo) -> RpcResult {
310 match method {
311 "test" => Ok(None),
312 "info" => Ok(Some(pack(info)?)),
313 "stop" => {
314 svc_terminate();
315 Ok(None)
316 }
317 _ => Err(RpcError::method(None)),
318 }
319}
320
321#[inline]
323pub async fn safe_rpc_call(
324 rpc: &RpcClient,
325 target: &str,
326 method: &str,
327 params: busrt::borrow::Cow<'_>,
328 qos: QoS,
329 timeout: Duration,
330) -> EResult<RpcEvent> {
331 tokio::time::timeout(timeout, rpc.call(target, method, params, qos))
332 .await?
333 .map_err(Into::into)
334}
335
336pub(crate) async fn svc_is_core_active(rpc: &RpcClient, timeout: Duration) -> bool {
337 #[derive(Deserialize)]
338 struct TR {
339 active: bool,
340 }
341 if let Ok(ev) = safe_rpc_call(
342 rpc,
343 "eva.core",
344 "test",
345 busrt::empty_payload!(),
346 QoS::Processed,
347 timeout,
348 )
349 .await
350 && let Ok(result) = unpack::<TR>(ev.payload())
351 && result.active
352 {
353 return true;
354 }
355 false
356}
357
358pub async fn svc_wait_core(
360 rpc: &RpcClient,
361 timeout: Duration,
362 wait_forever: bool,
363) -> EResult<bool> {
364 let wait_until = Instant::now() + timeout;
365 let mut core_inactive = false;
366 let mut int = tokio::time::interval(SLEEP_STEP);
367 loop {
368 int.tick().await;
369 if svc_is_terminating() {
370 return Err(Error::failed(
371 "core load wait aborted, the service is not active",
372 ));
373 }
374 if svc_is_core_active(rpc, timeout).await {
375 return Ok(core_inactive);
376 }
377 core_inactive = true;
378 if !wait_forever && wait_until <= Instant::now() {
379 return Err(Error::timeout());
380 }
381 }
382}
383
/// Set by `svc_mark_ready`, cleared by `svc_terminate` / `svc_mark_terminating`.
static ACTIVE: atomic::AtomicBool = atomic::AtomicBool::new(false);
/// Set by `svc_terminate` / `svc_mark_terminating`; never cleared in this file.
static TERMINATING: atomic::AtomicBool = atomic::AtomicBool::new(false);
386
387#[inline]
388pub fn svc_is_active() -> bool {
389 ACTIVE.load(atomic::Ordering::SeqCst)
390}
391
392#[inline]
393pub fn svc_is_terminating() -> bool {
394 TERMINATING.load(atomic::Ordering::SeqCst)
395}
396
/// Returns from the enclosing function (with no value) if the service is not active.
///
/// Expands to an unqualified `svc_is_active()` call, so that function must be in scope at
/// the call site.
#[macro_export]
macro_rules! svc_need_ready {
    () => {
        if !svc_is_active() {
            return;
        }
    };
}
405
/// Returns a "not ready" error from the enclosing function if the service is not active.
///
/// Expands to unqualified `svc_is_active()` and `Error::not_ready(..)` references, so both
/// names must be in scope at the call site; the error is converted with `.into()` to the
/// enclosing function's error type.
#[macro_export]
macro_rules! svc_rpc_need_ready {
    () => {
        if !svc_is_active() {
            return Err(Error::not_ready("service is not ready").into());
        }
    };
}
414
415#[inline]
417pub fn svc_terminate() {
418 ACTIVE.store(false, atomic::Ordering::SeqCst);
419 TERMINATING.store(true, atomic::Ordering::SeqCst);
420}
421
422#[inline]
426pub async fn svc_block(rpc: &RpcClient) {
427 while svc_is_active() {
428 if !rpc.is_connected() {
429 error!("{}", ERR_CRITICAL_BUS);
430 bmart::process::suicide(
431 BUS_ERROR_SUICIDE_TIMEOUT
432 .get()
433 .map_or_else(|| Duration::from_secs(0), |v| *v),
434 false,
435 );
436 break;
437 }
438 sleep(SLEEP_STEP).await;
439 }
440}
441
442#[inline]
446pub async fn svc_block2(rpc: &RpcClient, secondary: &RpcClient) {
447 while svc_is_active() {
448 if !rpc.is_connected() || !secondary.is_connected() {
449 error!("{}", ERR_CRITICAL_BUS);
450 bmart::process::suicide(
451 BUS_ERROR_SUICIDE_TIMEOUT
452 .get()
453 .map_or_else(|| Duration::from_secs(0), |v| *v),
454 false,
455 );
456 break;
457 }
458 sleep(SLEEP_STEP).await;
459 }
460}
461
462#[inline]
472pub fn svc_init_logs<C>(
473 initial: &services::Initial,
474 client: Arc<tokio::sync::Mutex<C>>,
475) -> EResult<()>
476where
477 C: ?Sized + AsyncClient + 'static,
478{
479 if is_debug() {
480 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
481 Ok(())
482 } else {
483 eva_common::logger::init_bus(
484 client,
485 initial.bus_queue_size(),
486 initial.eva_log_level_filter(),
487 initial.call_tracing(),
488 )
489 }
490}
491
492pub async fn svc_mark_ready<C>(client: &tokio::sync::Mutex<C>) -> EResult<()>
500where
501 C: ?Sized + AsyncClient + 'static,
502{
503 client
504 .lock()
505 .await
506 .publish(
507 SERVICE_STATUS_TOPIC,
508 pack(&services::ServiceStatusBroadcastEvent::ready())
509 .unwrap()
510 .into(),
511 QoS::Processed,
512 )
513 .await?
514 .unwrap()
515 .await??;
516 ACTIVE.store(true, atomic::Ordering::SeqCst);
517 Ok(())
518}
519
520pub async fn svc_mark_terminating<C>(client: &tokio::sync::Mutex<C>) -> EResult<()>
528where
529 C: ?Sized + AsyncClient + 'static,
530{
531 ACTIVE.store(false, atomic::Ordering::SeqCst);
532 TERMINATING.store(true, atomic::Ordering::SeqCst);
533 client
534 .lock()
535 .await
536 .publish(
537 SERVICE_STATUS_TOPIC,
538 pack(&services::ServiceStatusBroadcastEvent::terminating())
539 .unwrap()
540 .into(),
541 QoS::Processed,
542 )
543 .await?
544 .unwrap()
545 .await??;
546 Ok(())
547}
548
549#[cfg(not(target_os = "windows"))]
553pub fn svc_start_signal_handlers() {
554 macro_rules! handle_signal {
555 ($signal: expr) => {{
556 tokio::spawn(async move {
557 signal($signal).unwrap().recv().await;
558 svc_terminate();
559 });
560 }};
561 }
562 macro_rules! ignore_signal {
563 ($signal: expr) => {{
564 tokio::spawn(async move {
565 loop {
566 signal($signal).unwrap().recv().await;
567 }
568 });
569 }};
570 }
571 handle_signal!(SignalKind::terminate());
572 handle_signal!(SignalKind::hangup());
573 ignore_signal!(SignalKind::interrupt());
574}
575
576fn process_initial(buf: &[u8]) -> EResult<services::Initial> {
577 let initial: services::Initial = unpack(buf)?;
578 if initial.config_version() != services::SERVICE_CONFIG_VERSION {
579 return Err(Error::not_implemented(format!(
580 "config version not supported: {}",
581 initial.config_version()
582 )));
583 }
584 if initial.eapi_version() != crate::EAPI_VERSION {
585 return Err(Error::not_implemented(format!(
586 "EAPI version not supported: {}",
587 initial.config_version(),
588 )));
589 }
590 Ok(initial)
591}
592
593pub async fn read_initial() -> EResult<services::Initial> {
594 let op = Op::new(eva_common::DEFAULT_TIMEOUT);
595 let mut stdin = tokio::io::stdin();
596 let mut buf = [0_u8; 1];
597 tokio::time::timeout(op.timeout()?, stdin.read_exact(&mut buf)).await??;
598 if buf[0] != SERVICE_PAYLOAD_INITIAL {
599 return Err(Error::invalid_data("invalid payload"));
600 }
601 let mut buf = [0_u8; 4];
602 tokio::time::timeout(op.timeout()?, stdin.read_exact(&mut buf)).await??;
603 let len: usize = u32::from_le_bytes(buf).try_into().map_err(Error::failed)?;
604 let mut buf = vec![0_u8; len];
605 tokio::time::timeout(op.timeout()?, stdin.read_exact(&mut buf)).await??;
606 process_initial(&buf)
607}
608
609pub fn read_initial_sync() -> EResult<services::Initial> {
610 let mut buf = [0_u8; 1];
611 std::io::stdin().read_exact(&mut buf)?;
612 if buf[0] != SERVICE_PAYLOAD_INITIAL {
613 return Err(Error::invalid_data("invalid payload"));
614 }
615 let mut buf = [0_u8; 4];
616 std::io::stdin().read_exact(&mut buf)?;
617 let len: usize = u32::from_le_bytes(buf).try_into().map_err(Error::failed)?;
618 let mut buf = vec![0_u8; len];
619 std::io::stdin().read_exact(&mut buf)?;
620 process_initial(&buf)
621}
622
623#[cfg(target_os = "linux")]
624fn apply_current_thread_params(params: &services::RealtimeConfig) -> EResult<()> {
625 let mut rt_params = rtsc::thread_rt::Params::new().with_cpu_ids(¶ms.cpu_ids);
626 if let Some(priority) = params.priority {
627 rt_params = rt_params.with_priority(Some(priority));
628 if priority > 0 {
629 rt_params = rt_params.with_scheduling(rtsc::thread_rt::Scheduling::FIFO);
630 }
631 }
632 if let Err(e) = rtsc::thread_rt::apply_for_current(&rt_params) {
633 if matches!(e, rtsc::Error::AccessDenied) {
634 eprintln!("Real-time parameters are not set, the service is not launched as root");
635 } else {
636 return Err(Error::failed(format!(
637 "Real-time priority set error: {}",
638 e
639 )));
640 }
641 }
642 if let Some(prealloc_heap) = params.prealloc_heap {
643 #[cfg(target_env = "gnu")]
644 if let Err(e) = rtsc::thread_rt::preallocate_heap(prealloc_heap) {
645 if matches!(e, rtsc::Error::AccessDenied) {
646 eprintln!("Heap preallocation failed, the service is not launched as root");
647 } else {
648 return Err(Error::failed(format!("Heap preallocation error: {}", e)));
649 }
650 }
651 #[cfg(not(target_env = "gnu"))]
652 if prealloc_heap > 0 {
653 eprintln!("Heap preallocation is supported in native builds only");
654 }
655 }
656 Ok(())
657}
658
659pub fn svc_launch<L, LFut>(launcher: L) -> EResult<()>
660where
661 L: FnMut(services::Initial) -> LFut,
662 LFut: std::future::Future<Output = EResult<()>>,
663{
664 let initial = read_initial_sync()?;
665 #[cfg(target_os = "linux")]
666 apply_current_thread_params(initial.realtime())?;
667 let rt = tokio::runtime::Builder::new_multi_thread()
668 .worker_threads(initial.workers() as usize)
669 .enable_all()
670 .build()?;
671 rt.block_on(launch(launcher, initial))?;
672 Ok(())
673}
674
675fn panic_handler(info: &std::panic::PanicHookInfo) {
676 eprintln!("PANIC: {}", info);
677 poc();
679 std::thread::park();
681 loop {
683 std::thread::sleep(Duration::from_secs(1));
684 }
685}
686
/// Asynchronous part of the service startup: prepares the environment and runs `launcher`.
///
/// Steps, in order:
/// 1. runs the `eva_common` self test and installs `panic_handler` as the panic hook;
/// 2. initializes `initial`;
/// 3. in normal mode, starts a stdin watchdog task and runs the optional prepare command;
///    in any other mode, refuses to continue unless the service allows react-to-fail;
/// 4. extends the configuration relative to the EVA directory;
/// 5. hands the resulting `initial` to `launcher` and returns its result.
///
/// All time-bounded steps share one operation deadline based on the startup timeout.
///
/// # Errors
///
/// Returns an error if initialization fails, stdin cannot be wrapped for async reads, the
/// prepare command fails or times out, react-to-fail mode is not permitted, the config
/// extension fails, or `launcher` itself returns an error.
#[cfg(not(target_os = "windows"))]
async fn launch<L, LFut>(mut launcher: L, mut initial: services::Initial) -> EResult<()>
where
    L: FnMut(services::Initial) -> LFut,
    LFut: std::future::Future<Output = EResult<()>>,
{
    eva_common::self_test();
    std::panic::set_hook(Box::new(panic_handler));
    let op = Op::new(initial.startup_timeout());
    let eva_dir = initial.eva_dir().to_owned();
    initial.init()?;
    if initial.is_mode_normal() {
        let shutdown_timeout = initial.shutdown_timeout();
        let mut stdin = tokio_fd::AsyncFd::try_from(libc::STDIN_FILENO)?;
        // Watchdog: reads stdin one byte at a time. A read failure or any byte other than
        // SERVICE_PAYLOAD_PING triggers shutdown.
        tokio::spawn(async move {
            let mut buf = [0_u8; 1];
            let pid = std::process::id();
            // Shutdown sequence: spawn `kill_pstree` for the own pid with the shutdown
            // timeout, mark the service as terminating and leave the watchdog loop.
            macro_rules! kill {
                () => {
                    tokio::spawn(async move {
                        bmart::process::kill_pstree(pid, Some(shutdown_timeout), true).await;
                    });
                    svc_terminate();
                    break;
                };
            }
            loop {
                if stdin.read_exact(&mut buf).await.is_ok() {
                    if buf[0] != SERVICE_PAYLOAD_PING {
                        kill!();
                    }
                } else {
                    kill!();
                }
                tokio::time::sleep(SLEEP_STEP).await;
            }
        });
        if let Some(prepare_command) = initial.prepare_command() {
            // The prepare command is run via `sh -c` from the EVA directory, with the
            // service context exported as environment variables.
            let cmd = format!("cd \"{}\" && {}", eva_dir, prepare_command);
            let t_o = op.timeout()?.as_secs_f64().to_string();
            let opts = bmart::process::Options::new()
                .env("EVA_SYSTEM_NAME", initial.system_name())
                .env("EVA_DIR", initial.eva_dir())
                .env("EVA_SVC_ID", initial.id())
                .env("EVA_SVC_DATA_PATH", initial.data_path().unwrap_or_default())
                .env("EVA_TIMEOUT", t_o.as_str());
            let res = bmart::process::command("sh", ["-c", &cmd], op.timeout()?, opts).await?;
            if !res.ok() {
                return Err(Error::failed(format!(
                    "prepare command failed: {}",
                    res.err.join("\n")
                )));
            }
            // On success, relay the command's collected output lines to stdout / stderr
            for r in res.out {
                println!("{}", r);
            }
            for r in res.err {
                eprintln!("{}", r);
            }
        }
    } else if !initial.can_rtf() {
        // Not in normal mode: only services that permit react-to-fail may continue
        return Err(Error::failed(
            "the service is started in react-to-fail mode, but rtf disabled for the service",
        ));
    }
    initial
        .extend_config(op.timeout()?, Path::new(&eva_dir))
        .await?;
    launcher(initial).await
}